提交 7b6b5335 编写于 作者: J jiacy-jcy

Merge branch '3.0' into 3.0test/jcy

......@@ -49,9 +49,9 @@ int32_t grantCheck(EGrantType grant);
#ifndef GRANTS_CFG
#define GRANTS_SCHEMA static const SSysDbTableSchema grantsSchema[] = { \
{.name = "version", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
{.name = "expire time", .bytes = 19 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
{.name = "expire_time", .bytes = 19 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
{.name = "expired", .bytes = 5 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
{.name = "storage(GB)", .bytes = 21 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
{.name = "storage", .bytes = 21 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
{.name = "timeseries", .bytes = 21 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
{.name = "databases", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
{.name = "users", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
......@@ -59,8 +59,8 @@ int32_t grantCheck(EGrantType grant);
{.name = "dnodes", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
{.name = "connections", .bytes = 11 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
{.name = "streams", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
{.name = "cpu cores", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
{.name = "speed(PPS)", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
{.name = "cpu_cores", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
{.name = "speed", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
{.name = "querytime", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
}
#define GRANT_CFG_ADD
......
......@@ -337,8 +337,10 @@ static FORCE_INLINE SSchemaWrapper* tCloneSSchemaWrapper(const SSchemaWrapper* p
}
static FORCE_INLINE void tDeleteSSchemaWrapper(SSchemaWrapper* pSchemaWrapper) {
taosMemoryFree(pSchemaWrapper->pSchema);
taosMemoryFree(pSchemaWrapper);
if (pSchemaWrapper) {
taosMemoryFree(pSchemaWrapper->pSchema);
taosMemoryFree(pSchemaWrapper);
}
}
static FORCE_INLINE int32_t taosEncodeSSchema(void** buf, const SSchema* pSchema) {
......
......@@ -253,7 +253,8 @@ typedef struct SShowCreateTableStmt {
ENodeType type;
char dbName[TSDB_DB_NAME_LEN];
char tableName[TSDB_TABLE_NAME_LEN];
void* pCfg; // STableCfg
void* pDbCfg; // SDbCfgInfo
void* pTableCfg; // STableCfg
} SShowCreateTableStmt;
typedef struct SShowTableDistributedStmt {
......@@ -282,6 +283,7 @@ typedef struct SCreateIndexStmt {
ENodeType type;
EIndexType indexType;
bool ignoreExists;
char indexDbName[TSDB_DB_NAME_LEN];
char indexName[TSDB_INDEX_NAME_LEN];
char dbName[TSDB_DB_NAME_LEN];
char tableName[TSDB_TABLE_NAME_LEN];
......@@ -292,6 +294,7 @@ typedef struct SCreateIndexStmt {
typedef struct SDropIndexStmt {
ENodeType type;
bool ignoreNotExists;
char indexDbName[TSDB_DB_NAME_LEN];
char indexName[TSDB_INDEX_NAME_LEN];
} SDropIndexStmt;
......
......@@ -552,6 +552,8 @@ typedef struct SQueryPlan {
void nodesWalkPhysiPlan(SNode* pNode, FNodeWalker walker, void* pContext);
const char* dataOrderStr(EDataOrderLevel order);
#ifdef __cplusplus
}
#endif
......
......@@ -608,6 +608,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_RSMA_INVALID_STAT TAOS_DEF_ERROR_CODE(0, 0x3151)
#define TSDB_CODE_RSMA_QTASKINFO_CREATE TAOS_DEF_ERROR_CODE(0, 0x3152)
#define TSDB_CODE_RSMA_FILE_CORRUPTED TAOS_DEF_ERROR_CODE(0, 0x3153)
#define TSDB_CODE_RSMA_REMOVE_EXISTS TAOS_DEF_ERROR_CODE(0, 0x3154)
//index
#define TSDB_CODE_INDEX_REBUILDING TAOS_DEF_ERROR_CODE(0, 0x3200)
......
文件模式从 100644 更改为 100755
文件模式从 100644 更改为 100755
文件模式从 100644 更改为 100755
文件模式从 100644 更改为 100755
文件模式从 100644 更改为 100755
文件模式从 100644 更改为 100755
......@@ -1011,6 +1011,11 @@ static int32_t mndProcessCreateStbReq(SRpcMsg *pReq) {
goto _OVER;
}
if ((terrno = grantCheck(TSDB_GRANT_STABLE)) < 0) {
code = -1;
goto _OVER;
}
if (isAlter) {
bool needRsp = false;
SStbObj pDst = {0};
......
......@@ -151,6 +151,11 @@ enum {
RSMA_ROLE_ITERATE = 4,
};
enum {
RSMA_RESTORE_REBOOT = 1,
RSMA_RESTORE_SYNC = 2,
};
void tdDestroySmaEnv(SSmaEnv *pSmaEnv);
void *tdFreeSmaEnv(SSmaEnv *pSmaEnv);
......@@ -227,7 +232,8 @@ void tdRemoveRSmaInfoBySuid(SSma *pSma, int64_t suid);
int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash);
int32_t tdProcessRSmaCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, const char *tbName);
int32_t tdProcessRSmaRestoreImpl(SSma *pSma);
int32_t tdProcessRSmaRestoreImpl(SSma *pSma, int8_t type, int64_t qtaskFileVer);
int32_t tdRsmaRestore(SSma *pSma, int8_t type, int64_t committedVer);
int32_t tdProcessTSmaCreateImpl(SSma *pSma, int64_t version, const char *pMsg);
int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char *msg);
......
......@@ -123,7 +123,7 @@ int32_t smaOpen(SVnode *pVnode) {
}
// restore the rsma
if (rsmaRestore(pSma) < 0) {
if (tdRsmaRestore(pSma, RSMA_RESTORE_REBOOT, pVnode->state.committed) < 0) {
goto _err;
}
}
......@@ -148,12 +148,14 @@ int32_t smaClose(SSma *pSma) {
/**
* @brief rsma env restore
*
* @param pSma
* @return int32_t
*
* @param pSma
* @param type
* @param committedVer
* @return int32_t
*/
static int32_t rsmaRestore(SSma *pSma) {
int32_t tdRsmaRestore(SSma *pSma, int8_t type, int64_t committedVer) {
ASSERT(VND_IS_RSMA(pSma->pVnode));
return tdProcessRSmaRestoreImpl(pSma);
return tdProcessRSmaRestoreImpl(pSma, type, committedVer);
}
\ No newline at end of file
......@@ -41,12 +41,12 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId);
static int32_t tdRSmaQTaskInfoIterInit(SRSmaQTaskInfoIter *pIter, STFile *pTFile);
static int32_t tdRSmaQTaskInfoIterNextBlock(SRSmaQTaskInfoIter *pIter, bool *isFinish);
static int32_t tdRSmaQTaskInfoRestore(SSma *pSma, SRSmaQTaskInfoIter *pIter);
static int32_t tdRSmaQTaskInfoRestore(SSma *pSma, int8_t type, SRSmaQTaskInfoIter *pIter);
static int32_t tdRSmaQTaskInfoItemRestore(SSma *pSma, const SRSmaQTaskInfoItem *infoItem);
static int32_t tdRSmaRestoreQTaskInfoInit(SSma *pSma, int64_t *nTables);
static int32_t tdRSmaRestoreQTaskInfoReload(SSma *pSma, int64_t *committed);
static int32_t tdRSmaRestoreTSDataReload(SSma *pSma, int64_t committed);
static int32_t tdRSmaRestoreQTaskInfoReload(SSma *pSma, int8_t type, int64_t qTaskFileVer);
static int32_t tdRSmaRestoreTSDataReload(SSma *pSma);
static SRSmaInfo *tdGetRSmaInfoByItem(SRSmaInfoItem *pItem) {
// adapt accordingly if definition of SRSmaInfo update
......@@ -80,7 +80,7 @@ void tdRSmaQTaskInfoGetFileName(int32_t vgId, int64_t version, char *outputName)
tdGetVndFileName(vgId, NULL, VNODE_RSMA_DIR, TD_QTASKINFO_FNAME_PREFIX, version, outputName);
}
void tdRSmaQTaskInfoGetFullName(int32_t vgId, int64_t version, const char* path, char *outputName) {
void tdRSmaQTaskInfoGetFullName(int32_t vgId, int64_t version, const char *path, char *outputName) {
tdGetVndFileName(vgId, path, VNODE_RSMA_DIR, TD_QTASKINFO_FNAME_PREFIX, version, outputName);
}
......@@ -319,9 +319,13 @@ int32_t tdProcessRSmaCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, con
pRSmaInfo = taosHashGet(RSMA_INFO_HASH(pStat), &suid, sizeof(tb_uid_t));
if (pRSmaInfo) {
ASSERT(0); // TODO: free original pRSmaInfo if exists abnormally
smaDebug("vgId:%d, rsma info already exists for table %s, %" PRIi64, SMA_VID(pSma), tbName, suid);
return TSDB_CODE_SUCCESS;
// TODO: free original pRSmaInfo if exists abnormally
tdFreeRSmaInfo(pSma, *(SRSmaInfo **)pRSmaInfo, true);
if (taosHashRemove(RSMA_INFO_HASH(pStat), &suid, sizeof(tb_uid_t)) < 0) {
terrno = TSDB_CODE_RSMA_REMOVE_EXISTS;
goto _err;
}
smaWarn("vgId:%d, remove the rsma info already exists for table %s, %" PRIi64, SMA_VID(pSma), tbName, suid);
}
// from write queue: single thead
......@@ -648,7 +652,7 @@ static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t inputType
pItem->taskInfo, suid);
if (qSetMultiStreamInput(pItem->taskInfo, pMsg, 1, inputType) < 0) { // INPUT__DATA_SUBMIT
smaError("vgId:%d, rsma % " PRIi8 " qSetStreamInput failed since %s", SMA_VID(pSma), level, tstrerror(terrno));
smaError("vgId:%d, rsma %" PRIi8 " qSetStreamInput failed since %s", SMA_VID(pSma), level, tstrerror(terrno));
return TSDB_CODE_FAILED;
}
......@@ -872,24 +876,23 @@ _err:
return TSDB_CODE_FAILED;
}
static int32_t tdRSmaRestoreQTaskInfoReload(SSma *pSma, int64_t *committed) {
static int32_t tdRSmaRestoreQTaskInfoReload(SSma *pSma, int8_t type, int64_t qTaskFileVer) {
SVnode *pVnode = pSma->pVnode;
STFile tFile = {0};
char qTaskInfoFName[TSDB_FILENAME_LEN] = {0};
tdRSmaQTaskInfoGetFileName(TD_VID(pVnode), pVnode->state.committed, qTaskInfoFName);
tdRSmaQTaskInfoGetFileName(TD_VID(pVnode), qTaskFileVer, qTaskInfoFName);
if (tdInitTFile(&tFile, tfsGetPrimaryPath(pVnode->pTfs), qTaskInfoFName) < 0) {
goto _err;
}
if (!taosCheckExistFile(TD_TFILE_FULL_NAME(&tFile))) {
*committed = 0;
if (pVnode->state.committed > 0) {
smaWarn("vgId:%d, rsma restore for version %" PRIi64 ", not start as %s not exist", TD_VID(pVnode),
pVnode->state.committed, TD_TFILE_FULL_NAME(&tFile));
if (qTaskFileVer > 0) {
smaWarn("vgId:%d, restore rsma task %" PRIi8 " for version %" PRIi64 ", not start as %s not exist",
TD_VID(pVnode), type, qTaskFileVer, TD_TFILE_FULL_NAME(&tFile));
} else {
smaDebug("vgId:%d, rsma restore for version %" PRIi64 ", no need as %s not exist", TD_VID(pVnode),
pVnode->state.committed, TD_TFILE_FULL_NAME(&tFile));
smaDebug("vgId:%d, restore rsma task %" PRIi8 " for version %" PRIi64 ", no need as %s not exist", TD_VID(pVnode),
type, qTaskFileVer, TD_TFILE_FULL_NAME(&tFile));
}
return TSDB_CODE_SUCCESS;
}
......@@ -914,7 +917,7 @@ static int32_t tdRSmaRestoreQTaskInfoReload(SSma *pSma, int64_t *committed) {
goto _err;
}
if (tdRSmaQTaskInfoRestore(pSma, &fIter) < 0) {
if (tdRSmaQTaskInfoRestore(pSma, type, &fIter) < 0) {
tdRSmaQTaskInfoIterDestroy(&fIter);
tdCloseTFile(&tFile);
tdDestroyTFile(&tFile);
......@@ -925,13 +928,13 @@ static int32_t tdRSmaRestoreQTaskInfoReload(SSma *pSma, int64_t *committed) {
tdCloseTFile(&tFile);
tdDestroyTFile(&tFile);
// restored successfully from committed
*committed = pVnode->state.committed;
// restored successfully from committed or sync
smaInfo("vgId:%d, restore rsma task %" PRIi8 " for version %" PRIi64 ", qtaskinfo reload succeed", TD_VID(pVnode),
type, qTaskFileVer);
return TSDB_CODE_SUCCESS;
_err:
smaError("vgId:%d, rsma restore for version %" PRIi64 ", qtaskinfo reload failed since %s", TD_VID(pVnode),
pVnode->state.committed, terrstr());
smaError("vgId:%d, restore rsma task %" PRIi8 " for version %" PRIi64 ", qtaskinfo reload failed since %s",
TD_VID(pVnode), type, qTaskFileVer, terrstr());
return TSDB_CODE_FAILED;
}
......@@ -939,15 +942,14 @@ _err:
* @brief reload ts data from checkpoint
*
* @param pSma
* @param committed restore from committed version
* @return int32_t
*/
static int32_t tdRSmaRestoreTSDataReload(SSma *pSma, int64_t committed) {
static int32_t tdRSmaRestoreTSDataReload(SSma *pSma) {
// NOTHING TODO: the data would be restored from the unified WAL replay procedure
return TSDB_CODE_SUCCESS;
}
int32_t tdProcessRSmaRestoreImpl(SSma *pSma) {
int32_t tdProcessRSmaRestoreImpl(SSma *pSma, int8_t type, int64_t qtaskFileVer) {
// step 1: iterate all stables to restore the rsma env
int64_t nTables = 0;
if (tdRSmaRestoreQTaskInfoInit(pSma, &nTables) < 0) {
......@@ -955,24 +957,24 @@ int32_t tdProcessRSmaRestoreImpl(SSma *pSma) {
}
if (nTables <= 0) {
smaDebug("vgId:%d, no need to restore rsma task since no tables", SMA_VID(pSma));
smaDebug("vgId:%d, no need to restore rsma task %" PRIi8 " since no tables", SMA_VID(pSma), type);
return TSDB_CODE_SUCCESS;
}
// step 2: retrieve qtaskinfo items from the persistence file(rsma/qtaskinfo) and restore
int64_t committed = -1;
if (tdRSmaRestoreQTaskInfoReload(pSma, &committed) < 0) {
if (tdRSmaRestoreQTaskInfoReload(pSma, type, qtaskFileVer) < 0) {
goto _err;
}
// step 3: reload ts data from checkpoint
if (tdRSmaRestoreTSDataReload(pSma, committed) < 0) {
if (tdRSmaRestoreTSDataReload(pSma) < 0) {
goto _err;
}
smaInfo("vgId:%d, restore rsma task %" PRIi8 " from qtaskf %" PRIi64 " succeed", SMA_VID(pSma), type, qtaskFileVer);
return TSDB_CODE_SUCCESS;
_err:
smaError("vgId:%d, failed to restore rsma task since %s", SMA_VID(pSma), terrstr());
smaError("vgId:%d, restore rsma task %" PRIi8 "from qtaskf %" PRIi64 " failed since %s", SMA_VID(pSma), type,
qtaskFileVer, terrstr());
return TSDB_CODE_FAILED;
}
......@@ -1101,7 +1103,7 @@ static int32_t tdRSmaQTaskInfoIterNextBlock(SRSmaQTaskInfoIter *pIter, bool *isF
return TSDB_CODE_SUCCESS;
}
static int32_t tdRSmaQTaskInfoRestore(SSma *pSma, SRSmaQTaskInfoIter *pIter) {
static int32_t tdRSmaQTaskInfoRestore(SSma *pSma, int8_t type, SRSmaQTaskInfoIter *pIter) {
while (1) {
// block iter
bool isFinish = false;
......@@ -1117,7 +1119,7 @@ static int32_t tdRSmaQTaskInfoRestore(SSma *pSma, SRSmaQTaskInfoIter *pIter) {
pIter->qBuf = taosDecodeFixedI32(pIter->qBuf, &qTaskInfoLenWithHead);
if (qTaskInfoLenWithHead < RSMA_QTASKINFO_HEAD_LEN) {
terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
smaError("vgId:%d, restore rsma qtaskinfo file %s failed since %s", SMA_VID(pSma),
smaError("vgId:%d, restore rsma task %" PRIi8 " from qtaskinfo file %s failed since %s", SMA_VID(pSma), type,
TD_TFILE_FULL_NAME(pIter->pTFile), terrstr());
return TSDB_CODE_FAILED;
}
......@@ -1130,8 +1132,8 @@ static int32_t tdRSmaQTaskInfoRestore(SSma *pSma, SRSmaQTaskInfoIter *pIter) {
infoItem.qTaskInfo = pIter->qBuf;
infoItem.len = tdRSmaQTaskInfoContLen(qTaskInfoLenWithHead);
// do the restore job
smaDebug("vgId:%d, restore the qtask info %s offset:%" PRIi64 "\n", SMA_VID(pSma),
TD_TFILE_FULL_NAME(pIter->pTFile), pIter->offset - pIter->nBytes + pIter->nBufPos);
smaDebug("vgId:%d, restore rsma task %" PRIi8 " from qtaskinfo file %s offset:%" PRIi64 "\n", SMA_VID(pSma),
type, TD_TFILE_FULL_NAME(pIter->pTFile), pIter->offset - pIter->nBytes + pIter->nBufPos);
tdRSmaQTaskInfoItemRestore(pSma, &infoItem);
pIter->qBuf = POINTER_SHIFT(pIter->qBuf, infoItem.len);
......
......@@ -275,7 +275,7 @@ int32_t rsmaSnapWriterOpen(SSma* pSma, int64_t sver, int64_t ever, SRsmaSnapWrit
qWriter->pSma = pSma;
char qTaskInfoFullName[TSDB_FILENAME_LEN];
tdRSmaQTaskInfoGetFullName(TD_VID(pVnode), 1, tfsGetPrimaryPath(pVnode->pTfs), qTaskInfoFullName);
tdRSmaQTaskInfoGetFullName(TD_VID(pVnode), 0, tfsGetPrimaryPath(pVnode->pTfs), qTaskInfoFullName);
TdFilePtr qTaskF = taosCreateFile(qTaskInfoFullName, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
if (!qTaskF) {
code = TAOS_SYSTEM_ERROR(errno);
......@@ -323,10 +323,19 @@ int32_t rsmaSnapWriterClose(SRsmaSnapWriter** ppWriter, int8_t rollback) {
// qtaskinfo
if (pWriter->pQTaskFWriter) {
char qTaskInfoFullName[TSDB_FILENAME_LEN];
tdRSmaQTaskInfoGetFullName(TD_VID(pVnode), 0, tfsGetPrimaryPath(pVnode->pTfs), qTaskInfoFullName);
taosRenameFile(pWriter->pQTaskFWriter->fname, qTaskInfoFullName);
tdRSmaQTaskInfoGetFullName(TD_VID(pVnode), pWriter->ever, tfsGetPrimaryPath(pVnode->pTfs), qTaskInfoFullName);
if (taosRenameFile(pWriter->pQTaskFWriter->fname, qTaskInfoFullName) < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
smaInfo("vgId:%d, vnode snapshot rsma writer rename %s to %s", SMA_VID(pWriter->pSma),
pWriter->pQTaskFWriter->fname, qTaskInfoFullName);
// rsma restore
if ((code = tdRsmaRestore(pWriter->pSma, RSMA_RESTORE_SYNC, pWriter->ever)) < 0) {
goto _err;
}
smaInfo("vgId:%d, vnode snapshot rsma writer restore from %s succeed", SMA_VID(pWriter->pSma), qTaskInfoFullName);
}
}
......
......@@ -490,11 +490,6 @@ static int32_t vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pR
goto _exit;
}
if ((terrno = grantCheck(TSDB_GRANT_STABLE)) < 0) {
rcode = -1;
goto _exit;
}
if ((terrno = grantCheck(TSDB_GRANT_TABLE)) < 0) {
rcode = -1;
goto _exit;
......@@ -850,13 +845,6 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq
goto _exit;
}
if ((terrno = grantCheck(TSDB_GRANT_STABLE)) < 0) {
pRsp->code = terrno;
tDecoderClear(&decoder);
taosArrayDestroy(createTbReq.ctb.tagName);
goto _exit;
}
if ((terrno = grantCheck(TSDB_GRANT_TABLE)) < 0) {
pRsp->code = terrno;
tDecoderClear(&decoder);
......
......@@ -19,9 +19,8 @@
#define BATCH_DISABLE 1
static inline bool vnodeIsMsgBlock(tmsg_t type) {
return (type == TDMT_VND_CREATE_TABLE) || (type == TDMT_VND_CREATE_TABLE) || (type == TDMT_VND_CREATE_TABLE) ||
(type == TDMT_VND_ALTER_TABLE) || (type == TDMT_VND_DROP_TABLE) || (type == TDMT_VND_UPDATE_TAG_VAL) ||
(type == TDMT_VND_ALTER_REPLICA);
return (type == TDMT_VND_CREATE_TABLE) || (type == TDMT_VND_ALTER_TABLE) || (type == TDMT_VND_DROP_TABLE) ||
(type == TDMT_VND_UPDATE_TAG_VAL) || (type == TDMT_VND_ALTER_REPLICA);
}
static inline bool vnodeIsMsgWeak(tmsg_t type) { return false; }
......@@ -611,10 +610,10 @@ static int32_t vnodeSnapshotStartWrite(struct SSyncFSM *pFsm, void *pParam, void
do {
int32_t itemSize = tmsgGetQueueSize(&pVnode->msgCb, pVnode->config.vgId, APPLY_QUEUE);
if (itemSize == 0) {
vDebug("vgId:%d, apply queue is empty, start write snapshot", pVnode->config.vgId);
vInfo("vgId:%d, start write vnode snapshot since apply queue is empty", pVnode->config.vgId);
break;
} else {
vDebug("vgId:%d, %d items in apply queue, write snapshot later", pVnode->config.vgId);
vInfo("vgId:%d, write vnode snapshot later since %d items in apply queue", pVnode->config.vgId);
taosMsleep(10);
}
} while (true);
......@@ -630,10 +629,11 @@ static int32_t vnodeSnapshotStartWrite(struct SSyncFSM *pFsm, void *pParam, void
static int32_t vnodeSnapshotStopWrite(struct SSyncFSM *pFsm, void *pWriter, bool isApply, SSnapshot *pSnapshot) {
#ifdef USE_TSDB_SNAPSHOT
SVnode *pVnode = pFsm->data;
vDebug("vgId:%d, stop write snapshot, isApply:%d", pVnode->config.vgId, isApply);
vInfo("vgId:%d, stop write vnode snapshot, apply:%d, index:%" PRId64 " term:%" PRIu64 " config:%" PRId64, pVnode->config.vgId, isApply,
pSnapshot->lastApplyIndex, pSnapshot->lastApplyTerm, pSnapshot->lastApplyIndex);
int32_t code = vnodeSnapWriterClose(pWriter, !isApply, pSnapshot);
vDebug("vgId:%d, apply snapshot to vnode, code:0x%x", pVnode->config.vgId, code);
vInfo("vgId:%d, apply vnode snapshot finished, code:0x%x", pVnode->config.vgId, code);
return code;
#else
taosMemoryFree(pWriter);
......@@ -644,8 +644,9 @@ static int32_t vnodeSnapshotStopWrite(struct SSyncFSM *pFsm, void *pWriter, bool
static int32_t vnodeSnapshotDoWrite(struct SSyncFSM *pFsm, void *pWriter, void *pBuf, int32_t len) {
#ifdef USE_TSDB_SNAPSHOT
SVnode *pVnode = pFsm->data;
vDebug("vgId:%d, continue write vnode snapshot, len:%d", pVnode->config.vgId, len);
int32_t code = vnodeSnapWrite(pWriter, pBuf, len);
vTrace("vgId:%d, write snapshot, len:%d", pVnode->config.vgId, len);
vDebug("vgId:%d, continue write vnode snapshot finished, len:%d", pVnode->config.vgId, len);
return code;
#else
return 0;
......
......@@ -192,6 +192,26 @@ char* buildRetension(SArray* pRetension) {
return p1;
}
static const char* cacheModelStr(int8_t cacheModel) {
switch (cacheModel) {
case TSDB_CACHE_MODEL_NONE:
return TSDB_CACHE_MODEL_NONE_STR;
case TSDB_CACHE_MODEL_LAST_ROW:
return TSDB_CACHE_MODEL_LAST_ROW_STR;
case TSDB_CACHE_MODEL_LAST_VALUE:
return TSDB_CACHE_MODEL_LAST_VALUE_STR;
case TSDB_CACHE_MODEL_BOTH:
return TSDB_CACHE_MODEL_BOTH_STR;
default:
break;
}
return TSDB_CACHE_MODEL_NONE_STR;
}
static const char* strictStr(int8_t strict) {
return TSDB_DB_STRICT_ON == strict ? TSDB_DB_STRICT_ON_STR : TSDB_DB_STRICT_OFF_STR;
}
static void setCreateDBResultIntoDataBlock(SSDataBlock* pBlock, char* dbFName, SDbCfgInfo* pCfg) {
blockDataEnsureCapacity(pBlock, 1);
pBlock->info.rows = 1;
......@@ -222,14 +242,15 @@ static void setCreateDBResultIntoDataBlock(SSDataBlock* pBlock, char* dbFName, S
char* retentions = buildRetension(pCfg->pRetensions);
len += sprintf(buf2 + VARSTR_HEADER_SIZE,
"CREATE DATABASE `%s` BUFFER %d CACHEMODEL %d COMP %d DURATION %dm "
"FSYNC %d MAXROWS %d MINROWS %d KEEP %dm,%dm,%dm PAGES %d PAGESIZE %d PRECISION '%s' REPLICA %d "
"STRICT %d WAL %d VGROUPS %d SINGLE_STABLE %d",
dbFName, pCfg->buffer, pCfg->cacheLast, pCfg->compression, pCfg->daysPerFile, pCfg->walFsyncPeriod,
pCfg->maxRows, pCfg->minRows, pCfg->daysToKeep0, pCfg->daysToKeep1, pCfg->daysToKeep2, pCfg->pages,
pCfg->pageSize, prec, pCfg->replications, pCfg->strict, pCfg->walLevel, pCfg->numOfVgroups,
1 == pCfg->numOfStables);
len += sprintf(
buf2 + VARSTR_HEADER_SIZE,
"CREATE DATABASE `%s` BUFFER %d CACHEMODEL '%s' COMP %d DURATION %dm "
"WAL_FSYNC_PERIOD %d MAXROWS %d MINROWS %d KEEP %dm,%dm,%dm PAGES %d PAGESIZE %d PRECISION '%s' REPLICA %d "
"STRICT '%s' WAL_LEVEL %d VGROUPS %d SINGLE_STABLE %d",
dbFName, pCfg->buffer, cacheModelStr(pCfg->cacheLast), pCfg->compression, pCfg->daysPerFile, pCfg->walFsyncPeriod,
pCfg->maxRows, pCfg->minRows, pCfg->daysToKeep0, pCfg->daysToKeep1, pCfg->daysToKeep2, pCfg->pages,
pCfg->pageSize, prec, pCfg->replications, strictStr(pCfg->strict), pCfg->walLevel, pCfg->numOfVgroups,
1 == pCfg->numOfStables);
if (retentions) {
len += sprintf(buf2 + VARSTR_HEADER_SIZE + len, " RETENTIONS %s", retentions);
......@@ -383,21 +404,21 @@ int32_t appendTagValues(char* buf, int32_t* len, STableCfg* pCfg) {
return TSDB_CODE_SUCCESS;
}
void appendTableOptions(char* buf, int32_t* len, STableCfg* pCfg) {
void appendTableOptions(char* buf, int32_t* len, SDbCfgInfo* pDbCfg, STableCfg* pCfg) {
if (pCfg->commentLen > 0) {
*len += sprintf(buf + VARSTR_HEADER_SIZE + *len, " COMMENT '%s'", pCfg->pComment);
} else if (0 == pCfg->commentLen) {
*len += sprintf(buf + VARSTR_HEADER_SIZE + *len, " COMMENT ''");
}
if (pCfg->watermark1 > 0) {
if (NULL != pDbCfg->pRetensions && pCfg->watermark1 > 0) {
*len += sprintf(buf + VARSTR_HEADER_SIZE + *len, " WATERMARK %" PRId64 "a", pCfg->watermark1);
if (pCfg->watermark2 > 0) {
*len += sprintf(buf + VARSTR_HEADER_SIZE + *len, ", %" PRId64 "a", pCfg->watermark2);
}
}
if (pCfg->delay1 > 0) {
if (NULL != pDbCfg->pRetensions && pCfg->delay1 > 0) {
*len += sprintf(buf + VARSTR_HEADER_SIZE + *len, " MAX_DELAY %" PRId64 "a", pCfg->delay1);
if (pCfg->delay2 > 0) {
*len += sprintf(buf + VARSTR_HEADER_SIZE + *len, ", %" PRId64 "a", pCfg->delay2);
......@@ -405,7 +426,7 @@ void appendTableOptions(char* buf, int32_t* len, STableCfg* pCfg) {
}
int32_t funcNum = taosArrayGetSize(pCfg->pFuncs);
if (funcNum > 0) {
if (NULL != pDbCfg->pRetensions && funcNum > 0) {
*len += sprintf(buf + VARSTR_HEADER_SIZE + *len, " ROLLUP(");
for (int32_t i = 0; i < funcNum; ++i) {
char* pFunc = taosArrayGet(pCfg->pFuncs, i);
......@@ -419,7 +440,7 @@ void appendTableOptions(char* buf, int32_t* len, STableCfg* pCfg) {
}
}
static int32_t setCreateTBResultIntoDataBlock(SSDataBlock* pBlock, char* tbName, STableCfg* pCfg) {
static int32_t setCreateTBResultIntoDataBlock(SSDataBlock* pBlock, SDbCfgInfo* pDbCfg, char* tbName, STableCfg* pCfg) {
int32_t code = 0;
blockDataEnsureCapacity(pBlock, 1);
pBlock->info.rows = 1;
......@@ -439,7 +460,7 @@ static int32_t setCreateTBResultIntoDataBlock(SSDataBlock* pBlock, char* tbName,
len += sprintf(buf2 + VARSTR_HEADER_SIZE + len, ") TAGS (");
appendTagFields(buf2, &len, pCfg);
len += sprintf(buf2 + VARSTR_HEADER_SIZE + len, ")");
appendTableOptions(buf2, &len, pCfg);
appendTableOptions(buf2, &len, pDbCfg, pCfg);
} else if (TSDB_CHILD_TABLE == pCfg->tableType) {
len += sprintf(buf2 + VARSTR_HEADER_SIZE, "CREATE TABLE `%s` USING `%s` (", tbName, pCfg->stbName);
appendTagNameFields(buf2, &len, pCfg);
......@@ -449,7 +470,7 @@ static int32_t setCreateTBResultIntoDataBlock(SSDataBlock* pBlock, char* tbName,
return code;
}
len += sprintf(buf2 + VARSTR_HEADER_SIZE + len, ")");
appendTableOptions(buf2, &len, pCfg);
appendTableOptions(buf2, &len, pDbCfg, pCfg);
} else {
len += sprintf(buf2 + VARSTR_HEADER_SIZE, "CREATE TABLE `%s` (", tbName);
appendColumnFields(buf2, &len, pCfg);
......@@ -465,7 +486,7 @@ static int32_t setCreateTBResultIntoDataBlock(SSDataBlock* pBlock, char* tbName,
static int32_t execShowCreateTable(SShowCreateTableStmt* pStmt, SRetrieveTableRsp** pRsp) {
SSDataBlock* pBlock = buildCreateTbResultDataBlock();
int32_t code = setCreateTBResultIntoDataBlock(pBlock, pStmt->tableName, pStmt->pCfg);
int32_t code = setCreateTBResultIntoDataBlock(pBlock, pStmt->pDbCfg, pStmt->tableName, pStmt->pTableCfg);
if (code) {
return code;
}
......@@ -473,7 +494,7 @@ static int32_t execShowCreateTable(SShowCreateTableStmt* pStmt, SRetrieveTableRs
}
static int32_t execShowCreateSTable(SShowCreateTableStmt* pStmt, SRetrieveTableRsp** pRsp) {
STableCfg* pCfg = (STableCfg*)pStmt->pCfg;
STableCfg* pCfg = (STableCfg*)pStmt->pTableCfg;
if (TSDB_SUPER_TABLE != pCfg->tableType) {
terrno = TSDB_CODE_TSC_NOT_STABLE_ERROR;
return terrno;
......
......@@ -844,6 +844,7 @@ void initBasicInfo(SOptrBasicInfo* pInfo, SSDataBlock* pBlock);
void cleanupBasicInfo(SOptrBasicInfo* pInfo);
int32_t initExprSupp(SExprSupp* pSup, SExprInfo* pExprInfo, int32_t numOfExpr);
void cleanupExprSupp(SExprSupp* pSup);
void destroyExprInfo(SExprInfo* pExpr, int32_t numOfExprs);
int32_t initAggInfo(SExprSupp *pSup, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols, size_t keyBufSize,
const char* pkey);
void initResultSizeInfo(SResultInfo * pResultInfo, int32_t numOfRows);
......
......@@ -3328,17 +3328,19 @@ static SSDataBlock* doFill(SOperatorInfo* pOperator) {
return fillResult;
}
static void destroyExprInfo(SExprInfo* pExpr, int32_t numOfExprs) {
for (int32_t i = 0; i < numOfExprs; ++i) {
SExprInfo* pExprInfo = &pExpr[i];
for (int32_t j = 0; j < pExprInfo->base.numOfParams; ++j) {
if (pExprInfo->base.pParam[j].type == FUNC_PARAM_TYPE_COLUMN) {
taosMemoryFreeClear(pExprInfo->base.pParam[j].pCol);
void destroyExprInfo(SExprInfo* pExpr, int32_t numOfExprs) {
if (pExpr) {
for (int32_t i = 0; i < numOfExprs; ++i) {
SExprInfo* pExprInfo = &pExpr[i];
for (int32_t j = 0; j < pExprInfo->base.numOfParams; ++j) {
if (pExprInfo->base.pParam[j].type == FUNC_PARAM_TYPE_COLUMN) {
taosMemoryFreeClear(pExprInfo->base.pParam[j].pCol);
}
}
}
taosMemoryFree(pExprInfo->base.pParam);
taosMemoryFree(pExprInfo->pExpr);
taosMemoryFree(pExprInfo->base.pParam);
taosMemoryFree(pExprInfo->pExpr);
}
}
}
......@@ -3768,11 +3770,7 @@ SSchemaWrapper* extractQueriedColumnSchema(SScanPhysiNode* pScanNode) {
static void cleanupTableSchemaInfo(SSchemaInfo* pSchemaInfo) {
taosMemoryFreeClear(pSchemaInfo->dbname);
if (pSchemaInfo->sw == NULL) {
return;
}
taosMemoryFree(pSchemaInfo->tablename);
taosMemoryFreeClear(pSchemaInfo->tablename);
tDeleteSSchemaWrapper(pSchemaInfo->sw);
tDeleteSSchemaWrapper(pSchemaInfo->qsw);
}
......
......@@ -976,8 +976,12 @@ int32_t cleanUpUdfs() {
}
uv_mutex_lock(&gUdfdProxy.udfStubsMutex);
int32_t i = 0;
if (gUdfdProxy.udfStubs == NULL || taosArrayGetSize(gUdfdProxy.udfStubs) == 0) {
uv_mutex_unlock(&gUdfdProxy.udfStubsMutex);
return TSDB_CODE_SUCCESS;
}
SArray* udfStubs = taosArrayInit(16, sizeof(SUdfcFuncStub));
int32_t i = 0;
while (i < taosArrayGetSize(gUdfdProxy.udfStubs)) {
SUdfcFuncStub *stub = taosArrayGet(gUdfdProxy.udfStubs, i);
if (stub->refCount == 0) {
......
......@@ -713,7 +713,8 @@ void nodesDestroyNode(SNode* pNode) {
break;
case QUERY_NODE_SHOW_CREATE_TABLE_STMT:
case QUERY_NODE_SHOW_CREATE_STABLE_STMT:
destroyTableCfg((STableCfg*)(((SShowCreateTableStmt*)pNode)->pCfg));
taosMemoryFreeClear(((SShowCreateTableStmt*)pNode)->pDbCfg);
destroyTableCfg((STableCfg*)(((SShowCreateTableStmt*)pNode)->pTableCfg));
break;
case QUERY_NODE_SHOW_TABLE_DISTRIBUTED_STMT: // no pointer field
case QUERY_NODE_KILL_CONNECTION_STMT: // no pointer field
......@@ -1817,3 +1818,19 @@ int32_t nodesMergeConds(SNode** pDst, SNodeList** pSrc) {
return TSDB_CODE_SUCCESS;
}
const char* dataOrderStr(EDataOrderLevel order) {
switch (order) {
case DATA_ORDER_LEVEL_NONE:
return "no order required";
case DATA_ORDER_LEVEL_IN_BLOCK:
return "in-datablock order";
case DATA_ORDER_LEVEL_IN_GROUP:
return "in-group order";
case DATA_ORDER_LEVEL_GLOBAL:
return "global order";
default:
break;
}
return "unknown";
}
......@@ -176,11 +176,11 @@ SNode* createDropUserStmt(SAstCreateContext* pCxt, SToken* pUserName);
SNode* createCreateDnodeStmt(SAstCreateContext* pCxt, const SToken* pFqdn, const SToken* pPort);
SNode* createDropDnodeStmt(SAstCreateContext* pCxt, const SToken* pDnode);
SNode* createAlterDnodeStmt(SAstCreateContext* pCxt, const SToken* pDnode, const SToken* pConfig, const SToken* pValue);
SNode* createCreateIndexStmt(SAstCreateContext* pCxt, EIndexType type, bool ignoreExists, SToken* pIndexName,
SNode* createCreateIndexStmt(SAstCreateContext* pCxt, EIndexType type, bool ignoreExists, SNode* pIndexName,
SNode* pRealTable, SNodeList* pCols, SNode* pOptions);
SNode* createIndexOption(SAstCreateContext* pCxt, SNodeList* pFuncs, SNode* pInterval, SNode* pOffset, SNode* pSliding,
SNode* pStreamOptions);
SNode* createDropIndexStmt(SAstCreateContext* pCxt, bool ignoreNotExists, SToken* pIndexName);
SNode* createDropIndexStmt(SAstCreateContext* pCxt, bool ignoreNotExists, SNode* pIndexName);
SNode* createCreateComponentNodeStmt(SAstCreateContext* pCxt, ENodeType type, const SToken* pDnodeId);
SNode* createDropComponentNodeStmt(SAstCreateContext* pCxt, ENodeType type, const SToken* pDnodeId);
SNode* createCreateTopicStmtUseQuery(SAstCreateContext* pCxt, bool ignoreExists, const SToken* pTopicName,
......
......@@ -424,8 +424,8 @@ from_db_opt(A) ::= FROM db_name(B).
/************************************************ create index ********************************************************/
cmd ::= CREATE SMA INDEX not_exists_opt(D)
index_name(A) ON full_table_name(B) index_options(C). { pCxt->pRootNode = createCreateIndexStmt(pCxt, INDEX_TYPE_SMA, D, &A, B, NULL, C); }
cmd ::= DROP INDEX exists_opt(B) index_name(A). { pCxt->pRootNode = createDropIndexStmt(pCxt, B, &A); }
full_table_name(A) ON full_table_name(B) index_options(C). { pCxt->pRootNode = createCreateIndexStmt(pCxt, INDEX_TYPE_SMA, D, A, B, NULL, C); }
cmd ::= DROP INDEX exists_opt(B) full_table_name(A). { pCxt->pRootNode = createDropIndexStmt(pCxt, B, A); }
index_options(A) ::= FUNCTION NK_LP func_list(B) NK_RP INTERVAL
NK_LP duration_literal(C) NK_RP sliding_opt(D) sma_stream_opt(E). { A = createIndexOption(pCxt, B, releaseRawExprNode(pCxt, C), NULL, D, E); }
......@@ -608,10 +608,6 @@ column_alias(A) ::= NK_ID(B).
%destructor user_name { }
user_name(A) ::= NK_ID(B). { A = B; }
%type index_name { SToken }
%destructor index_name { }
index_name(A) ::= NK_ID(B). { A = B; }
%type topic_name { SToken }
%destructor topic_name { }
topic_name(A) ::= NK_ID(B). { A = B; }
......
......@@ -1402,19 +1402,18 @@ SNode* createAlterDnodeStmt(SAstCreateContext* pCxt, const SToken* pDnode, const
return (SNode*)pStmt;
}
SNode* createCreateIndexStmt(SAstCreateContext* pCxt, EIndexType type, bool ignoreExists, SToken* pIndexName,
SNode* createCreateIndexStmt(SAstCreateContext* pCxt, EIndexType type, bool ignoreExists, SNode* pIndexName,
SNode* pRealTable, SNodeList* pCols, SNode* pOptions) {
CHECK_PARSER_STATUS(pCxt);
if (!checkIndexName(pCxt, pIndexName)) {
return NULL;
}
SCreateIndexStmt* pStmt = (SCreateIndexStmt*)nodesMakeNode(QUERY_NODE_CREATE_INDEX_STMT);
CHECK_OUT_OF_MEM(pStmt);
pStmt->indexType = type;
pStmt->ignoreExists = ignoreExists;
COPY_STRING_FORM_ID_TOKEN(pStmt->indexName, pIndexName);
strcpy(pStmt->indexDbName, ((SRealTableNode*)pIndexName)->table.dbName);
strcpy(pStmt->indexName, ((SRealTableNode*)pIndexName)->table.tableName);
strcpy(pStmt->dbName, ((SRealTableNode*)pRealTable)->table.dbName);
strcpy(pStmt->tableName, ((SRealTableNode*)pRealTable)->table.tableName);
nodesDestroyNode(pIndexName);
nodesDestroyNode(pRealTable);
pStmt->pCols = pCols;
pStmt->pOptions = (SIndexOptions*)pOptions;
......@@ -1434,15 +1433,14 @@ SNode* createIndexOption(SAstCreateContext* pCxt, SNodeList* pFuncs, SNode* pInt
return (SNode*)pOptions;
}
SNode* createDropIndexStmt(SAstCreateContext* pCxt, bool ignoreNotExists, SToken* pIndexName) {
SNode* createDropIndexStmt(SAstCreateContext* pCxt, bool ignoreNotExists, SNode* pIndexName) {
CHECK_PARSER_STATUS(pCxt);
if (!checkDbName(pCxt, NULL, true) || !checkIndexName(pCxt, pIndexName)) {
return NULL;
}
SDropIndexStmt* pStmt = (SDropIndexStmt*)nodesMakeNode(QUERY_NODE_DROP_INDEX_STMT);
CHECK_OUT_OF_MEM(pStmt);
pStmt->ignoreNotExists = ignoreNotExists;
COPY_STRING_FORM_ID_TOKEN(pStmt->indexName, pIndexName);
strcpy(pStmt->indexDbName, ((SRealTableNode*)pIndexName)->table.dbName);
strcpy(pStmt->indexName, ((SRealTableNode*)pIndexName)->table.tableName);
nodesDestroyNode(pIndexName);
return (SNode*)pStmt;
}
......
......@@ -269,16 +269,15 @@ static int32_t collectMetaKeyFromUseDatabase(SCollectMetaKeyCxt* pCxt, SUseDatab
static int32_t collectMetaKeyFromCreateIndex(SCollectMetaKeyCxt* pCxt, SCreateIndexStmt* pStmt) {
int32_t code = TSDB_CODE_SUCCESS;
if (INDEX_TYPE_SMA == pStmt->indexType) {
code = reserveTableMetaInCache(pCxt->pParseCxt->acctId, pCxt->pParseCxt->db, pStmt->tableName, pCxt->pMetaCache);
code = reserveTableMetaInCache(pCxt->pParseCxt->acctId, pStmt->dbName, pStmt->tableName, pCxt->pMetaCache);
if (TSDB_CODE_SUCCESS == code) {
code =
reserveTableVgroupInCache(pCxt->pParseCxt->acctId, pCxt->pParseCxt->db, pStmt->tableName, pCxt->pMetaCache);
code = reserveTableVgroupInCache(pCxt->pParseCxt->acctId, pStmt->dbName, pStmt->tableName, pCxt->pMetaCache);
}
if (TSDB_CODE_SUCCESS == code) {
code = reserveDbVgInfoInCache(pCxt->pParseCxt->acctId, pCxt->pParseCxt->db, pCxt->pMetaCache);
code = reserveDbVgInfoInCache(pCxt->pParseCxt->acctId, pStmt->dbName, pCxt->pMetaCache);
}
if (TSDB_CODE_SUCCESS == code) {
code = reserveDbCfgInCache(pCxt->pParseCxt->acctId, pCxt->pParseCxt->db, pCxt->pMetaCache);
code = reserveDbCfgInCache(pCxt->pParseCxt->acctId, pStmt->dbName, pCxt->pMetaCache);
}
}
return code;
......@@ -366,8 +365,8 @@ static int32_t collectMetaKeyFromShowStreams(SCollectMetaKeyCxt* pCxt, SShowStmt
}
static int32_t collectMetaKeyFromShowTables(SCollectMetaKeyCxt* pCxt, SShowStmt* pStmt) {
int32_t code = reserveTableMetaInCache(pCxt->pParseCxt->acctId, TSDB_INFORMATION_SCHEMA_DB,
TSDB_INS_TABLE_TABLES, pCxt->pMetaCache);
int32_t code = reserveTableMetaInCache(pCxt->pParseCxt->acctId, TSDB_INFORMATION_SCHEMA_DB, TSDB_INS_TABLE_TABLES,
pCxt->pMetaCache);
if (TSDB_CODE_SUCCESS == code) {
if (NULL != pStmt->pDbName) {
code = reserveDbVgInfoInCache(pCxt->pParseCxt->acctId, ((SValueNode*)pStmt->pDbName)->literal, pCxt->pMetaCache);
......@@ -457,6 +456,9 @@ static int32_t collectMetaKeyFromShowCreateTable(SCollectMetaKeyCxt* pCxt, SShow
if (TSDB_CODE_SUCCESS == code) {
code = reserveTableCfgInCache(pCxt->pParseCxt->acctId, pStmt->dbName, pStmt->tableName, pCxt->pMetaCache);
}
if (TSDB_CODE_SUCCESS == code) {
code = reserveDbCfgInCache(pCxt->pParseCxt->acctId, pStmt->dbName, pCxt->pMetaCache);
}
return code;
}
......
......@@ -1751,8 +1751,7 @@ static int32_t dnodeToVgroupsInfo(SArray* pDnodes, SVgroupsInfo** pVgsInfo) {
static bool sysTableFromVnode(const char* pTable) {
return (0 == strcmp(pTable, TSDB_INS_TABLE_TABLES)) ||
(0 == strcmp(pTable, TSDB_INS_TABLE_TABLE_DISTRIBUTED) ||
(0 == strcmp(pTable, TSDB_INS_TABLE_TAGS)));
(0 == strcmp(pTable, TSDB_INS_TABLE_TABLE_DISTRIBUTED) || (0 == strcmp(pTable, TSDB_INS_TABLE_TAGS)));
}
static bool sysTableFromDnode(const char* pTable) { return 0 == strcmp(pTable, TSDB_INS_TABLE_DNODE_VARIABLES); }
......@@ -3705,6 +3704,11 @@ static int32_t checkTableWatermarkOption(STranslateContext* pCxt, STableOptions*
}
static int32_t checkCreateTable(STranslateContext* pCxt, SCreateTableStmt* pStmt, bool createStable) {
if (NULL != strchr(pStmt->tableName, '.')) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_IDENTIFIER_NAME,
"The table name cannot contain '.'");
}
SDbCfgInfo dbCfg = {0};
int32_t code = getDBCfg(pCxt, pStmt->dbName, &dbCfg);
if (TSDB_CODE_SUCCESS == code && !createStable && NULL != dbCfg.pRetensions) {
......@@ -4282,9 +4286,10 @@ static int32_t translateAlterDnode(STranslateContext* pCxt, SAlterDnodeStmt* pSt
return buildCmdMsg(pCxt, TDMT_MND_CONFIG_DNODE, (FSerializeFunc)tSerializeSMCfgDnodeReq, &cfgReq);
}
static int32_t getSmaIndexDstVgId(STranslateContext* pCxt, char* pTableName, int32_t* pVgId) {
static int32_t getSmaIndexDstVgId(STranslateContext* pCxt, const char* pDbName, const char* pTableName,
int32_t* pVgId) {
SVgroupInfo vg = {0};
int32_t code = getTableHashVgroup(pCxt, pCxt->pParseCxt->db, pTableName, &vg);
int32_t code = getTableHashVgroup(pCxt, pDbName, pTableName, &vg);
if (TSDB_CODE_SUCCESS == code) {
*pVgId = vg.vgId;
}
......@@ -4301,7 +4306,7 @@ static int32_t getSmaIndexSql(STranslateContext* pCxt, char** pSql, int32_t* pLe
}
static int32_t buildSampleAstInfoByIndex(STranslateContext* pCxt, SCreateIndexStmt* pStmt, SSampleAstInfo* pInfo) {
pInfo->pDbName = pCxt->pParseCxt->db;
pInfo->pDbName = pStmt->dbName;
pInfo->pTableName = pStmt->tableName;
pInfo->pFuncs = nodesCloneList(pStmt->pOptions->pFuncs);
pInfo->pInterval = nodesCloneNode(pStmt->pOptions->pInterval);
......@@ -4328,7 +4333,7 @@ static int32_t getSmaIndexAst(STranslateContext* pCxt, SCreateIndexStmt* pStmt,
static int32_t buildCreateSmaReq(STranslateContext* pCxt, SCreateIndexStmt* pStmt, SMCreateSmaReq* pReq) {
SName name;
tNameExtractFullName(toName(pCxt->pParseCxt->acctId, pCxt->pParseCxt->db, pStmt->indexName, &name), pReq->name);
tNameExtractFullName(toName(pCxt->pParseCxt->acctId, pStmt->indexDbName, pStmt->indexName, &name), pReq->name);
memset(&name, 0, sizeof(SName));
tNameExtractFullName(toName(pCxt->pParseCxt->acctId, pStmt->dbName, pStmt->tableName, &name), pReq->stb);
pReq->igExists = pStmt->ignoreExists;
......@@ -4352,7 +4357,7 @@ static int32_t buildCreateSmaReq(STranslateContext* pCxt, SCreateIndexStmt* pStm
}
}
int32_t code = getSmaIndexDstVgId(pCxt, pStmt->tableName, &pReq->dstVgId);
int32_t code = getSmaIndexDstVgId(pCxt, pStmt->dbName, pStmt->tableName, &pReq->dstVgId);
if (TSDB_CODE_SUCCESS == code) {
code = getSmaIndexSql(pCxt, &pReq->sql, &pReq->sqlLen);
}
......@@ -4365,7 +4370,7 @@ static int32_t buildCreateSmaReq(STranslateContext* pCxt, SCreateIndexStmt* pStm
static int32_t checkCreateSmaIndex(STranslateContext* pCxt, SCreateIndexStmt* pStmt) {
SDbCfgInfo dbCfg = {0};
int32_t code = getDBCfg(pCxt, pCxt->pParseCxt->db, &dbCfg);
int32_t code = getDBCfg(pCxt, pStmt->dbName, &dbCfg);
if (TSDB_CODE_SUCCESS == code && NULL != dbCfg.pRetensions) {
code = generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_SMA_INDEX,
"Tables configured with the 'ROLLUP' option do not support creating sma index");
......@@ -4883,10 +4888,17 @@ static int32_t translateShowCreateDatabase(STranslateContext* pCxt, SShowCreateD
}
static int32_t translateShowCreateTable(STranslateContext* pCxt, SShowCreateTableStmt* pStmt) {
SName name;
toName(pCxt->pParseCxt->acctId, pStmt->dbName, pStmt->tableName, &name);
return getTableCfg(pCxt, &name, (STableCfg**)&pStmt->pCfg);
pStmt->pDbCfg = taosMemoryCalloc(1, sizeof(SDbCfgInfo));
if (NULL == pStmt->pDbCfg) {
return TSDB_CODE_OUT_OF_MEMORY;
}
int32_t code = getDBCfg(pCxt, pStmt->dbName, (SDbCfgInfo*)pStmt->pDbCfg);
if (TSDB_CODE_SUCCESS == code) {
SName name;
toName(pCxt->pParseCxt->acctId, pStmt->dbName, pStmt->tableName, &name);
code = getTableCfg(pCxt, &name, (STableCfg**)&pStmt->pTableCfg);
}
return code;
}
static int32_t translateQuery(STranslateContext* pCxt, SNode* pNode) {
......@@ -5917,6 +5929,10 @@ static int32_t checkCreateSubTable(STranslateContext* pCxt, SCreateSubTableClaus
if (0 != strcmp(pStmt->dbName, pStmt->useDbName)) {
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_CORRESPONDING_STABLE_ERR);
}
if (NULL != strchr(pStmt->tableName, '.')) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_IDENTIFIER_NAME,
"The table name cannot contain '.'");
}
return TSDB_CODE_SUCCESS;
}
static int32_t rewriteCreateSubTable(STranslateContext* pCxt, SCreateSubTableClause* pStmt, SHashObj* pVgroupHashmap) {
......
此差异已折叠。
......@@ -54,7 +54,8 @@ TEST_F(ParserShowToUseTest, showCreateSTable) {
ASSERT_EQ(nodeType(pQuery->pRoot), QUERY_NODE_SHOW_CREATE_STABLE_STMT);
ASSERT_EQ(pQuery->execMode, QUERY_EXEC_MODE_LOCAL);
ASSERT_TRUE(pQuery->haveResultSet);
ASSERT_NE(((SShowCreateTableStmt*)pQuery->pRoot)->pCfg, nullptr);
ASSERT_NE(((SShowCreateTableStmt*)pQuery->pRoot)->pDbCfg, nullptr);
ASSERT_NE(((SShowCreateTableStmt*)pQuery->pRoot)->pTableCfg, nullptr);
});
run("SHOW CREATE STABLE st1");
......@@ -67,7 +68,8 @@ TEST_F(ParserShowToUseTest, showCreateTable) {
ASSERT_EQ(nodeType(pQuery->pRoot), QUERY_NODE_SHOW_CREATE_TABLE_STMT);
ASSERT_EQ(pQuery->execMode, QUERY_EXEC_MODE_LOCAL);
ASSERT_TRUE(pQuery->haveResultSet);
ASSERT_NE(((SShowCreateTableStmt*)pQuery->pRoot)->pCfg, nullptr);
ASSERT_NE(((SShowCreateTableStmt*)pQuery->pRoot)->pDbCfg, nullptr);
ASSERT_NE(((SShowCreateTableStmt*)pQuery->pRoot)->pTableCfg, nullptr);
});
run("SHOW CREATE TABLE t1");
......
......@@ -23,13 +23,13 @@ extern "C" {
#include "planner.h"
#include "taoserror.h"
#define planFatal(param, ...) qFatal("PLAN: " param, __VA_ARGS__)
#define planError(param, ...) qError("PLAN: " param, __VA_ARGS__)
#define planWarn(param, ...) qWarn("PLAN: " param, __VA_ARGS__)
#define planInfo(param, ...) qInfo("PLAN: " param, __VA_ARGS__)
#define planDebug(param, ...) qDebug("PLAN: " param, __VA_ARGS__)
#define planDebugL(param, ...) qDebugL("PLAN: " param, __VA_ARGS__)
#define planTrace(param, ...) qTrace("PLAN: " param, __VA_ARGS__)
#define planFatal(param, ...) qFatal("PLAN: " param, ##__VA_ARGS__)
#define planError(param, ...) qError("PLAN: " param, ##__VA_ARGS__)
#define planWarn(param, ...) qWarn("PLAN: " param, ##__VA_ARGS__)
#define planInfo(param, ...) qInfo("PLAN: " param, ##__VA_ARGS__)
#define planDebug(param, ...) qDebug("PLAN: " param, ##__VA_ARGS__)
#define planDebugL(param, ...) qDebugL("PLAN: " param, ##__VA_ARGS__)
#define planTrace(param, ...) qTrace("PLAN: " param, ##__VA_ARGS__)
int32_t generateUsageErrMsg(char* pBuf, int32_t len, int32_t errCode, ...);
int32_t createColumnByRewriteExprs(SNodeList* pExprs, SNodeList** pList);
......
......@@ -159,6 +159,10 @@ static bool isKeepOrderAggFunc(SNodeList* pFuncs) {
static int32_t adjustAggDataRequirement(SAggLogicNode* pAgg, EDataOrderLevel requirement) {
// The sort level of agg with group by output data can only be DATA_ORDER_LEVEL_NONE
if (requirement > DATA_ORDER_LEVEL_NONE && (NULL != pAgg->pGroupKeys || !isKeepOrderAggFunc(pAgg->pAggFuncs))) {
planError(
"The output of aggregate cannot meet the requirements(%s) of the upper operator. "
"Illegal statement, should be intercepted in parser",
dataOrderStr(requirement));
return TSDB_CODE_PLAN_INTERNAL_ERROR;
}
pAgg->node.resultDataOrder = requirement;
......@@ -231,6 +235,10 @@ static int32_t adjustSortDataRequirement(SSortLogicNode* pSort, EDataOrderLevel
static int32_t adjustPartitionDataRequirement(SPartitionLogicNode* pPart, EDataOrderLevel requirement) {
if (DATA_ORDER_LEVEL_GLOBAL == requirement) {
planError(
"The output of partition cannot meet the requirements(%s) of the upper operator. "
"Illegal statement, should be intercepted in parser",
dataOrderStr(requirement));
return TSDB_CODE_PLAN_INTERNAL_ERROR;
}
pPart->node.resultDataOrder = requirement;
......
......@@ -484,7 +484,7 @@ SyncIndex syncNodeGetSnapshotConfigIndex(SSyncNode* pSyncNode, SyncIndex snapsho
lastIndex = (pSyncNode->pRaftCfg->configIndexArr)[i];
}
}
sTrace("vgId:%d, sync get snapshot last config index, index:%" PRId64 " lcindex:%" PRId64, pSyncNode->vgId,
sTrace("vgId:%d, sync get last config index, index:%" PRId64 " lcindex:%" PRId64, pSyncNode->vgId,
snapshotLastApplyIndex, lastIndex);
return lastIndex;
......
......@@ -41,18 +41,21 @@ int32_t syncNodeOnTimeoutCb(SSyncNode* ths, SyncTimeout* pMsg) {
// syncNodePingAll(ths);
// syncNodePingPeers(ths);
sTrace("vgId:%d, sync timeout, type:ping count:%d", ths->vgId, ths->pingTimerCounter);
syncNodeTimerRoutine(ths);
}
} else if (pMsg->timeoutType == SYNC_TIMEOUT_ELECTION) {
if (atomic_load_64(&ths->electTimerLogicClockUser) <= pMsg->logicClock) {
++(ths->electTimerCounter);
sInfo("vgId:%d, sync timeout, type:election count:%d", ths->vgId, ths->electTimerCounter);
syncNodeElect(ths);
}
} else if (pMsg->timeoutType == SYNC_TIMEOUT_HEARTBEAT) {
if (atomic_load_64(&ths->heartbeatTimerLogicClockUser) <= pMsg->logicClock) {
++(ths->heartbeatTimerCounter);
sInfo("vgId:%d, sync timeout, type:replicate count:%d", ths->vgId, ths->heartbeatTimerCounter);
syncNodeReplicate(ths);
}
} else {
......
......@@ -612,6 +612,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_INVALID_ENV, "Invalid rsma env")
TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_INVALID_STAT, "Invalid rsma state")
TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_QTASKINFO_CREATE, "Rsma qtaskinfo creation error")
TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_FILE_CORRUPTED, "Rsma file corrupted")
TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_REMOVE_EXISTS, "Rsma remove exists")
//index
TAOS_DEFINE_ERROR(TSDB_CODE_INDEX_REBUILDING, "Index is rebuilding")
......
......@@ -506,7 +506,7 @@ class TDTestCase:
#show create table
tdSql.query("show create table jsons1")
tdSql.checkData(0, 1, 'CREATE STABLE `jsons1` (`ts` TIMESTAMP, `dataint` INT, `databool` BOOL, `datastr` NCHAR(50), `datastrbin` VARCHAR(150)) TAGS (`jtag` JSON) WATERMARK 5000a, 5000a')
tdSql.checkData(0, 1, 'CREATE STABLE `jsons1` (`ts` TIMESTAMP, `dataint` INT, `databool` BOOL, `datastr` NCHAR(50), `datastrbin` VARCHAR(150)) TAGS (`jtag` JSON)')
#test aggregate function:count/avg/twa/irate/sum/stddev/leastsquares
tdSql.query("select count(*) from jsons1 where jtag is not null")
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册