提交 be059c6a 编写于 作者: wmmhello's avatar wmmhello

Merge branch '3.0' into feature/TD-14761

...@@ -14,25 +14,6 @@ MESSAGE(STATUS "Project binary files output path: " ${PROJECT_BINARY_DIR}) ...@@ -14,25 +14,6 @@ MESSAGE(STATUS "Project binary files output path: " ${PROJECT_BINARY_DIR})
MESSAGE(STATUS "Project executable files output path: " ${EXECUTABLE_OUTPUT_PATH}) MESSAGE(STATUS "Project executable files output path: " ${EXECUTABLE_OUTPUT_PATH})
MESSAGE(STATUS "Project library files output path: " ${LIBRARY_OUTPUT_PATH}) MESSAGE(STATUS "Project library files output path: " ${LIBRARY_OUTPUT_PATH})
find_package(Git QUIET)
if(GIT_FOUND AND EXISTS "${TD_SOURCE_DIR}/.git")
# Update submodules as needed
option(GIT_SUBMODULE "Check submodules during build" ON)
if(GIT_SUBMODULE)
message(STATUS "Submodule update")
execute_process(COMMAND cd ${TD_SOURCE_DIR} && ${GIT_EXECUTABLE} submodule update --init --recursive
WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}
RESULT_VARIABLE GIT_SUBMOD_RESULT)
if(NOT GIT_SUBMOD_RESULT EQUAL "0")
message(WARNING "git submodule update --init --recursive failed with ${GIT_SUBMOD_RESULT}, please checkout submodules")
endif()
endif()
endif()
if(NOT EXISTS "${TD_SOURCE_DIR}/tools/taos-tools/CMakeLists.txt")
message(WARNING "The submodules were not downloaded! GIT_SUBMODULE was turned off or failed. Please update submodules manually if you need build them.")
endif()
if (NOT DEFINED TD_GRANT) if (NOT DEFINED TD_GRANT)
SET(TD_GRANT FALSE) SET(TD_GRANT FALSE)
endif() endif()
......
...@@ -1764,17 +1764,15 @@ int32_t tDecodeSVDropTbBatchRsp(SDecoder* pCoder, SVDropTbBatchRsp* pRsp); ...@@ -1764,17 +1764,15 @@ int32_t tDecodeSVDropTbBatchRsp(SDecoder* pCoder, SVDropTbBatchRsp* pRsp);
typedef struct { typedef struct {
const char* tbName; const char* tbName;
int8_t action; int8_t action;
const char* colName;
// TSDB_ALTER_TABLE_ADD_COLUMN // TSDB_ALTER_TABLE_ADD_COLUMN
int8_t type; int8_t type;
int32_t bytes; int8_t flags;
const char* colAddName; int32_t bytes;
// TSDB_ALTER_TABLE_DROP_COLUMN // TSDB_ALTER_TABLE_DROP_COLUMN
const char* colDropName;
// TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES // TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES
const char* colModName; int32_t colModBytes;
int32_t colModBytes;
// TSDB_ALTER_TABLE_UPDATE_COLUMN_NAME // TSDB_ALTER_TABLE_UPDATE_COLUMN_NAME
const char* colOldName;
const char* colNewName; const char* colNewName;
// TSDB_ALTER_TABLE_UPDATE_TAG_VAL // TSDB_ALTER_TABLE_UPDATE_TAG_VAL
const char* tagName; const char* tagName;
......
...@@ -323,6 +323,9 @@ int32_t* taosGetErrno(); ...@@ -323,6 +323,9 @@ int32_t* taosGetErrno();
#define TSDB_CODE_VND_SMA_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x0516) #define TSDB_CODE_VND_SMA_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x0516)
#define TSDB_CODE_VND_HASH_MISMATCH TAOS_DEF_ERROR_CODE(0, 0x0517) #define TSDB_CODE_VND_HASH_MISMATCH TAOS_DEF_ERROR_CODE(0, 0x0517)
#define TSDB_CODE_VND_TABLE_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x0518) #define TSDB_CODE_VND_TABLE_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x0518)
#define TSDB_CODE_VND_INVALID_TABLE_ACTION TAOS_DEF_ERROR_CODE(0, 0x0519)
#define TSDB_CODE_VND_COL_ALREADY_EXISTS TAOS_DEF_ERROR_CODE(0, 0x051a)
#define TSDB_CODE_VND_TABLE_COL_NOT_EXISTS TAOS_DEF_ERROR_CODE(0, 0x051b)
// tsdb // tsdb
#define TSDB_CODE_TDB_INVALID_TABLE_ID TAOS_DEF_ERROR_CODE(0, 0x0600) #define TSDB_CODE_TDB_INVALID_TABLE_ID TAOS_DEF_ERROR_CODE(0, 0x0600)
...@@ -639,6 +642,7 @@ int32_t* taosGetErrno(); ...@@ -639,6 +642,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_PAR_INVALID_INTERNAL_PK TAOS_DEF_ERROR_CODE(0, 0x2646) #define TSDB_CODE_PAR_INVALID_INTERNAL_PK TAOS_DEF_ERROR_CODE(0, 0x2646)
#define TSDB_CODE_PAR_INVALID_TIMELINE_FUNC TAOS_DEF_ERROR_CODE(0, 0x2647) #define TSDB_CODE_PAR_INVALID_TIMELINE_FUNC TAOS_DEF_ERROR_CODE(0, 0x2647)
#define TSDB_CODE_PAR_INVALID_PASSWD TAOS_DEF_ERROR_CODE(0, 0x2648) #define TSDB_CODE_PAR_INVALID_PASSWD TAOS_DEF_ERROR_CODE(0, 0x2648)
#define TSDB_CODE_PAR_INVALID_ALTER_TABLE TAOS_DEF_ERROR_CODE(0, 0x2649)
//planner //planner
#define TSDB_CODE_PLAN_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x2700) #define TSDB_CODE_PLAN_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x2700)
......
...@@ -4171,19 +4171,20 @@ int32_t tEncodeSVAlterTbReq(SEncoder *pEncoder, const SVAlterTbReq *pReq) { ...@@ -4171,19 +4171,20 @@ int32_t tEncodeSVAlterTbReq(SEncoder *pEncoder, const SVAlterTbReq *pReq) {
if (tEncodeI8(pEncoder, pReq->action) < 0) return -1; if (tEncodeI8(pEncoder, pReq->action) < 0) return -1;
switch (pReq->action) { switch (pReq->action) {
case TSDB_ALTER_TABLE_ADD_COLUMN: case TSDB_ALTER_TABLE_ADD_COLUMN:
if (tEncodeCStr(pEncoder, pReq->colName) < 0) return -1;
if (tEncodeI8(pEncoder, pReq->type) < 0) return -1; if (tEncodeI8(pEncoder, pReq->type) < 0) return -1;
if (tEncodeI8(pEncoder, pReq->flags) < 0) return -1;
if (tEncodeI32v(pEncoder, pReq->bytes) < 0) return -1; if (tEncodeI32v(pEncoder, pReq->bytes) < 0) return -1;
if (tEncodeCStr(pEncoder, pReq->colAddName) < 0) return -1;
break; break;
case TSDB_ALTER_TABLE_DROP_COLUMN: case TSDB_ALTER_TABLE_DROP_COLUMN:
if (tEncodeCStr(pEncoder, pReq->colDropName) < 0) return -1; if (tEncodeCStr(pEncoder, pReq->colName) < 0) return -1;
break; break;
case TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES: case TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES:
if (tEncodeCStr(pEncoder, pReq->colModName) < 0) return -1; if (tEncodeCStr(pEncoder, pReq->colName) < 0) return -1;
if (tEncodeI32v(pEncoder, pReq->colModBytes) < 0) return -1; if (tEncodeI32v(pEncoder, pReq->colModBytes) < 0) return -1;
break; break;
case TSDB_ALTER_TABLE_UPDATE_COLUMN_NAME: case TSDB_ALTER_TABLE_UPDATE_COLUMN_NAME:
if (tEncodeCStr(pEncoder, pReq->colOldName) < 0) return -1; if (tEncodeCStr(pEncoder, pReq->colName) < 0) return -1;
if (tEncodeCStr(pEncoder, pReq->colNewName) < 0) return -1; if (tEncodeCStr(pEncoder, pReq->colNewName) < 0) return -1;
break; break;
case TSDB_ALTER_TABLE_UPDATE_TAG_VAL: case TSDB_ALTER_TABLE_UPDATE_TAG_VAL:
...@@ -4218,19 +4219,20 @@ int32_t tDecodeSVAlterTbReq(SDecoder *pDecoder, SVAlterTbReq *pReq) { ...@@ -4218,19 +4219,20 @@ int32_t tDecodeSVAlterTbReq(SDecoder *pDecoder, SVAlterTbReq *pReq) {
if (tDecodeI8(pDecoder, &pReq->action) < 0) return -1; if (tDecodeI8(pDecoder, &pReq->action) < 0) return -1;
switch (pReq->action) { switch (pReq->action) {
case TSDB_ALTER_TABLE_ADD_COLUMN: case TSDB_ALTER_TABLE_ADD_COLUMN:
if (tDecodeCStr(pDecoder, &pReq->colName) < 0) return -1;
if (tDecodeI8(pDecoder, &pReq->type) < 0) return -1; if (tDecodeI8(pDecoder, &pReq->type) < 0) return -1;
if (tDecodeI8(pDecoder, &pReq->flags) < 0) return -1;
if (tDecodeI32v(pDecoder, &pReq->bytes) < 0) return -1; if (tDecodeI32v(pDecoder, &pReq->bytes) < 0) return -1;
if (tDecodeCStr(pDecoder, &pReq->colAddName) < 0) return -1;
break; break;
case TSDB_ALTER_TABLE_DROP_COLUMN: case TSDB_ALTER_TABLE_DROP_COLUMN:
if (tDecodeCStr(pDecoder, &pReq->colDropName) < 0) return -1; if (tDecodeCStr(pDecoder, &pReq->colName) < 0) return -1;
break; break;
case TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES: case TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES:
if (tDecodeCStr(pDecoder, &pReq->colModName) < 0) return -1; if (tDecodeCStr(pDecoder, &pReq->colName) < 0) return -1;
if (tDecodeI32v(pDecoder, &pReq->colModBytes) < 0) return -1; if (tDecodeI32v(pDecoder, &pReq->colModBytes) < 0) return -1;
break; break;
case TSDB_ALTER_TABLE_UPDATE_COLUMN_NAME: case TSDB_ALTER_TABLE_UPDATE_COLUMN_NAME:
if (tDecodeCStr(pDecoder, &pReq->colOldName) < 0) return -1; if (tDecodeCStr(pDecoder, &pReq->colName) < 0) return -1;
if (tDecodeCStr(pDecoder, &pReq->colNewName) < 0) return -1; if (tDecodeCStr(pDecoder, &pReq->colNewName) < 0) return -1;
break; break;
case TSDB_ALTER_TABLE_UPDATE_TAG_VAL: case TSDB_ALTER_TABLE_UPDATE_TAG_VAL:
......
...@@ -192,7 +192,8 @@ static void vmProcessApplyQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO ...@@ -192,7 +192,8 @@ static void vmProcessApplyQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO
rpcFreeCont(originalRpcMsg.pCont); rpcFreeCont(originalRpcMsg.pCont);
// if leader, send response // if leader, send response
if (pMsg->rpcMsg.handle != NULL && pMsg->rpcMsg.ahandle != NULL) { //if (pMsg->rpcMsg.handle != NULL && pMsg->rpcMsg.ahandle != NULL) {
if (pMsg->rpcMsg.handle != NULL) {
rsp.ahandle = pMsg->rpcMsg.ahandle; rsp.ahandle = pMsg->rpcMsg.ahandle;
rsp.handle = pMsg->rpcMsg.handle; rsp.handle = pMsg->rpcMsg.handle;
rsp.refId = pMsg->rpcMsg.refId; rsp.refId = pMsg->rpcMsg.refId;
...@@ -213,7 +214,23 @@ static void vmProcessSyncQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOf ...@@ -213,7 +214,23 @@ static void vmProcessSyncQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOf
// todo // todo
SRpcMsg *pRsp = NULL; SRpcMsg *pRsp = NULL;
(void)vnodeProcessSyncReq(pVnode->pImpl, &pMsg->rpcMsg, &pRsp); int32_t ret = vnodeProcessSyncReq(pVnode->pImpl, &pMsg->rpcMsg, &pRsp);
if (ret != 0) {
// if leader, send response
if (pMsg->rpcMsg.handle != NULL) {
SRpcMsg rsp;
rsp.pCont = NULL;
rsp.contLen = 0;
rsp.code = terrno;
dTrace("vmProcessSyncQueue error, code:%d", terrno);
rsp.ahandle = pMsg->rpcMsg.ahandle;
rsp.handle = pMsg->rpcMsg.handle;
rsp.refId = pMsg->rpcMsg.refId;
tmsgSendRsp(&rsp);
}
}
rpcFreeCont(pMsg->rpcMsg.pCont); rpcFreeCont(pMsg->rpcMsg.pCont);
taosFreeQitem(pMsg); taosFreeQitem(pMsg);
......
...@@ -189,6 +189,7 @@ struct SMetaEntry { ...@@ -189,6 +189,7 @@ struct SMetaEntry {
struct { struct {
int64_t ctime; int64_t ctime;
int32_t ttlDays; int32_t ttlDays;
int32_t ncid; // next column id
SSchemaWrapper schema; SSchemaWrapper schema;
} ntbEntry; } ntbEntry;
struct { struct {
......
...@@ -83,6 +83,7 @@ int metaAlterSTable(SMeta* pMeta, int64_t version, SVCreateStbReq* p ...@@ -83,6 +83,7 @@ int metaAlterSTable(SMeta* pMeta, int64_t version, SVCreateStbReq* p
int metaDropSTable(SMeta* pMeta, int64_t verison, SVDropStbReq* pReq); int metaDropSTable(SMeta* pMeta, int64_t verison, SVDropStbReq* pReq);
int metaCreateTable(SMeta* pMeta, int64_t version, SVCreateTbReq* pReq); int metaCreateTable(SMeta* pMeta, int64_t version, SVCreateTbReq* pReq);
int metaDropTable(SMeta* pMeta, int64_t version, SVDropTbReq* pReq); int metaDropTable(SMeta* pMeta, int64_t version, SVDropTbReq* pReq);
int metaAlterTable(SMeta* pMeta, int64_t version, SVAlterTbReq* pReq);
SSchemaWrapper* metaGetTableSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver, bool isinline); SSchemaWrapper* metaGetTableSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver, bool isinline);
STSchema* metaGetTbTSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver); STSchema* metaGetTbTSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver);
int metaGetTableEntryByName(SMetaReader* pReader, const char* name); int metaGetTableEntryByName(SMetaReader* pReader, const char* name);
...@@ -97,7 +98,7 @@ int32_t metaCreateTSma(SMeta* pMeta, int64_t version, SSmaCfg* pCfg); ...@@ -97,7 +98,7 @@ int32_t metaCreateTSma(SMeta* pMeta, int64_t version, SSmaCfg* pCfg);
int32_t metaDropTSma(SMeta* pMeta, int64_t indexUid); int32_t metaDropTSma(SMeta* pMeta, int64_t indexUid);
// tsdb // tsdb
int tsdbOpen(SVnode* pVnode, STsdb** ppTsdb, const char* dir, STsdbKeepCfg *pKeepCfg); int tsdbOpen(SVnode* pVnode, STsdb** ppTsdb, const char* dir, STsdbKeepCfg* pKeepCfg);
int tsdbClose(STsdb** pTsdb); int tsdbClose(STsdb** pTsdb);
int tsdbBegin(STsdb* pTsdb); int tsdbBegin(STsdb* pTsdb);
int tsdbCommit(STsdb* pTsdb); int tsdbCommit(STsdb* pTsdb);
...@@ -182,7 +183,7 @@ typedef enum { ...@@ -182,7 +183,7 @@ typedef enum {
TSDB_TYPE_RSMA_L2 = 4, // RSMA Level 2 TSDB_TYPE_RSMA_L2 = 4, // RSMA Level 2
} ETsdbType; } ETsdbType;
struct STsdbKeepCfg{ struct STsdbKeepCfg {
int8_t precision; // precision always be used with below keep cfgs int8_t precision; // precision always be used with below keep cfgs
int32_t days; int32_t days;
int32_t keep0; int32_t keep0;
......
...@@ -34,6 +34,7 @@ int metaEncodeEntry(SEncoder *pCoder, const SMetaEntry *pME) { ...@@ -34,6 +34,7 @@ int metaEncodeEntry(SEncoder *pCoder, const SMetaEntry *pME) {
} else if (pME->type == TSDB_NORMAL_TABLE) { } else if (pME->type == TSDB_NORMAL_TABLE) {
if (tEncodeI64(pCoder, pME->ntbEntry.ctime) < 0) return -1; if (tEncodeI64(pCoder, pME->ntbEntry.ctime) < 0) return -1;
if (tEncodeI32(pCoder, pME->ntbEntry.ttlDays) < 0) return -1; if (tEncodeI32(pCoder, pME->ntbEntry.ttlDays) < 0) return -1;
if (tEncodeI32v(pCoder, pME->ntbEntry.ncid) < 0) return -1;
if (tEncodeSSchemaWrapper(pCoder, &pME->ntbEntry.schema) < 0) return -1; if (tEncodeSSchemaWrapper(pCoder, &pME->ntbEntry.schema) < 0) return -1;
} else if (pME->type == TSDB_TSMA_TABLE) { } else if (pME->type == TSDB_TSMA_TABLE) {
if (tEncodeTSma(pCoder, pME->smaEntry.tsma) < 0) return -1; if (tEncodeTSma(pCoder, pME->smaEntry.tsma) < 0) return -1;
...@@ -65,10 +66,11 @@ int metaDecodeEntry(SDecoder *pCoder, SMetaEntry *pME) { ...@@ -65,10 +66,11 @@ int metaDecodeEntry(SDecoder *pCoder, SMetaEntry *pME) {
} else if (pME->type == TSDB_NORMAL_TABLE) { } else if (pME->type == TSDB_NORMAL_TABLE) {
if (tDecodeI64(pCoder, &pME->ntbEntry.ctime) < 0) return -1; if (tDecodeI64(pCoder, &pME->ntbEntry.ctime) < 0) return -1;
if (tDecodeI32(pCoder, &pME->ntbEntry.ttlDays) < 0) return -1; if (tDecodeI32(pCoder, &pME->ntbEntry.ttlDays) < 0) return -1;
if (tDecodeI32v(pCoder, &pME->ntbEntry.ncid) < 0) return -1;
if (tDecodeSSchemaWrapper(pCoder, &pME->ntbEntry.schema) < 0) return -1; if (tDecodeSSchemaWrapper(pCoder, &pME->ntbEntry.schema) < 0) return -1;
} else if (pME->type == TSDB_TSMA_TABLE) { } else if (pME->type == TSDB_TSMA_TABLE) {
if (tDecodeTSma(pCoder, pME->smaEntry.tsma) < 0) return -1; if (tDecodeTSma(pCoder, pME->smaEntry.tsma) < 0) return -1;
} else { } else {
ASSERT(0); ASSERT(0);
} }
......
...@@ -240,6 +240,7 @@ int metaCreateTable(SMeta *pMeta, int64_t version, SVCreateTbReq *pReq) { ...@@ -240,6 +240,7 @@ int metaCreateTable(SMeta *pMeta, int64_t version, SVCreateTbReq *pReq) {
me.ntbEntry.ctime = pReq->ctime; me.ntbEntry.ctime = pReq->ctime;
me.ntbEntry.ttlDays = pReq->ttl; me.ntbEntry.ttlDays = pReq->ttl;
me.ntbEntry.schema = pReq->ntb.schema; me.ntbEntry.schema = pReq->ntb.schema;
me.ntbEntry.ncid = me.ntbEntry.schema.pSchema[me.ntbEntry.schema.nCols - 1].colId + 1;
} }
if (metaHandleEntry(pMeta, &me) < 0) goto _err; if (metaHandleEntry(pMeta, &me) < 0) goto _err;
...@@ -374,6 +375,170 @@ int metaDropTable(SMeta *pMeta, int64_t version, SVDropTbReq *pReq) { ...@@ -374,6 +375,170 @@ int metaDropTable(SMeta *pMeta, int64_t version, SVDropTbReq *pReq) {
return 0; return 0;
} }
static int metaAlterTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pAlterTbReq) {
void *pVal = NULL;
int nVal = 0;
const void *pData = NULL;
int nData = 0;
int ret = 0;
tb_uid_t uid;
int64_t oversion;
SSchema *pColumn = NULL;
SMetaEntry entry = {0};
SSchemaWrapper *pSchema;
int c;
// search name index
ret = tdbDbGet(pMeta->pNameIdx, pAlterTbReq->tbName, strlen(pAlterTbReq->tbName) + 1, &pVal, &nVal);
if (ret < 0) {
terrno = TSDB_CODE_VND_TABLE_NOT_EXIST;
return -1;
}
uid = *(tb_uid_t *)pVal;
tdbFree(pVal);
pVal = NULL;
// search uid index
TDBC *pUidIdxc = NULL;
tdbDbcOpen(pMeta->pUidIdx, &pUidIdxc, &pMeta->txn);
tdbDbcMoveTo(pUidIdxc, &uid, sizeof(uid), &c);
ASSERT(c == 0);
tdbDbcGet(pUidIdxc, NULL, NULL, &pData, &nData);
oversion = *(int64_t *)pData;
// search table.db
TDBC *pTbDbc = NULL;
tdbDbcOpen(pMeta->pTbDb, &pTbDbc, &pMeta->txn);
tdbDbcMoveTo(pTbDbc, &((STbDbKey){.uid = uid, .version = oversion}), sizeof(STbDbKey), &c);
ASSERT(c == 0);
tdbDbcGet(pTbDbc, NULL, NULL, &pData, &nData);
// get table entry
SDecoder dc = {0};
tDecoderInit(&dc, pData, nData);
metaDecodeEntry(&dc, &entry);
if (entry.type != TSDB_NORMAL_TABLE) {
terrno = TSDB_CODE_VND_INVALID_TABLE_ACTION;
goto _err;
}
// search the column to add/drop/update
pSchema = &entry.ntbEntry.schema;
int32_t iCol = 0;
for (;;) {
pColumn = NULL;
if (iCol >= pSchema->nCols) break;
pColumn = &pSchema->pSchema[iCol];
if (strcmp(pColumn->name, pAlterTbReq->colName) == 0) break;
iCol++;
}
entry.version = version;
int tlen;
switch (pAlterTbReq->action) {
case TSDB_ALTER_TABLE_ADD_COLUMN:
if (pColumn) {
terrno = TSDB_CODE_VND_COL_ALREADY_EXISTS;
goto _err;
}
pSchema->sver++;
pSchema->nCols++;
pSchema->pSchema =
taosMemoryRealloc(entry.ntbEntry.schema.pSchema, sizeof(SSchema) * entry.ntbEntry.schema.nCols);
pSchema->pSchema[entry.ntbEntry.schema.nCols - 1].bytes = pAlterTbReq->bytes;
pSchema->pSchema[entry.ntbEntry.schema.nCols - 1].type = pAlterTbReq->type;
pSchema->pSchema[entry.ntbEntry.schema.nCols - 1].flags = pAlterTbReq->flags;
pSchema->pSchema[entry.ntbEntry.schema.nCols - 1].colId = entry.ntbEntry.ncid++;
strcpy(pSchema->pSchema[entry.ntbEntry.schema.nCols - 1].name, pAlterTbReq->colName);
break;
case TSDB_ALTER_TABLE_DROP_COLUMN:
if (pColumn == NULL) {
terrno = TSDB_CODE_VND_TABLE_COL_NOT_EXISTS;
goto _err;
}
if (pColumn->colId == 0) {
terrno = TSDB_CODE_VND_INVALID_TABLE_ACTION;
goto _err;
}
pSchema->sver++;
pSchema->nCols--;
tlen = (pSchema->nCols - iCol - 1) * sizeof(SSchema);
if (tlen) {
memmove(pColumn, pColumn + 1, tlen);
}
break;
case TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES:
if (pColumn == NULL) {
terrno = TSDB_CODE_VND_TABLE_COL_NOT_EXISTS;
goto _err;
}
if (!IS_VAR_DATA_TYPE(pColumn->type) || pColumn->bytes <= pAlterTbReq->bytes) {
terrno = TSDB_CODE_VND_INVALID_TABLE_ACTION;
goto _err;
}
pSchema->sver++;
pColumn->bytes = pAlterTbReq->bytes;
break;
case TSDB_ALTER_TABLE_UPDATE_COLUMN_NAME:
if (pColumn == NULL) {
terrno = TSDB_CODE_VND_TABLE_COL_NOT_EXISTS;
goto _err;
}
pSchema->sver++;
strcpy(pColumn->name, pAlterTbReq->colNewName);
break;
}
entry.version = version;
tDecoderClear(&dc);
tdbDbcClose(pTbDbc);
tdbDbcClose(pUidIdxc);
return 0;
_err:
tDecoderClear(&dc);
tdbDbcClose(pTbDbc);
tdbDbcClose(pUidIdxc);
return -1;
}
static int metaUpdateTableTagVal(SMeta *pMeta, int64_t version, SVAlterTbReq *pAlterTbReq) {
// TODO
return 0;
}
static int metaUpdateTableOptions(SMeta *pMeta, int64_t version, SVAlterTbReq *pAlterTbReq) {
// TODO
ASSERT(0);
return 0;
}
int metaAlterTable(SMeta *pMeta, int64_t version, SVAlterTbReq *pReq) {
switch (pReq->action) {
case TSDB_ALTER_TABLE_ADD_COLUMN:
case TSDB_ALTER_TABLE_DROP_COLUMN:
case TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES:
case TSDB_ALTER_TABLE_UPDATE_COLUMN_NAME:
return metaAlterTableColumn(pMeta, version, pReq);
case TSDB_ALTER_TABLE_UPDATE_TAG_VAL:
return metaUpdateTableTagVal(pMeta, version, pReq);
case TSDB_ALTER_TABLE_UPDATE_OPTIONS:
return metaUpdateTableOptions(pMeta, version, pReq);
default:
terrno = TSDB_CODE_VND_INVALID_TABLE_ACTION;
return -1;
break;
}
}
static int metaSaveToTbDb(SMeta *pMeta, const SMetaEntry *pME) { static int metaSaveToTbDb(SMeta *pMeta, const SMetaEntry *pME) {
STbDbKey tbDbKey; STbDbKey tbDbKey;
void *pKey = NULL; void *pKey = NULL;
......
...@@ -19,7 +19,7 @@ static int vnodeProcessCreateStbReq(SVnode *pVnode, int64_t version, void *pReq, ...@@ -19,7 +19,7 @@ static int vnodeProcessCreateStbReq(SVnode *pVnode, int64_t version, void *pReq,
static int vnodeProcessAlterStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp); static int vnodeProcessAlterStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int vnodeProcessDropStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp); static int vnodeProcessDropStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pReq, int len, SRpcMsg *pRsp); static int vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pReq, int len, SRpcMsg *pRsp);
static int vnodeProcessAlterTbReq(SVnode *pVnode, void *pReq, int32_t len, SRpcMsg *pRsp); static int vnodeProcessAlterTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int vnodeProcessDropTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp); static int vnodeProcessDropTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp); static int vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int vnodeProcessCreateTSmaReq(SVnode *pVnode, int64_t version, void *pReq, int len, SRpcMsg *pRsp); static int vnodeProcessCreateTSmaReq(SVnode *pVnode, int64_t version, void *pReq, int len, SRpcMsg *pRsp);
...@@ -82,7 +82,7 @@ int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg ...@@ -82,7 +82,7 @@ int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg
if (vnodeProcessCreateTbReq(pVnode, version, pReq, len, pRsp) < 0) goto _err; if (vnodeProcessCreateTbReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
break; break;
case TDMT_VND_ALTER_TABLE: case TDMT_VND_ALTER_TABLE:
if (vnodeProcessAlterTbReq(pVnode, pReq, len, pRsp) < 0) goto _err; if (vnodeProcessAlterTbReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
break; break;
case TDMT_VND_DROP_TABLE: case TDMT_VND_DROP_TABLE:
if (vnodeProcessDropTbReq(pVnode, version, pReq, len, pRsp) < 0) goto _err; if (vnodeProcessDropTbReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
...@@ -198,6 +198,8 @@ void smaHandleRes(void *pVnode, int64_t smaId, const SArray *data) { ...@@ -198,6 +198,8 @@ void smaHandleRes(void *pVnode, int64_t smaId, const SArray *data) {
} }
int vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { int vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
int32_t ret = TAOS_SYNC_PROPOSE_OTHER_ERROR;
if (syncEnvIsStart()) { if (syncEnvIsStart()) {
SSyncNode *pSyncNode = syncNodeAcquire(pVnode->sync); SSyncNode *pSyncNode = syncNodeAcquire(pVnode->sync);
assert(pSyncNode != NULL); assert(pSyncNode != NULL);
...@@ -219,67 +221,70 @@ int vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { ...@@ -219,67 +221,70 @@ int vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
SyncTimeout *pSyncMsg = syncTimeoutFromRpcMsg2(pRpcMsg); SyncTimeout *pSyncMsg = syncTimeoutFromRpcMsg2(pRpcMsg);
assert(pSyncMsg != NULL); assert(pSyncMsg != NULL);
syncNodeOnTimeoutCb(pSyncNode, pSyncMsg); ret = syncNodeOnTimeoutCb(pSyncNode, pSyncMsg);
syncTimeoutDestroy(pSyncMsg); syncTimeoutDestroy(pSyncMsg);
} else if (pRpcMsg->msgType == TDMT_VND_SYNC_PING) { } else if (pRpcMsg->msgType == TDMT_VND_SYNC_PING) {
SyncPing *pSyncMsg = syncPingFromRpcMsg2(pRpcMsg); SyncPing *pSyncMsg = syncPingFromRpcMsg2(pRpcMsg);
assert(pSyncMsg != NULL); assert(pSyncMsg != NULL);
syncNodeOnPingCb(pSyncNode, pSyncMsg); ret = syncNodeOnPingCb(pSyncNode, pSyncMsg);
syncPingDestroy(pSyncMsg); syncPingDestroy(pSyncMsg);
} else if (pRpcMsg->msgType == TDMT_VND_SYNC_PING_REPLY) { } else if (pRpcMsg->msgType == TDMT_VND_SYNC_PING_REPLY) {
SyncPingReply *pSyncMsg = syncPingReplyFromRpcMsg2(pRpcMsg); SyncPingReply *pSyncMsg = syncPingReplyFromRpcMsg2(pRpcMsg);
assert(pSyncMsg != NULL); assert(pSyncMsg != NULL);
syncNodeOnPingReplyCb(pSyncNode, pSyncMsg); ret = syncNodeOnPingReplyCb(pSyncNode, pSyncMsg);
syncPingReplyDestroy(pSyncMsg); syncPingReplyDestroy(pSyncMsg);
} else if (pRpcMsg->msgType == TDMT_VND_SYNC_CLIENT_REQUEST) { } else if (pRpcMsg->msgType == TDMT_VND_SYNC_CLIENT_REQUEST) {
SyncClientRequest *pSyncMsg = syncClientRequestFromRpcMsg2(pRpcMsg); SyncClientRequest *pSyncMsg = syncClientRequestFromRpcMsg2(pRpcMsg);
assert(pSyncMsg != NULL); assert(pSyncMsg != NULL);
syncNodeOnClientRequestCb(pSyncNode, pSyncMsg); ret = syncNodeOnClientRequestCb(pSyncNode, pSyncMsg);
syncClientRequestDestroy(pSyncMsg); syncClientRequestDestroy(pSyncMsg);
} else if (pRpcMsg->msgType == TDMT_VND_SYNC_REQUEST_VOTE) { } else if (pRpcMsg->msgType == TDMT_VND_SYNC_REQUEST_VOTE) {
SyncRequestVote *pSyncMsg = syncRequestVoteFromRpcMsg2(pRpcMsg); SyncRequestVote *pSyncMsg = syncRequestVoteFromRpcMsg2(pRpcMsg);
assert(pSyncMsg != NULL); assert(pSyncMsg != NULL);
syncNodeOnRequestVoteCb(pSyncNode, pSyncMsg); ret = syncNodeOnRequestVoteCb(pSyncNode, pSyncMsg);
syncRequestVoteDestroy(pSyncMsg); syncRequestVoteDestroy(pSyncMsg);
} else if (pRpcMsg->msgType == TDMT_VND_SYNC_REQUEST_VOTE_REPLY) { } else if (pRpcMsg->msgType == TDMT_VND_SYNC_REQUEST_VOTE_REPLY) {
SyncRequestVoteReply *pSyncMsg = syncRequestVoteReplyFromRpcMsg2(pRpcMsg); SyncRequestVoteReply *pSyncMsg = syncRequestVoteReplyFromRpcMsg2(pRpcMsg);
assert(pSyncMsg != NULL); assert(pSyncMsg != NULL);
syncNodeOnRequestVoteReplyCb(pSyncNode, pSyncMsg); ret = syncNodeOnRequestVoteReplyCb(pSyncNode, pSyncMsg);
syncRequestVoteReplyDestroy(pSyncMsg); syncRequestVoteReplyDestroy(pSyncMsg);
} else if (pRpcMsg->msgType == TDMT_VND_SYNC_APPEND_ENTRIES) { } else if (pRpcMsg->msgType == TDMT_VND_SYNC_APPEND_ENTRIES) {
SyncAppendEntries *pSyncMsg = syncAppendEntriesFromRpcMsg2(pRpcMsg); SyncAppendEntries *pSyncMsg = syncAppendEntriesFromRpcMsg2(pRpcMsg);
assert(pSyncMsg != NULL); assert(pSyncMsg != NULL);
syncNodeOnAppendEntriesCb(pSyncNode, pSyncMsg); ret = syncNodeOnAppendEntriesCb(pSyncNode, pSyncMsg);
syncAppendEntriesDestroy(pSyncMsg); syncAppendEntriesDestroy(pSyncMsg);
} else if (pRpcMsg->msgType == TDMT_VND_SYNC_APPEND_ENTRIES_REPLY) { } else if (pRpcMsg->msgType == TDMT_VND_SYNC_APPEND_ENTRIES_REPLY) {
SyncAppendEntriesReply *pSyncMsg = syncAppendEntriesReplyFromRpcMsg2(pRpcMsg); SyncAppendEntriesReply *pSyncMsg = syncAppendEntriesReplyFromRpcMsg2(pRpcMsg);
assert(pSyncMsg != NULL); assert(pSyncMsg != NULL);
syncNodeOnAppendEntriesReplyCb(pSyncNode, pSyncMsg); ret = syncNodeOnAppendEntriesReplyCb(pSyncNode, pSyncMsg);
syncAppendEntriesReplyDestroy(pSyncMsg); syncAppendEntriesReplyDestroy(pSyncMsg);
} else { } else {
vError("==vnodeProcessSyncReq== error msg type:%d", pRpcMsg->msgType); vError("==vnodeProcessSyncReq== error msg type:%d", pRpcMsg->msgType);
ret = TAOS_SYNC_PROPOSE_OTHER_ERROR;
} }
syncNodeRelease(pSyncNode); syncNodeRelease(pSyncNode);
} else { } else {
vError("==vnodeProcessSyncReq== error syncEnv stop"); vError("==vnodeProcessSyncReq== error syncEnv stop");
ret = TAOS_SYNC_PROPOSE_OTHER_ERROR;
} }
return 0;
return ret;
} }
static int vnodeProcessCreateStbReq(SVnode *pVnode, int64_t version, void *pReq, int len, SRpcMsg *pRsp) { static int vnodeProcessCreateStbReq(SVnode *pVnode, int64_t version, void *pReq, int len, SRpcMsg *pRsp) {
...@@ -455,9 +460,32 @@ _exit: ...@@ -455,9 +460,32 @@ _exit:
return 0; return 0;
} }
static int vnodeProcessAlterTbReq(SVnode *pVnode, void *pReq, int32_t len, SRpcMsg *pRsp) { static int vnodeProcessAlterTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
// TODO SVAlterTbReq vAlterTbReq = {0};
ASSERT(0); SDecoder dc = {0};
pRsp->msgType = TDMT_VND_ALTER_TABLE_RSP;
pRsp->pCont = NULL;
pRsp->contLen = 0;
pRsp->code = TSDB_CODE_SUCCESS;
tDecoderInit(&dc, pReq, len);
// decode
if (tDecodeSVAlterTbReq(&dc, &vAlterTbReq) < 0) {
pRsp->code = TSDB_CODE_INVALID_MSG;
tDecoderClear(&dc);
return -1;
}
// process
if (metaAlterTable(pVnode->pMeta, version, &vAlterTbReq) < 0) {
pRsp->code = terrno;
tDecoderClear(&dc);
return -1;
}
tDecoderClear(&dc);
return 0; return 0;
} }
...@@ -668,7 +696,7 @@ _exit: ...@@ -668,7 +696,7 @@ _exit:
static int vnodeProcessCreateTSmaReq(SVnode *pVnode, int64_t version, void *pReq, int len, SRpcMsg *pRsp) { static int vnodeProcessCreateTSmaReq(SVnode *pVnode, int64_t version, void *pReq, int len, SRpcMsg *pRsp) {
SVCreateTSmaReq req = {0}; SVCreateTSmaReq req = {0};
SDecoder coder; SDecoder coder;
pRsp->msgType = TDMT_VND_CREATE_SMA_RSP; pRsp->msgType = TDMT_VND_CREATE_SMA_RSP;
pRsp->code = TSDB_CODE_SUCCESS; pRsp->code = TSDB_CODE_SUCCESS;
...@@ -682,7 +710,7 @@ static int vnodeProcessCreateTSmaReq(SVnode *pVnode, int64_t version, void *pReq ...@@ -682,7 +710,7 @@ static int vnodeProcessCreateTSmaReq(SVnode *pVnode, int64_t version, void *pReq
pRsp->code = terrno; pRsp->code = terrno;
goto _err; goto _err;
} }
if (metaCreateTSma(pVnode->pMeta, version, &req) < 0) { if (metaCreateTSma(pVnode->pMeta, version, &req) < 0) {
pRsp->code = terrno; pRsp->code = terrno;
goto _err; goto _err;
......
...@@ -658,6 +658,7 @@ void initExecTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pQueryWin ...@@ -658,6 +658,7 @@ void initExecTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pQueryWin
void cleanupAggSup(SAggSupporter* pAggSup); void cleanupAggSup(SAggSupporter* pAggSup);
void destroyBasicOperatorInfo(void* param, int32_t numOfOutput); void destroyBasicOperatorInfo(void* param, int32_t numOfOutput);
void appendOneRowToDataBlock(SSDataBlock* pBlock, STupleHandle* pTupleHandle); void appendOneRowToDataBlock(SSDataBlock* pBlock, STupleHandle* pTupleHandle);
void setTbNameColData(void* pMeta, const SSDataBlock* pBlock, SColumnInfoData* pColInfoData, int32_t functionId);
SInterval extractIntervalInfo(const STableScanPhysiNode* pTableScanNode); SInterval extractIntervalInfo(const STableScanPhysiNode* pTableScanNode);
SColumn extractColumnFromColumnNode(SColumnNode* pColNode); SColumn extractColumnFromColumnNode(SColumnNode* pColNode);
......
...@@ -2055,6 +2055,11 @@ void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const int8_t* rowR ...@@ -2055,6 +2055,11 @@ void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const int8_t* rowR
SColumnInfoData* pDst = taosArrayGet(px->pDataBlock, i); SColumnInfoData* pDst = taosArrayGet(px->pDataBlock, i);
SColumnInfoData* pSrc = taosArrayGet(pBlock->pDataBlock, i); SColumnInfoData* pSrc = taosArrayGet(pBlock->pDataBlock, i);
// it is a reserved column for scalar function, and no data in this column yet.
if (pSrc->pData == NULL) {
continue;
}
int32_t numOfRows = 0; int32_t numOfRows = 0;
for (int32_t j = 0; j < totalRows; ++j) { for (int32_t j = 0; j < totalRows; ++j) {
if (rowRes[j] == 0) { if (rowRes[j] == 0) {
...@@ -4372,9 +4377,9 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SExprInfo* p ...@@ -4372,9 +4377,9 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SExprInfo* p
goto _error; goto _error;
} }
pInfo->limit = *pLimit; pInfo->limit = *pLimit;
pInfo->slimit = *pSlimit; pInfo->slimit = *pSlimit;
pInfo->curOffset = pLimit->offset; pInfo->curOffset = pLimit->offset;
pInfo->curSOffset = pSlimit->offset; pInfo->curSOffset = pSlimit->offset;
pInfo->binfo.pRes = pResBlock; pInfo->binfo.pRes = pResBlock;
......
...@@ -291,20 +291,7 @@ void addTagPseudoColumnData(STableScanInfo* pTableScanInfo, SSDataBlock* pBlock) ...@@ -291,20 +291,7 @@ void addTagPseudoColumnData(STableScanInfo* pTableScanInfo, SSDataBlock* pBlock)
// this is to handle the tbname // this is to handle the tbname
if (fmIsScanPseudoColumnFunc(functionId)) { if (fmIsScanPseudoColumnFunc(functionId)) {
struct SScalarFuncExecFuncs fpSet = {0}; setTbNameColData(pTableScanInfo->readHandle.meta, pBlock, pColInfoData, functionId);
fmGetScalarFuncExecFuncs(functionId, &fpSet);
SColumnInfoData infoData = {0};
infoData.info.type = TSDB_DATA_TYPE_BIGINT;
infoData.info.bytes = sizeof(uint64_t);
colInfoDataEnsureCapacity(&infoData, 0, 1);
colDataAppendInt64(&infoData, 0, &pBlock->info.uid);
SScalarParam srcParam = {
.numOfRows = pBlock->info.rows, .param = pTableScanInfo->readHandle.meta, .columnData = &infoData};
SScalarParam param = {.columnData = pColInfoData};
fpSet.process(&srcParam, 1, &param);
} else { // these are tags } else { // these are tags
const char* p = metaGetTableTagVal(&mr.me, pExpr->base.pParam[0].pCol->colId); const char* p = metaGetTableTagVal(&mr.me, pExpr->base.pParam[0].pCol->colId);
for (int32_t i = 0; i < pBlock->info.rows; ++i) { for (int32_t i = 0; i < pBlock->info.rows; ++i) {
...@@ -316,6 +303,23 @@ void addTagPseudoColumnData(STableScanInfo* pTableScanInfo, SSDataBlock* pBlock) ...@@ -316,6 +303,23 @@ void addTagPseudoColumnData(STableScanInfo* pTableScanInfo, SSDataBlock* pBlock)
metaReaderClear(&mr); metaReaderClear(&mr);
} }
void setTbNameColData(void* pMeta, const SSDataBlock* pBlock, SColumnInfoData* pColInfoData, int32_t functionId) {
struct SScalarFuncExecFuncs fpSet = {0};
fmGetScalarFuncExecFuncs(functionId, &fpSet);
SColumnInfoData infoData = {0};
infoData.info.type = TSDB_DATA_TYPE_BIGINT;
infoData.info.bytes = sizeof(uint64_t);
colInfoDataEnsureCapacity(&infoData, 0, 1);
colDataAppendInt64(&infoData, 0, (int64_t*) &pBlock->info.uid);
SScalarParam srcParam = {
.numOfRows = pBlock->info.rows, .param = pMeta, .columnData = &infoData};
SScalarParam param = {.columnData = pColInfoData};
fpSet.process(&srcParam, 1, &param);
}
static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) { static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) {
STableScanInfo* pTableScanInfo = pOperator->info; STableScanInfo* pTableScanInfo = pOperator->info;
SSDataBlock* pBlock = pTableScanInfo->pResBlock; SSDataBlock* pBlock = pTableScanInfo->pResBlock;
......
...@@ -241,7 +241,7 @@ alter_table_clause(A) ::= ...@@ -241,7 +241,7 @@ alter_table_clause(A) ::=
alter_table_clause(A) ::= alter_table_clause(A) ::=
full_table_name(B) RENAME TAG column_name(C) column_name(D). { A = createAlterTableRenameCol(pCxt, B, TSDB_ALTER_TABLE_UPDATE_TAG_NAME, &C, &D); } full_table_name(B) RENAME TAG column_name(C) column_name(D). { A = createAlterTableRenameCol(pCxt, B, TSDB_ALTER_TABLE_UPDATE_TAG_NAME, &C, &D); }
alter_table_clause(A) ::= alter_table_clause(A) ::=
full_table_name(B) SET TAG column_name(C) NK_EQ literal(D). { A = createAlterTableSetTag(pCxt, B, &C, D); } full_table_name(B) SET TAG column_name(C) NK_EQ literal(D). { A = createAlterTableSetTag(pCxt, B, &C, releaseRawExprNode(pCxt, D)); }
%type multi_create_clause { SNodeList* } %type multi_create_clause { SNodeList* }
%destructor multi_create_clause { nodesDestroyList($$); } %destructor multi_create_clause { nodesDestroyList($$); }
......
...@@ -2949,8 +2949,8 @@ static int32_t translateCreateIndex(STranslateContext* pCxt, SCreateIndexStmt* p ...@@ -2949,8 +2949,8 @@ static int32_t translateCreateIndex(STranslateContext* pCxt, SCreateIndexStmt* p
} }
static int32_t translateDropIndex(STranslateContext* pCxt, SDropIndexStmt* pStmt) { static int32_t translateDropIndex(STranslateContext* pCxt, SDropIndexStmt* pStmt) {
SEncoder encoder = {0}; SEncoder encoder = {0};
int32_t contLen = 0; int32_t contLen = 0;
SVDropTSmaReq dropSmaReq = {0}; SVDropTSmaReq dropSmaReq = {0};
strcpy(dropSmaReq.indexName, pStmt->indexName); strcpy(dropSmaReq.indexName, pStmt->indexName);
...@@ -2958,7 +2958,7 @@ static int32_t translateDropIndex(STranslateContext* pCxt, SDropIndexStmt* pStmt ...@@ -2958,7 +2958,7 @@ static int32_t translateDropIndex(STranslateContext* pCxt, SDropIndexStmt* pStmt
if (NULL == pCxt->pCmdMsg) { if (NULL == pCxt->pCmdMsg) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
int32_t ret = 0; int32_t ret = 0;
tEncodeSize(tEncodeSVDropTSmaReq, &dropSmaReq, contLen, ret); tEncodeSize(tEncodeSVDropTSmaReq, &dropSmaReq, contLen, ret);
if (ret < 0) { if (ret < 0) {
...@@ -3800,7 +3800,7 @@ static void destroyCreateTbReqBatch(SVgroupCreateTableBatch* pTbBatch) { ...@@ -3800,7 +3800,7 @@ static void destroyCreateTbReqBatch(SVgroupCreateTableBatch* pTbBatch) {
taosArrayDestroy(pTbBatch->req.pArray); taosArrayDestroy(pTbBatch->req.pArray);
} }
static int32_t rewriteToVnodeModifOpStmt(SQuery* pQuery, SArray* pBufArray) { static int32_t rewriteToVnodeModifyOpStmt(SQuery* pQuery, SArray* pBufArray) {
SVnodeModifOpStmt* pNewStmt = nodesMakeNode(QUERY_NODE_VNODE_MODIF_STMT); SVnodeModifOpStmt* pNewStmt = nodesMakeNode(QUERY_NODE_VNODE_MODIF_STMT);
if (pNewStmt == NULL) { if (pNewStmt == NULL) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
...@@ -3855,7 +3855,7 @@ static int32_t rewriteCreateTable(STranslateContext* pCxt, SQuery* pQuery) { ...@@ -3855,7 +3855,7 @@ static int32_t rewriteCreateTable(STranslateContext* pCxt, SQuery* pQuery) {
code = buildCreateTableDataBlock(pCxt->pParseCxt->acctId, pStmt, &info, &pBufArray); code = buildCreateTableDataBlock(pCxt->pParseCxt->acctId, pStmt, &info, &pBufArray);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = rewriteToVnodeModifOpStmt(pQuery, pBufArray); code = rewriteToVnodeModifyOpStmt(pQuery, pBufArray);
if (TSDB_CODE_SUCCESS != code) { if (TSDB_CODE_SUCCESS != code) {
destroyCreateTbReqArray(pBufArray); destroyCreateTbReqArray(pBufArray);
} }
...@@ -4111,7 +4111,7 @@ static int32_t rewriteCreateMultiTable(STranslateContext* pCxt, SQuery* pQuery) ...@@ -4111,7 +4111,7 @@ static int32_t rewriteCreateMultiTable(STranslateContext* pCxt, SQuery* pQuery)
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
return rewriteToVnodeModifOpStmt(pQuery, pBufArray); return rewriteToVnodeModifyOpStmt(pQuery, pBufArray);
} }
typedef struct SVgroupDropTableBatch { typedef struct SVgroupDropTableBatch {
...@@ -4251,14 +4251,162 @@ static int32_t rewriteDropTable(STranslateContext* pCxt, SQuery* pQuery) { ...@@ -4251,14 +4251,162 @@ static int32_t rewriteDropTable(STranslateContext* pCxt, SQuery* pQuery) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
return rewriteToVnodeModifOpStmt(pQuery, pBufArray); return rewriteToVnodeModifyOpStmt(pQuery, pBufArray);
} }
static int32_t rewriteAlterTable(STranslateContext* pCxt, SQuery* pQuery) { static int32_t buildAlterTbReq(STranslateContext* pCxt, SAlterTableStmt* pStmt, SVAlterTbReq* pReq) {
// todo pReq->tbName = strdup(pStmt->tableName);
if (NULL == pReq->tbName) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pReq->action = pStmt->alterType;
switch (pStmt->alterType) {
case TSDB_ALTER_TABLE_ADD_TAG:
case TSDB_ALTER_TABLE_DROP_TAG:
case TSDB_ALTER_TABLE_UPDATE_TAG_NAME:
case TSDB_ALTER_TABLE_UPDATE_TAG_BYTES:
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_ALTER_TABLE);
case TSDB_ALTER_TABLE_UPDATE_TAG_VAL:
pReq->tagName = strdup(pStmt->colName);
if (NULL == pReq->tagName) {
return TSDB_CODE_OUT_OF_MEMORY;
}
if (DEAL_RES_ERROR == translateValue(pCxt, pStmt->pVal)) {
return pCxt->errCode;
}
pReq->isNull = (TSDB_DATA_TYPE_NULL == pStmt->pVal->node.resType.type);
pReq->nTagVal = pStmt->pVal->node.resType.bytes;
char* pVal = nodesGetValueFromNode(pStmt->pVal);
pReq->pTagVal = IS_VAR_DATA_TYPE(pStmt->pVal->node.resType.type) ? pVal + VARSTR_HEADER_SIZE : pVal;
break;
case TSDB_ALTER_TABLE_ADD_COLUMN:
case TSDB_ALTER_TABLE_DROP_COLUMN:
pReq->colName = strdup(pStmt->colName);
if (NULL == pReq->colName) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pReq->type = pStmt->dataType.type;
pReq->flags = COL_SMA_ON;
pReq->bytes = pStmt->dataType.bytes;
break;
case TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES:
pReq->colName = strdup(pStmt->colName);
if (NULL == pReq->colName) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pReq->colModBytes = calcTypeBytes(pStmt->dataType);
break;
case TSDB_ALTER_TABLE_UPDATE_OPTIONS:
if (-1 != pStmt->pOptions->ttl) {
pReq->updateTTL = true;
pReq->newTTL = pStmt->pOptions->ttl;
}
if ('\0' != pStmt->pOptions->comment[0]) {
pReq->updateComment = true;
pReq->newComment = strdup(pStmt->pOptions->comment);
if (NULL == pReq->newComment) {
return TSDB_CODE_OUT_OF_MEMORY;
}
}
break;
case TSDB_ALTER_TABLE_UPDATE_COLUMN_NAME:
pReq->colName = strdup(pStmt->colName);
pReq->colNewName = strdup(pStmt->newColName);
if (NULL == pReq->colName || NULL == pReq->colNewName) {
return TSDB_CODE_OUT_OF_MEMORY;
}
break;
default:
break;
}
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t serializeAlterTbReq(STranslateContext* pCxt, SAlterTableStmt* pStmt, SVAlterTbReq* pReq,
SArray* pArray) {
SVgroupInfo vg = {0};
int32_t code = getTableHashVgroup(pCxt, pStmt->dbName, pStmt->tableName, &vg);
int tlen = 0;
if (TSDB_CODE_SUCCESS == code) {
tEncodeSize(tEncodeSVAlterTbReq, pReq, tlen, code);
}
if (TSDB_CODE_SUCCESS == code) {
tlen += sizeof(SMsgHead);
void* pMsg = taosMemoryMalloc(tlen);
if (NULL == pMsg) {
return TSDB_CODE_OUT_OF_MEMORY;
}
((SMsgHead*)pMsg)->vgId = htonl(vg.vgId);
((SMsgHead*)pMsg)->contLen = htonl(tlen);
void* pBuf = POINTER_SHIFT(pMsg, sizeof(SMsgHead));
SEncoder coder = {0};
tEncoderInit(&coder, pBuf, tlen - sizeof(SMsgHead));
tEncodeSVAlterTbReq(&coder, pReq);
tEncoderClear(&coder);
SVgDataBlocks* pVgData = taosMemoryCalloc(1, sizeof(SVgDataBlocks));
if (NULL == pVgData) {
taosMemoryFree(pMsg);
return TSDB_CODE_OUT_OF_MEMORY;
}
pVgData->vg = vg;
pVgData->pData = pMsg;
pVgData->size = tlen;
pVgData->numOfTables = 1;
taosArrayPush(pArray, &pVgData);
}
return code;
}
static int32_t buildModifyVnodeArray(STranslateContext* pCxt, SAlterTableStmt* pStmt, SVAlterTbReq* pReq,
SArray** pArray) {
SArray* pTmpArray = taosArrayInit(1, sizeof(void*));
if (NULL == pTmpArray) {
return TSDB_CODE_OUT_OF_MEMORY;
}
int32_t code = serializeAlterTbReq(pCxt, pStmt, pReq, pTmpArray);
if (TSDB_CODE_SUCCESS == code) {
*pArray = pTmpArray;
} else {
taosArrayDestroy(pTmpArray);
}
return code;
}
static int32_t rewriteAlterTable(STranslateContext* pCxt, SQuery* pQuery) {
SAlterTableStmt* pStmt = (SAlterTableStmt*)pQuery->pRoot;
STableMeta* pTableMeta = NULL;
int32_t code = getTableMeta(pCxt, pStmt->dbName, pStmt->tableName, &pTableMeta);
if (TSDB_CODE_SUCCESS != code) {
return code;
}
if (TSDB_SUPER_TABLE == pTableMeta->tableType) {
return TSDB_CODE_SUCCESS;
} else if (TSDB_CHILD_TABLE != pTableMeta->tableType && TSDB_NORMAL_TABLE != pTableMeta->tableType) {
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_ALTER_TABLE);
}
SVAlterTbReq req = {0};
code = buildAlterTbReq(pCxt, pStmt, &req);
SArray* pArray = NULL;
if (TSDB_CODE_SUCCESS == code) {
code = buildModifyVnodeArray(pCxt, pStmt, &req, &pArray);
}
if (TSDB_CODE_SUCCESS == code) {
code = rewriteToVnodeModifyOpStmt(pQuery, pArray);
}
return code;
}
static int32_t rewriteQuery(STranslateContext* pCxt, SQuery* pQuery) { static int32_t rewriteQuery(STranslateContext* pCxt, SQuery* pQuery) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
switch (nodeType(pQuery->pRoot)) { switch (nodeType(pQuery->pRoot)) {
...@@ -4296,9 +4444,7 @@ static int32_t rewriteQuery(STranslateContext* pCxt, SQuery* pQuery) { ...@@ -4296,9 +4444,7 @@ static int32_t rewriteQuery(STranslateContext* pCxt, SQuery* pQuery) {
code = rewriteDropTable(pCxt, pQuery); code = rewriteDropTable(pCxt, pQuery);
break; break;
case QUERY_NODE_ALTER_TABLE_STMT: case QUERY_NODE_ALTER_TABLE_STMT:
if (TSDB_ALTER_TABLE_UPDATE_TAG_VAL == ((SAlterTableStmt*)pQuery->pRoot)->alterType) { code = rewriteAlterTable(pCxt, pQuery);
code = rewriteAlterTable(pCxt, pQuery);
}
break; break;
default: default:
break; break;
......
...@@ -152,6 +152,8 @@ static char* getSyntaxErrFormat(int32_t errCode) { ...@@ -152,6 +152,8 @@ static char* getSyntaxErrFormat(int32_t errCode) {
return "Invalid timeline function"; return "Invalid timeline function";
case TSDB_CODE_PAR_INVALID_PASSWD: case TSDB_CODE_PAR_INVALID_PASSWD:
return "Invalid password"; return "Invalid password";
case TSDB_CODE_PAR_INVALID_ALTER_TABLE:
return "Invalid alter table statement";
case TSDB_CODE_OUT_OF_MEMORY: case TSDB_CODE_OUT_OF_MEMORY:
return "Out of memory"; return "Out of memory";
default: default:
......
...@@ -3500,7 +3500,7 @@ static YYACTIONTYPE yy_reduce( ...@@ -3500,7 +3500,7 @@ static YYACTIONTYPE yy_reduce(
yymsp[-4].minor.yy172 = yylhsminor.yy172; yymsp[-4].minor.yy172 = yylhsminor.yy172;
break; break;
case 121: /* alter_table_clause ::= full_table_name SET TAG column_name NK_EQ literal */ case 121: /* alter_table_clause ::= full_table_name SET TAG column_name NK_EQ literal */
{ yylhsminor.yy172 = createAlterTableSetTag(pCxt, yymsp[-5].minor.yy172, &yymsp[-2].minor.yy105, yymsp[0].minor.yy172); } { yylhsminor.yy172 = createAlterTableSetTag(pCxt, yymsp[-5].minor.yy172, &yymsp[-2].minor.yy105, releaseRawExprNode(pCxt, yymsp[0].minor.yy172)); }
yymsp[-5].minor.yy172 = yylhsminor.yy172; yymsp[-5].minor.yy172 = yylhsminor.yy172;
break; break;
case 123: /* multi_create_clause ::= multi_create_clause create_subtable_clause */ case 123: /* multi_create_clause ::= multi_create_clause create_subtable_clause */
......
...@@ -69,7 +69,7 @@ TEST_F(ParserInitialATest, alterDatabase) { ...@@ -69,7 +69,7 @@ TEST_F(ParserInitialATest, alterDatabase) {
* | COMMENT 'string_value' * | COMMENT 'string_value'
* } * }
*/ */
TEST_F(ParserInitialATest, alterTable) { TEST_F(ParserInitialATest, alterSTable) {
useDb("root", "test"); useDb("root", "test");
SMAlterStbReq expect = {0}; SMAlterStbReq expect = {0};
...@@ -119,7 +119,7 @@ TEST_F(ParserInitialATest, alterTable) { ...@@ -119,7 +119,7 @@ TEST_F(ParserInitialATest, alterTable) {
setCheckDdlFunc([&](const SQuery* pQuery, ParserStage stage) { setCheckDdlFunc([&](const SQuery* pQuery, ParserStage stage) {
ASSERT_EQ(nodeType(pQuery->pRoot), QUERY_NODE_ALTER_TABLE_STMT); ASSERT_EQ(nodeType(pQuery->pRoot), QUERY_NODE_ALTER_TABLE_STMT);
SMAlterStbReq req = {0}; SMAlterStbReq req = {0};
ASSERT_TRUE(TSDB_CODE_SUCCESS == tDeserializeSMAlterStbReq(pQuery->pCmdMsg->pMsg, pQuery->pCmdMsg->msgLen, &req)); ASSERT_EQ(tDeserializeSMAlterStbReq(pQuery->pCmdMsg->pMsg, pQuery->pCmdMsg->msgLen, &req), TSDB_CODE_SUCCESS);
ASSERT_EQ(std::string(req.name), std::string(expect.name)); ASSERT_EQ(std::string(req.name), std::string(expect.name));
ASSERT_EQ(req.alterType, expect.alterType); ASSERT_EQ(req.alterType, expect.alterType);
ASSERT_EQ(req.numOfFields, expect.numOfFields); ASSERT_EQ(req.numOfFields, expect.numOfFields);
...@@ -139,24 +139,24 @@ TEST_F(ParserInitialATest, alterTable) { ...@@ -139,24 +139,24 @@ TEST_F(ParserInitialATest, alterTable) {
} }
}); });
setAlterStbReqFunc("t1", TSDB_ALTER_TABLE_UPDATE_OPTIONS, 0, nullptr, 0, 0, nullptr, nullptr, 10); setAlterStbReqFunc("st1", TSDB_ALTER_TABLE_UPDATE_OPTIONS, 0, nullptr, 0, 0, nullptr, nullptr, 10);
run("ALTER TABLE t1 TTL 10"); run("ALTER TABLE st1 TTL 10");
setAlterStbReqFunc("t1", TSDB_ALTER_TABLE_UPDATE_OPTIONS, 0, nullptr, 0, 0, nullptr, "test"); setAlterStbReqFunc("st1", TSDB_ALTER_TABLE_UPDATE_OPTIONS, 0, nullptr, 0, 0, nullptr, "test");
run("ALTER TABLE t1 COMMENT 'test'"); run("ALTER TABLE st1 COMMENT 'test'");
setAlterStbReqFunc("t1", TSDB_ALTER_TABLE_ADD_COLUMN, 1, "cc1", TSDB_DATA_TYPE_BIGINT); setAlterStbReqFunc("st1", TSDB_ALTER_TABLE_ADD_COLUMN, 1, "cc1", TSDB_DATA_TYPE_BIGINT);
run("ALTER TABLE t1 ADD COLUMN cc1 BIGINT"); run("ALTER TABLE st1 ADD COLUMN cc1 BIGINT");
setAlterStbReqFunc("t1", TSDB_ALTER_TABLE_DROP_COLUMN, 1, "c1"); setAlterStbReqFunc("st1", TSDB_ALTER_TABLE_DROP_COLUMN, 1, "c1");
run("ALTER TABLE t1 DROP COLUMN c1"); run("ALTER TABLE st1 DROP COLUMN c1");
setAlterStbReqFunc("t1", TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES, 1, "c1", TSDB_DATA_TYPE_VARCHAR, setAlterStbReqFunc("st1", TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES, 1, "c1", TSDB_DATA_TYPE_VARCHAR,
20 + VARSTR_HEADER_SIZE); 20 + VARSTR_HEADER_SIZE);
run("ALTER TABLE t1 MODIFY COLUMN c1 VARCHAR(20)"); run("ALTER TABLE st1 MODIFY COLUMN c1 VARCHAR(20)");
setAlterStbReqFunc("t1", TSDB_ALTER_TABLE_UPDATE_COLUMN_NAME, 2, "c1", 0, 0, "cc1"); setAlterStbReqFunc("st1", TSDB_ALTER_TABLE_UPDATE_COLUMN_NAME, 2, "c1", 0, 0, "cc1");
run("ALTER TABLE t1 RENAME COLUMN c1 cc1"); run("ALTER TABLE st1 RENAME COLUMN c1 cc1");
setAlterStbReqFunc("st1", TSDB_ALTER_TABLE_ADD_TAG, 1, "tag11", TSDB_DATA_TYPE_BIGINT); setAlterStbReqFunc("st1", TSDB_ALTER_TABLE_ADD_TAG, 1, "tag11", TSDB_DATA_TYPE_BIGINT);
run("ALTER TABLE st1 ADD TAG tag11 BIGINT"); run("ALTER TABLE st1 ADD TAG tag11 BIGINT");
...@@ -171,7 +171,127 @@ TEST_F(ParserInitialATest, alterTable) { ...@@ -171,7 +171,127 @@ TEST_F(ParserInitialATest, alterTable) {
setAlterStbReqFunc("st1", TSDB_ALTER_TABLE_UPDATE_TAG_NAME, 2, "tag1", 0, 0, "tag11"); setAlterStbReqFunc("st1", TSDB_ALTER_TABLE_UPDATE_TAG_NAME, 2, "tag1", 0, 0, "tag11");
run("ALTER TABLE st1 RENAME TAG tag1 tag11"); run("ALTER TABLE st1 RENAME TAG tag1 tag11");
// run("ALTER TABLE st1s1 SET TAG tag1=10"); // todo
// ADD {FULLTEXT | SMA} INDEX index_name (col_name [, col_name] ...) [index_option]
}
TEST_F(ParserInitialATest, alterTable) {
useDb("root", "test");
SVAlterTbReq expect = {0};
auto setAlterColFunc = [&](const char* pTbname, int8_t alterType, const char* pColName, int8_t dataType = 0,
int32_t dataBytes = 0, const char* pNewColName = nullptr) {
memset(&expect, 0, sizeof(SVAlterTbReq));
expect.tbName = strdup(pTbname);
expect.action = alterType;
expect.colName = strdup(pColName);
switch (alterType) {
case TSDB_ALTER_TABLE_ADD_COLUMN:
expect.type = dataType;
expect.flags = COL_SMA_ON;
expect.bytes = dataBytes > 0 ? dataBytes : (dataType > 0 ? tDataTypes[dataType].bytes : 0);
break;
case TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES:
expect.colModBytes = dataBytes;
break;
case TSDB_ALTER_TABLE_UPDATE_COLUMN_NAME:
expect.colNewName = strdup(pNewColName);
break;
default:
break;
}
};
auto setAlterTagFunc = [&](const char* pTbname, const char* pTagName, const uint8_t* pNewVal, uint32_t bytes) {
memset(&expect, 0, sizeof(SVAlterTbReq));
expect.tbName = strdup(pTbname);
expect.action = TSDB_ALTER_TABLE_UPDATE_TAG_VAL;
expect.tagName = strdup(pTagName);
expect.isNull = (nullptr == pNewVal);
expect.nTagVal = bytes;
expect.pTagVal = pNewVal;
};
auto setAlterOptionsFunc = [&](const char* pTbname, int32_t ttl, const char* pComment = nullptr) {
memset(&expect, 0, sizeof(SVAlterTbReq));
expect.tbName = strdup(pTbname);
expect.action = TSDB_ALTER_TABLE_UPDATE_OPTIONS;
if (-1 != ttl) {
expect.updateTTL = true;
expect.newTTL = ttl;
}
if (nullptr != pComment) {
expect.updateComment = true;
expect.newComment = pComment;
}
};
setCheckDdlFunc([&](const SQuery* pQuery, ParserStage stage) {
ASSERT_EQ(nodeType(pQuery->pRoot), QUERY_NODE_VNODE_MODIF_STMT);
SVnodeModifOpStmt* pStmt = (SVnodeModifOpStmt*)pQuery->pRoot;
ASSERT_EQ(pStmt->sqlNodeType, QUERY_NODE_ALTER_TABLE_STMT);
ASSERT_NE(pStmt->pDataBlocks, nullptr);
ASSERT_EQ(taosArrayGetSize(pStmt->pDataBlocks), 1);
SVgDataBlocks* pVgData = (SVgDataBlocks*)taosArrayGetP(pStmt->pDataBlocks, 0);
void* pBuf = POINTER_SHIFT(pVgData->pData, sizeof(SMsgHead));
SVAlterTbReq req = {0};
SDecoder coder = {0};
tDecoderInit(&coder, (const uint8_t*)pBuf, pVgData->size);
ASSERT_EQ(tDecodeSVAlterTbReq(&coder, &req), TSDB_CODE_SUCCESS);
ASSERT_EQ(std::string(req.tbName), std::string(expect.tbName));
ASSERT_EQ(req.action, expect.action);
if (nullptr != expect.colName) {
ASSERT_EQ(std::string(req.colName), std::string(expect.colName));
}
ASSERT_EQ(req.type, expect.type);
ASSERT_EQ(req.flags, expect.flags);
ASSERT_EQ(req.bytes, expect.bytes);
ASSERT_EQ(req.colModBytes, expect.colModBytes);
if (nullptr != expect.colNewName) {
ASSERT_EQ(std::string(req.colNewName), std::string(expect.colNewName));
}
if (nullptr != expect.tagName) {
ASSERT_EQ(std::string(req.tagName), std::string(expect.tagName));
}
ASSERT_EQ(req.isNull, expect.isNull);
ASSERT_EQ(req.nTagVal, expect.nTagVal);
ASSERT_EQ(memcmp(req.pTagVal, expect.pTagVal, expect.nTagVal), 0);
ASSERT_EQ(req.updateTTL, expect.updateTTL);
ASSERT_EQ(req.newTTL, expect.newTTL);
ASSERT_EQ(req.updateComment, expect.updateComment);
if (nullptr != expect.newComment) {
ASSERT_EQ(std::string(req.newComment), std::string(expect.newComment));
}
tDecoderClear(&coder);
});
setAlterOptionsFunc("t1", 10, nullptr);
run("ALTER TABLE t1 TTL 10");
setAlterOptionsFunc("t1", -1, "test");
run("ALTER TABLE t1 COMMENT 'test'");
setAlterColFunc("t1", TSDB_ALTER_TABLE_ADD_COLUMN, "cc1", TSDB_DATA_TYPE_BIGINT);
run("ALTER TABLE t1 ADD COLUMN cc1 BIGINT");
setAlterColFunc("t1", TSDB_ALTER_TABLE_DROP_COLUMN, "c1");
run("ALTER TABLE t1 DROP COLUMN c1");
setAlterColFunc("t1", TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES, "c1", TSDB_DATA_TYPE_VARCHAR, 20 + VARSTR_HEADER_SIZE);
run("ALTER TABLE t1 MODIFY COLUMN c1 VARCHAR(20)");
setAlterColFunc("t1", TSDB_ALTER_TABLE_UPDATE_COLUMN_NAME, "c1", 0, 0, "cc1");
run("ALTER TABLE t1 RENAME COLUMN c1 cc1");
int64_t val = 10;
setAlterTagFunc("st1s1", "tag1", (const uint8_t*)&val, sizeof(val));
run("ALTER TABLE st1s1 SET TAG tag1=10");
// todo // todo
// ADD {FULLTEXT | SMA} INDEX index_name (col_name [, col_name] ...) [index_option] // ADD {FULLTEXT | SMA} INDEX index_name (col_name [, col_name] ...) [index_option]
......
...@@ -985,6 +985,8 @@ static int32_t getMsgType(ENodeType sqlType) { ...@@ -985,6 +985,8 @@ static int32_t getMsgType(ENodeType sqlType) {
return TDMT_VND_CREATE_TABLE; return TDMT_VND_CREATE_TABLE;
case QUERY_NODE_DROP_TABLE_STMT: case QUERY_NODE_DROP_TABLE_STMT:
return TDMT_VND_DROP_TABLE; return TDMT_VND_DROP_TABLE;
case QUERY_NODE_ALTER_TABLE_STMT:
return TDMT_VND_ALTER_TABLE;
default: default:
break; break;
} }
......
...@@ -256,6 +256,7 @@ int32_t schValidateTaskReceivedMsgType(SSchJob *pJob, SSchTask *pTask, int32_t m ...@@ -256,6 +256,7 @@ int32_t schValidateTaskReceivedMsgType(SSchJob *pJob, SSchTask *pTask, int32_t m
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
case TDMT_VND_CREATE_TABLE_RSP: case TDMT_VND_CREATE_TABLE_RSP:
case TDMT_VND_DROP_TABLE_RSP: case TDMT_VND_DROP_TABLE_RSP:
case TDMT_VND_ALTER_TABLE_RSP:
case TDMT_VND_SUBMIT_RSP: case TDMT_VND_SUBMIT_RSP:
break; break;
default: default:
...@@ -1131,6 +1132,24 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch ...@@ -1131,6 +1132,24 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch
SCH_ERR_RET(schProcessOnTaskSuccess(pJob, pTask)); SCH_ERR_RET(schProcessOnTaskSuccess(pJob, pTask));
break; break;
} }
case TDMT_VND_ALTER_TABLE_RSP: {
SVAlterTbRsp rsp = {0};
if (msg) {
SDecoder coder = {0};
tDecoderInit(&coder, msg, msgSize);
code = tDecodeSVAlterTbRsp(&coder, &rsp);
tDecoderClear(&coder);
SCH_ERR_JRET(code);
SCH_ERR_JRET(rsp.code);
}
SCH_ERR_JRET(rspCode);
if (NULL == msg) {
SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
}
break;
}
case TDMT_VND_SUBMIT_RSP: { case TDMT_VND_SUBMIT_RSP: {
SCH_ERR_JRET(rspCode); SCH_ERR_JRET(rspCode);
...@@ -1391,6 +1410,10 @@ int32_t schHandleDropTableCallback(void *param, const SDataBuf *pMsg, int32_t co ...@@ -1391,6 +1410,10 @@ int32_t schHandleDropTableCallback(void *param, const SDataBuf *pMsg, int32_t co
return schHandleCallback(param, pMsg, TDMT_VND_DROP_TABLE_RSP, code); return schHandleCallback(param, pMsg, TDMT_VND_DROP_TABLE_RSP, code);
} }
int32_t schHandleAlterTableCallback(void *param, const SDataBuf *pMsg, int32_t code) {
return schHandleCallback(param, pMsg, TDMT_VND_ALTER_TABLE_RSP, code);
}
int32_t schHandleQueryCallback(void *param, const SDataBuf *pMsg, int32_t code) { int32_t schHandleQueryCallback(void *param, const SDataBuf *pMsg, int32_t code) {
return schHandleCallback(param, pMsg, TDMT_VND_QUERY_RSP, code); return schHandleCallback(param, pMsg, TDMT_VND_QUERY_RSP, code);
} }
...@@ -1490,6 +1513,9 @@ int32_t schGetCallbackFp(int32_t msgType, __async_send_cb_fn_t *fp) { ...@@ -1490,6 +1513,9 @@ int32_t schGetCallbackFp(int32_t msgType, __async_send_cb_fn_t *fp) {
case TDMT_VND_DROP_TABLE: case TDMT_VND_DROP_TABLE:
*fp = schHandleDropTableCallback; *fp = schHandleDropTableCallback;
break; break;
case TDMT_VND_ALTER_TABLE:
*fp = schHandleAlterTableCallback;
break;
case TDMT_VND_SUBMIT: case TDMT_VND_SUBMIT:
*fp = schHandleSubmitCallback; *fp = schHandleSubmitCallback;
break; break;
...@@ -2010,6 +2036,7 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, ...@@ -2010,6 +2036,7 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
switch (msgType) { switch (msgType) {
case TDMT_VND_CREATE_TABLE: case TDMT_VND_CREATE_TABLE:
case TDMT_VND_DROP_TABLE: case TDMT_VND_DROP_TABLE:
case TDMT_VND_ALTER_TABLE:
case TDMT_VND_SUBMIT: { case TDMT_VND_SUBMIT: {
msgSize = pTask->msgLen; msgSize = pTask->msgLen;
msg = taosMemoryCalloc(1, msgSize); msg = taosMemoryCalloc(1, msgSize);
......
...@@ -322,6 +322,10 @@ TAOS_DEFINE_ERROR(TSDB_CODE_VND_TB_NOT_EXIST, "Table not exists") ...@@ -322,6 +322,10 @@ TAOS_DEFINE_ERROR(TSDB_CODE_VND_TB_NOT_EXIST, "Table not exists")
TAOS_DEFINE_ERROR(TSDB_CODE_VND_SMA_NOT_EXIST, "SMA not exists") TAOS_DEFINE_ERROR(TSDB_CODE_VND_SMA_NOT_EXIST, "SMA not exists")
TAOS_DEFINE_ERROR(TSDB_CODE_VND_HASH_MISMATCH, "Hash value mismatch") TAOS_DEFINE_ERROR(TSDB_CODE_VND_HASH_MISMATCH, "Hash value mismatch")
TAOS_DEFINE_ERROR(TSDB_CODE_VND_TABLE_NOT_EXIST, "Table does not exists") TAOS_DEFINE_ERROR(TSDB_CODE_VND_TABLE_NOT_EXIST, "Table does not exists")
TAOS_DEFINE_ERROR(TSDB_CODE_VND_INVALID_TABLE_ACTION, "Invalid table action")
TAOS_DEFINE_ERROR(TSDB_CODE_VND_COL_ALREADY_EXISTS, "Table column already exists")
TAOS_DEFINE_ERROR(TSDB_CODE_VND_TABLE_COL_NOT_EXISTS, "Table column not exists")
// tsdb // tsdb
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_INVALID_TABLE_ID, "Invalid table ID") TAOS_DEFINE_ERROR(TSDB_CODE_TDB_INVALID_TABLE_ID, "Invalid table ID")
......
#### test scenario, please refer to https://jira.taosdata.com:18090/pages/viewpage.action?pageId=135120406 #### test scenario, please refer to https://jira.taosdata.com:18090/pages/viewpage.action?pageId=135120406
#basic1.sim: vgroups=1, one topic for one consumer, firstly insert data, then start consume. Include six topics #basic1.sim: vgroups=1, one topic for one consumer, firstly insert data, then start consume. Include six topics
#basic2.sim: vgroups=1, multi topics for one consumer, firstly insert data, then start consume. Include six topics #basic2.sim: vgroups=1, multi topics for one consumer, firstly insert data, then start consume. Include six topics
#basic3.sim: vgroups=4, one topic for one consumer, firstly insert data, then start consume. Include six topics #basic3.sim: vgroups=4, one topic for one consumer, firstly insert data, then start consume. Include six topics
#basic4.sim: vgroups=4, multi topics for one consumer, firstly insert data, then start consume. Include six topics #basic4.sim: vgroups=4, multi topics for one consumer, firstly insert data, then start consume. Include six topics
# notes1: Scalar function: ABS/ACOS/ASIN/ATAN/CEIL/COS/FLOOR/LOG/POW/ROUND/SIN/SQRT/TAN # notes1: Scalar function: ABS/ACOS/ASIN/ATAN/CEIL/COS/FLOOR/LOG/POW/ROUND/SIN/SQRT/TAN
# The above use cases are combined with where filter conditions, such as: where ts > "2017-08-12 18:25:58.128Z" and sin(a) > 0.5; # The above use cases are combined with where filter conditions, such as: where ts > "2017-08-12 18:25:58.128Z" and sin(a) > 0.5;
# #
# notes2: not support aggregate functions(such as sum/count/min/max) and time-windows(interval). # notes2: not support aggregate functions(such as sum/count/min/max) and time-windows(interval).
# #
run tsim/tmq/prepareBasicEnv-1vgrp.sim run tsim/tmq/prepareBasicEnv-1vgrp.sim
#---- global parameters start ----# #---- global parameters start ----#
$dbName = db $dbName = db
$vgroups = 1 $vgroups = 1
$stbPrefix = stb $stbPrefix = stb
$ctbPrefix = ctb $ctbPrefix = ctb
$ntbPrefix = ntb $ntbPrefix = ntb
$stbNum = 1 $stbNum = 1
$ctbNum = 10 $ctbNum = 10
$ntbNum = 10 $ntbNum = 10
$rowsPerCtb = 10 $rowsPerCtb = 10
$tstart = 1640966400000 # 2022-01-01 00:00:00.000 $tstart = 1640966400000 # 2022-01-01 00:00:00.000
#---- global parameters end ----# #---- global parameters end ----#
$pullDelay = 3 $pullDelay = 3
$ifcheckdata = 1 $ifcheckdata = 1
$showMsg = 1 $ifmanualcommit = 1
$showRow = 0 $showMsg = 1
$showRow = 0
sql connect
sql use $dbName sql connect
sql use $dbName
print == create topics from super table
sql create topic topic_stb_column as select ts, c3 from stb print == create topics from super table
sql create topic topic_stb_all as select ts, c1, c2, c3 from stb sql create topic topic_stb_column as select ts, c3 from stb
sql create topic topic_stb_function as select ts, abs(c1), sin(c2) from stb sql create topic topic_stb_all as select ts, c1, c2, c3 from stb
sql create topic topic_stb_function as select ts, abs(c1), sin(c2) from stb
print == create topics from child table
sql create topic topic_ctb_column as select ts, c3 from ctb0 print == create topics from child table
sql create topic topic_ctb_all as select * from ctb0 sql create topic topic_ctb_column as select ts, c3 from ctb0
sql create topic topic_ctb_function as select ts, abs(c1), sin(c2) from ctb0 sql create topic topic_ctb_all as select * from ctb0
sql create topic topic_ctb_function as select ts, abs(c1), sin(c2) from ctb0
print == create topics from normal table
sql create topic topic_ntb_column as select ts, c3 from ntb0 print == create topics from normal table
sql create topic topic_ntb_all as select * from ntb0 sql create topic topic_ntb_column as select ts, c3 from ntb0
sql create topic topic_ntb_function as select ts, abs(c1), sin(c2) from ntb0 sql create topic topic_ntb_all as select * from ntb0
sql create topic topic_ntb_function as select ts, abs(c1), sin(c2) from ntb0
#sql show topics
#if $rows != 9 then #sql show topics
# return -1 #if $rows != 9 then
#endi # return -1
#endi
$keyList = ' . group.id:cgrp1
$keyList = $keyList . ' #'group.id:cgrp1,enable.auto.commit:false,auto.commit.interval.ms:6000,auto.offset.reset:earliest'
$keyList = ' . group.id:cgrp1
$cdb_index = 0 $keyList = $keyList . ,
#=============================== start consume =============================# $keyList = $keyList . enable.auto.commit:false
#$keyList = $keyList . ,
print ================ test consume from stb #$keyList = $keyList . auto.commit.interval.ms:6000
$loop_cnt = 0 #$keyList = $keyList . ,
loop_consume_diff_topic_from_stb: #$keyList = $keyList . auto.offset.reset:earliest
$keyList = $keyList . '
####################################################################################### print ========== key list: $keyList
# clear consume info and consume result
#run tsim/tmq/clearConsume.sim
# because drop table function no stable, so by create new db for consume info and result. Modify it later $cdb_index = 0
$cdb_index = $cdb_index + 1 #=============================== start consume =============================#
$cdbName = cdb . $cdb_index
sql create database $cdbName vgroups 1 print ================ test consume from stb
sleep 500 $loop_cnt = 0
sql use $cdbName loop_consume_diff_topic_from_stb:
print == create consume info table and consume result table #######################################################################################
sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int) # clear consume info and consume result
sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int) #run tsim/tmq/clearConsume.sim
# because drop table function no stable, so by create new db for consume info and result. Modify it later
sql show tables $cdb_index = $cdb_index + 1
if $rows != 2 then $cdbName = cdb . $cdb_index
return -1 sql create database $cdbName vgroups 1
endi sleep 500
####################################################################################### sql use $cdbName
if $loop_cnt == 0 then print == create consume info table and consume result table
print == scenario 1: topic_stb_column sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)
$topicList = ' . topic_stb_column sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)
$topicList = $topicList . '
elif $loop_cnt == 1 then sql show tables
print == scenario 2: topic_stb_all if $rows != 2 then
$topicList = ' . topic_stb_all return -1
$topicList = $topicList . ' endi
elif $loop_cnt == 2 then #######################################################################################
print == scenario 3: topic_stb_function
$topicList = ' . topic_stb_function if $loop_cnt == 0 then
$topicList = $topicList . ' print == scenario 1: topic_stb_column
else $topicList = ' . topic_stb_column
goto loop_consume_diff_topic_from_stb_end $topicList = $topicList . '
endi elif $loop_cnt == 1 then
print == scenario 2: topic_stb_all
$consumerId = 0 $topicList = ' . topic_stb_all
$totalMsgOfStb = $ctbNum * $rowsPerCtb $topicList = $topicList . '
$expectmsgcnt = $totalMsgOfStb elif $loop_cnt == 2 then
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata ) print == scenario 3: topic_stb_function
$topicList = ' . topic_stb_function
print == start consumer to pull msgs from stb $topicList = $topicList . '
print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $cdbName -s start else
system tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $cdbName -s start goto loop_consume_diff_topic_from_stb_end
endi
print == check consume result
wait_consumer_end_from_stb: $consumerId = 0
sql select * from consumeresult $totalMsgOfStb = $ctbNum * $rowsPerCtb
print ==> rows: $rows $expectmsgcnt = $totalMsgOfStb
print ==> rows[0]: $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6] sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit )
if $rows != 1 then
sleep 1000 print == start consumer to pull msgs from stb
goto wait_consumer_end_from_stb print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $cdbName -s start
endi system tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $cdbName -s start
if $data[0][1] != $consumerId then
return -1 print == check consume result
endi wait_consumer_end_from_stb:
if $data[0][2] != $expectmsgcnt then sql select * from consumeresult
return -1 print ==> rows: $rows
endi print ==> rows[0]: $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6]
if $data[0][3] != $expectmsgcnt then if $rows != 1 then
return -1 sleep 1000
endi goto wait_consumer_end_from_stb
$loop_cnt = $loop_cnt + 1 endi
goto loop_consume_diff_topic_from_stb if $data[0][1] != $consumerId then
loop_consume_diff_topic_from_stb_end: return -1
endi
print ================ test consume from ctb if $data[0][2] != $expectmsgcnt then
$loop_cnt = 0 return -1
loop_consume_diff_topic_from_ctb: endi
if $data[0][3] != $expectmsgcnt then
####################################################################################### return -1
# clear consume info and consume result endi
#run tsim/tmq/clearConsume.sim $loop_cnt = $loop_cnt + 1
# because drop table function no stable, so by create new db for consume info and result. Modify it later goto loop_consume_diff_topic_from_stb
$cdb_index = $cdb_index + 1 loop_consume_diff_topic_from_stb_end:
$cdbName = cdb . $cdb_index
sql create database $cdbName vgroups 1 print ================ test consume from ctb
sleep 500 $loop_cnt = 0
sql use $cdbName loop_consume_diff_topic_from_ctb:
print == create consume info table and consume result table #######################################################################################
sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int) # clear consume info and consume result
sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int) #run tsim/tmq/clearConsume.sim
# because drop table function no stable, so by create new db for consume info and result. Modify it later
sql show tables $cdb_index = $cdb_index + 1
if $rows != 2 then $cdbName = cdb . $cdb_index
return -1 sql create database $cdbName vgroups 1
endi sleep 500
####################################################################################### sql use $cdbName
if $loop_cnt == 0 then print == create consume info table and consume result table
print == scenario 1: topic_ctb_column sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)
$topicList = ' . topic_ctb_column sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)
$topicList = $topicList . '
elif $loop_cnt == 1 then sql show tables
print == scenario 2: topic_ctb_all if $rows != 2 then
$topicList = ' . topic_ctb_all return -1
$topicList = $topicList . ' endi
elif $loop_cnt == 2 then #######################################################################################
print == scenario 3: topic_ctb_function
$topicList = ' . topic_ctb_function if $loop_cnt == 0 then
$topicList = $topicList . ' print == scenario 1: topic_ctb_column
else $topicList = ' . topic_ctb_column
goto loop_consume_diff_topic_from_ctb_end $topicList = $topicList . '
endi elif $loop_cnt == 1 then
print == scenario 2: topic_ctb_all
$consumerId = 0 $topicList = ' . topic_ctb_all
$totalMsgOfCtb = $rowsPerCtb $topicList = $topicList . '
$expectmsgcnt = $totalMsgOfCtb elif $loop_cnt == 2 then
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata ) print == scenario 3: topic_ctb_function
$topicList = ' . topic_ctb_function
print == start consumer to pull msgs from ctb $topicList = $topicList . '
print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -s start else
system tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $cdbName -s start goto loop_consume_diff_topic_from_ctb_end
endi
print == check consume result
wait_consumer_end_from_ctb: $consumerId = 0
sql select * from consumeresult $totalMsgOfCtb = $rowsPerCtb
print ==> rows: $rows $expectmsgcnt = $totalMsgOfCtb
print ==> rows[0]: $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6] sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit )
if $rows != 1 then
sleep 1000 print == start consumer to pull msgs from ctb
goto wait_consumer_end_from_ctb print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -s start
endi system tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $cdbName -s start
if $data[0][1] != $consumerId then
return -1 print == check consume result
endi wait_consumer_end_from_ctb:
if $data[0][2] != $totalMsgOfCtb then sql select * from consumeresult
return -1 print ==> rows: $rows
endi print ==> rows[0]: $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6]
if $data[0][3] != $totalMsgOfCtb then if $rows != 1 then
return -1 sleep 1000
endi goto wait_consumer_end_from_ctb
$loop_cnt = $loop_cnt + 1 endi
goto loop_consume_diff_topic_from_ctb if $data[0][1] != $consumerId then
loop_consume_diff_topic_from_ctb_end: return -1
endi
print ================ test consume from ntb if $data[0][2] != $totalMsgOfCtb then
$loop_cnt = 0 return -1
loop_consume_diff_topic_from_ntb: endi
if $data[0][3] != $totalMsgOfCtb then
####################################################################################### return -1
# clear consume info and consume result endi
#run tsim/tmq/clearConsume.sim $loop_cnt = $loop_cnt + 1
# because drop table function no stable, so by create new db for consume info and result. Modify it later goto loop_consume_diff_topic_from_ctb
$cdb_index = $cdb_index + 1 loop_consume_diff_topic_from_ctb_end:
$cdbName = cdb . $cdb_index
sql create database $cdbName vgroups 1 print ================ test consume from ntb
sleep 500 $loop_cnt = 0
sql use $cdbName loop_consume_diff_topic_from_ntb:
print == create consume info table and consume result table #######################################################################################
sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int) # clear consume info and consume result
sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int) #run tsim/tmq/clearConsume.sim
# because drop table function no stable, so by create new db for consume info and result. Modify it later
sql show tables $cdb_index = $cdb_index + 1
if $rows != 2 then $cdbName = cdb . $cdb_index
return -1 sql create database $cdbName vgroups 1
endi sleep 500
####################################################################################### sql use $cdbName
if $loop_cnt == 0 then print == create consume info table and consume result table
print == scenario 1: topic_ntb_column sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)
$topicList = ' . topic_ntb_column sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)
$topicList = $topicList . '
elif $loop_cnt == 1 then sql show tables
print == scenario 2: topic_ntb_all if $rows != 2 then
$topicList = ' . topic_ntb_all return -1
$topicList = $topicList . ' endi
elif $loop_cnt == 2 then #######################################################################################
print == scenario 3: topic_ntb_function
$topicList = ' . topic_ntb_function if $loop_cnt == 0 then
$topicList = $topicList . ' print == scenario 1: topic_ntb_column
else $topicList = ' . topic_ntb_column
goto loop_consume_diff_topic_from_ntb_end $topicList = $topicList . '
endi elif $loop_cnt == 1 then
print == scenario 2: topic_ntb_all
$consumerId = 0 $topicList = ' . topic_ntb_all
$totalMsgOfNtb = $rowsPerCtb $topicList = $topicList . '
$expectmsgcnt = $totalMsgOfNtb elif $loop_cnt == 2 then
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata ) print == scenario 3: topic_ntb_function
$topicList = ' . topic_ntb_function
print == start consumer to pull msgs from ntb $topicList = $topicList . '
print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -s start else
system tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $cdbName -s start goto loop_consume_diff_topic_from_ntb_end
endi
print == check consume result from ntb
wait_consumer_end_from_ntb: $consumerId = 0
sql select * from consumeresult $totalMsgOfNtb = $rowsPerCtb
print ==> rows: $rows $expectmsgcnt = $totalMsgOfNtb
print ==> rows[0]: $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6] sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit )
if $rows != 1 then
sleep 1000 print == start consumer to pull msgs from ntb
goto wait_consumer_end_from_ntb print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -s start
endi system tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $cdbName -s start
if $data[0][1] != $consumerId then
return -1 print == check consume result from ntb
endi wait_consumer_end_from_ntb:
if $data[0][2] != $totalMsgOfNtb then sql select * from consumeresult
return -1 print ==> rows: $rows
endi print ==> rows[0]: $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6]
if $data[0][3] != $totalMsgOfNtb then if $rows != 1 then
return -1 sleep 1000
endi goto wait_consumer_end_from_ntb
$loop_cnt = $loop_cnt + 1 endi
goto loop_consume_diff_topic_from_ntb if $data[0][1] != $consumerId then
loop_consume_diff_topic_from_ntb_end: return -1
endi
#------ not need stop consumer, because it exit after pull msg overthan expect msg if $data[0][2] != $totalMsgOfNtb then
#system tsim/tmq/consume.sh -s stop -x SIGINT return -1
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT if $data[0][3] != $totalMsgOfNtb then
return -1
endi
$loop_cnt = $loop_cnt + 1
goto loop_consume_diff_topic_from_ntb
loop_consume_diff_topic_from_ntb_end:
#------ not need stop consumer, because it exit after pull msg overthan expect msg
#system tsim/tmq/consume.sh -s stop -x SIGINT
system sh/exec.sh -n dnode1 -s stop -x SIGINT
#### test scenario, please refer to https://jira.taosdata.com:18090/pages/viewpage.action?pageId=135120406 #### test scenario, please refer to https://jira.taosdata.com:18090/pages/viewpage.action?pageId=135120406
#basic1.sim: vgroups=1, one topic for one consumer, firstly insert data, then start consume. Include six topics #basic1.sim: vgroups=1, one topic for one consumer, firstly insert data, then start consume. Include six topics
#basic2.sim: vgroups=1, multi topics for one consumer, firstly insert data, then start consume. Include six topics #basic2.sim: vgroups=1, multi topics for one consumer, firstly insert data, then start consume. Include six topics
#basic3.sim: vgroups=4, one topic for one consumer, firstly insert data, then start consume. Include six topics #basic3.sim: vgroups=4, one topic for one consumer, firstly insert data, then start consume. Include six topics
#basic4.sim: vgroups=4, multi topics for one consumer, firstly insert data, then start consume. Include six topics #basic4.sim: vgroups=4, multi topics for one consumer, firstly insert data, then start consume. Include six topics
# notes1: Scalar function: ABS/ACOS/ASIN/ATAN/CEIL/COS/FLOOR/LOG/POW/ROUND/SIN/SQRT/TAN # notes1: Scalar function: ABS/ACOS/ASIN/ATAN/CEIL/COS/FLOOR/LOG/POW/ROUND/SIN/SQRT/TAN
# The above use cases are combined with where filter conditions, such as: where ts > "2017-08-12 18:25:58.128Z" and sin(a) > 0.5; # The above use cases are combined with where filter conditions, such as: where ts > "2017-08-12 18:25:58.128Z" and sin(a) > 0.5;
# #
# notes2: not support aggregate functions(such as sum/count/min/max) and time-windows(interval). # notes2: not support aggregate functions(such as sum/count/min/max) and time-windows(interval).
# #
run tsim/tmq/prepareBasicEnv-1vgrp.sim run tsim/tmq/prepareBasicEnv-1vgrp.sim
#---- global parameters start ----# #---- global parameters start ----#
$dbName = db $dbName = db
$vgroups = 1 $vgroups = 1
$stbPrefix = stb $stbPrefix = stb
$ctbPrefix = ctb $ctbPrefix = ctb
$ntbPrefix = ntb $ntbPrefix = ntb
$stbNum = 1 $stbNum = 1
$ctbNum = 10 $ctbNum = 10
$ntbNum = 10 $ntbNum = 10
$rowsPerCtb = 10 $rowsPerCtb = 10
$tstart = 1640966400000 # 2022-01-01 00:00:00.000 $tstart = 1640966400000 # 2022-01-01 00:00:00.000
#---- global parameters end ----# #---- global parameters end ----#
$pullDelay = 3 $pullDelay = 3
$ifcheckdata = 1 $ifcheckdata = 1
$showMsg = 1 $ifmanualcommit = 1
$showRow = 0 $showMsg = 1
$showRow = 0
sql connect
sql use $dbName sql connect
sql use $dbName
print == create topics from super table
sql create topic topic_stb_column as select ts, c3 from stb print == create topics from super table
sql create topic topic_stb_all as select ts, c1, c2, c3 from stb sql create topic topic_stb_column as select ts, c3 from stb
sql create topic topic_stb_function as select ts, abs(c1), sin(c2) from stb sql create topic topic_stb_all as select ts, c1, c2, c3 from stb
sql create topic topic_stb_function as select ts, abs(c1), sin(c2) from stb
print == create topics from child table
sql create topic topic_ctb_column as select ts, c3 from ctb0 print == create topics from child table
sql create topic topic_ctb_all as select * from ctb0 sql create topic topic_ctb_column as select ts, c3 from ctb0
sql create topic topic_ctb_function as select ts, abs(c1), sin(c2) from ctb0 sql create topic topic_ctb_all as select * from ctb0
sql create topic topic_ctb_function as select ts, abs(c1), sin(c2) from ctb0
print == create topics from normal table
sql create topic topic_ntb_column as select ts, c3 from ntb0 print == create topics from normal table
sql create topic topic_ntb_all as select * from ntb0 sql create topic topic_ntb_column as select ts, c3 from ntb0
sql create topic topic_ntb_function as select ts, abs(c1), sin(c2) from ntb0 sql create topic topic_ntb_all as select * from ntb0
sql create topic topic_ntb_function as select ts, abs(c1), sin(c2) from ntb0
#sql show topics
#if $rows != 9 then #sql show topics
# return -1 #if $rows != 9 then
#endi # return -1
#endi
$keyList = ' . group.id:cgrp1
$keyList = $keyList . ' #'group.id:cgrp1,enable.auto.commit:false,auto.commit.interval.ms:6000,auto.offset.reset:earliest'
$keyList = ' . group.id:cgrp1
$topicNum = 3 $keyList = $keyList . ,
$keyList = $keyList . enable.auto.commit:false
#=============================== start consume =============================# #$keyList = $keyList . ,
#$keyList = $keyList . auto.commit.interval.ms:6000
#$keyList = $keyList . ,
print ================ test consume from stb #$keyList = $keyList . auto.offset.reset:earliest
print == multi toipcs: topic_stb_column + topic_stb_all + topic_stb_function $keyList = $keyList . '
$topicList = ' . topic_stb_column print ========== key list: $keyList
$topicList = $topicList . ,
$topicList = $topicList . topic_stb_all
$topicList = $topicList . , $topicNum = 3
$topicList = $topicList . topic_stb_function
$topicList = $topicList . ' #=============================== start consume =============================#
$consumerId = 0
$totalMsgOfStb = $ctbNum * $rowsPerCtb print ================ test consume from stb
$totalMsgOfStb = $totalMsgOfStb * $topicNum print == multi toipcs: topic_stb_column + topic_stb_all + topic_stb_function
$expectmsgcnt = $totalMsgOfStb $topicList = ' . topic_stb_column
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata ) $topicList = $topicList . ,
$topicList = $topicList . topic_stb_all
print == start consumer to pull msgs from stb $topicList = $topicList . ,
print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $dbName -s start $topicList = $topicList . topic_stb_function
system tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $dbName -s start $topicList = $topicList . '
print == check consume result $consumerId = 0
wait_consumer_end_from_stb: $totalMsgOfStb = $ctbNum * $rowsPerCtb
sql select * from consumeresult $totalMsgOfStb = $totalMsgOfStb * $topicNum
print ==> rows: $rows $expectmsgcnt = $totalMsgOfStb
print ==> rows[0]: $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6] sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit )
if $rows != 1 then
sleep 1000 print == start consumer to pull msgs from stb
goto wait_consumer_end_from_stb print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $dbName -s start
endi system tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $dbName -s start
if $data[0][1] != $consumerId then
return -1 print == check consume result
endi wait_consumer_end_from_stb:
if $data[0][2] != $expectmsgcnt then sql select * from consumeresult
return -1 print ==> rows: $rows
endi print ==> rows[0]: $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6]
if $data[0][3] != $expectmsgcnt then if $rows != 1 then
return -1 sleep 1000
endi goto wait_consumer_end_from_stb
endi
####################################################################################### if $data[0][1] != $consumerId then
# clear consume info and consume result return -1
#run tsim/tmq/clearConsume.sim endi
# because drop table function no stable, so by create new db for consume info and result. Modify it later if $data[0][2] != $expectmsgcnt then
$cdbName = cdb1 return -1
sql create database $cdbName vgroups 1 endi
sleep 500 if $data[0][3] != $expectmsgcnt then
sql use $cdbName return -1
endi
print == create consume info table and consume result table
sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int) #######################################################################################
sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int) # clear consume info and consume result
#run tsim/tmq/clearConsume.sim
sql show tables # because drop table function no stable, so by create new db for consume info and result. Modify it later
if $rows != 2 then $cdbName = cdb1
return -1 sql create database $cdbName vgroups 1
endi sleep 500
####################################################################################### sql use $cdbName
print == create consume info table and consume result table
print ================ test consume from ctb sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)
print == multi toipcs: topic_ctb_column + topic_ctb_all + topic_ctb_function sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)
$topicList = ' . topic_ctb_column
$topicList = $topicList . , sql show tables
$topicList = $topicList . topic_ctb_all if $rows != 2 then
$topicList = $topicList . , return -1
$topicList = $topicList . topic_ctb_function endi
$topicList = $topicList . ' #######################################################################################
$consumerId = 0
$totalMsgOfCtb = $rowsPerCtb * $topicNum print ================ test consume from ctb
$expectmsgcnt = $totalMsgOfCtb print == multi toipcs: topic_ctb_column + topic_ctb_all + topic_ctb_function
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata ) $topicList = ' . topic_ctb_column
$topicList = $topicList . ,
print == start consumer to pull msgs from ctb $topicList = $topicList . topic_ctb_all
print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -s start $topicList = $topicList . ,
system tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $cdbName -s start $topicList = $topicList . topic_ctb_function
$topicList = $topicList . '
print == check consume result
wait_consumer_end_from_ctb: $consumerId = 0
sql select * from consumeresult $totalMsgOfCtb = $rowsPerCtb * $topicNum
print ==> rows: $rows $expectmsgcnt = $totalMsgOfCtb
print ==> rows[0]: $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6] sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit )
if $rows != 1 then
sleep 1000 print == start consumer to pull msgs from ctb
goto wait_consumer_end_from_ctb print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -s start
endi system tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $cdbName -s start
if $data[0][1] != $consumerId then
return -1 print == check consume result
endi wait_consumer_end_from_ctb:
if $data[0][2] != $totalMsgOfCtb then sql select * from consumeresult
return -1 print ==> rows: $rows
endi print ==> rows[0]: $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6]
if $data[0][3] != $totalMsgOfCtb then if $rows != 1 then
return -1 sleep 1000
endi goto wait_consumer_end_from_ctb
endi
####################################################################################### if $data[0][1] != $consumerId then
# clear consume info and consume result return -1
#run tsim/tmq/clearConsume.sim endi
# because drop table function no stable, so by create new db for consume info and result. Modify it later if $data[0][2] != $totalMsgOfCtb then
$cdbName = cdb2 return -1
sql create database $cdbName vgroups 1 endi
sleep 500 if $data[0][3] != $totalMsgOfCtb then
sql use $cdbName return -1
endi
print == create consume info table and consume result table
sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int) #######################################################################################
sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int) # clear consume info and consume result
#run tsim/tmq/clearConsume.sim
sql show tables # because drop table function no stable, so by create new db for consume info and result. Modify it later
if $rows != 2 then $cdbName = cdb2
return -1 sql create database $cdbName vgroups 1
endi sleep 500
####################################################################################### sql use $cdbName
print == create consume info table and consume result table
print ================ test consume from ntb sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)
print == multi toipcs: topic_ntb_column + topic_ntb_all + topic_ntb_function sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)
$topicList = ' . topic_ntb_column
$topicList = $topicList . , sql show tables
$topicList = $topicList . topic_ntb_all if $rows != 2 then
$topicList = $topicList . , return -1
$topicList = $topicList . topic_ntb_function endi
$topicList = $topicList . ' #######################################################################################
$consumerId = 0
$totalMsgOfNtb = $rowsPerCtb * $topicNum print ================ test consume from ntb
$expectmsgcnt = $totalMsgOfNtb print == multi toipcs: topic_ntb_column + topic_ntb_all + topic_ntb_function
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata ) $topicList = ' . topic_ntb_column
$topicList = $topicList . ,
print == start consumer to pull msgs from ntb $topicList = $topicList . topic_ntb_all
print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -s start $topicList = $topicList . ,
system tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $cdbName -s start $topicList = $topicList . topic_ntb_function
$topicList = $topicList . '
print == check consume result from ntb
wait_consumer_end_from_ntb: $consumerId = 0
sql select * from consumeresult $totalMsgOfNtb = $rowsPerCtb * $topicNum
print ==> rows: $rows $expectmsgcnt = $totalMsgOfNtb
print ==> rows[0]: $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6] sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit )
if $rows != 1 then
sleep 1000 print == start consumer to pull msgs from ntb
goto wait_consumer_end_from_ntb print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -s start
endi system tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $cdbName -s start
if $data[0][1] != $consumerId then
return -1 print == check consume result from ntb
endi wait_consumer_end_from_ntb:
if $data[0][2] != $totalMsgOfNtb then sql select * from consumeresult
return -1 print ==> rows: $rows
endi print ==> rows[0]: $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6]
if $data[0][3] != $totalMsgOfNtb then if $rows != 1 then
return -1 sleep 1000
endi goto wait_consumer_end_from_ntb
endi
#------ not need stop consumer, because it exit after pull msg overthan expect msg if $data[0][1] != $consumerId then
#system tsim/tmq/consume.sh -s stop -x SIGINT return -1
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT if $data[0][2] != $totalMsgOfNtb then
return -1
endi
if $data[0][3] != $totalMsgOfNtb then
return -1
endi
#------ not need stop consumer, because it exit after pull msg overthan expect msg
#system tsim/tmq/consume.sh -s stop -x SIGINT
system sh/exec.sh -n dnode1 -s stop -x SIGINT
...@@ -27,6 +27,7 @@ $tstart = 1640966400000 # 2022-01-01 00:00:00.000 ...@@ -27,6 +27,7 @@ $tstart = 1640966400000 # 2022-01-01 00:00:00.000
$pullDelay = 5 $pullDelay = 5
$ifcheckdata = 1 $ifcheckdata = 1
$ifmanualcommit = 1
$showMsg = 1 $showMsg = 1
$showRow = 0 $showRow = 0
...@@ -53,8 +54,16 @@ sql create topic topic_ntb_function as select ts, abs(c1), sin(c2) from ntb0 ...@@ -53,8 +54,16 @@ sql create topic topic_ntb_function as select ts, abs(c1), sin(c2) from ntb0
# return -1 # return -1
#endi #endi
#'group.id:cgrp1,enable.auto.commit:false,auto.commit.interval.ms:6000,auto.offset.reset:earliest'
$keyList = ' . group.id:cgrp1 $keyList = ' . group.id:cgrp1
$keyList = $keyList . ,
$keyList = $keyList . enable.auto.commit:false
#$keyList = $keyList . ,
#$keyList = $keyList . auto.commit.interval.ms:6000
#$keyList = $keyList . ,
#$keyList = $keyList . auto.offset.reset:earliest
$keyList = $keyList . ' $keyList = $keyList . '
print ========== key list: $keyList
$topicNum = 2 $topicNum = 2
...@@ -72,7 +81,7 @@ $consumerId = 0 ...@@ -72,7 +81,7 @@ $consumerId = 0
$totalMsgOfOneTopic = $ctbNum * $rowsPerCtb $totalMsgOfOneTopic = $ctbNum * $rowsPerCtb
$totalMsgOfStb = $totalMsgOfOneTopic * $topicNum $totalMsgOfStb = $totalMsgOfOneTopic * $topicNum
$expectmsgcnt = $totalMsgOfStb $expectmsgcnt = $totalMsgOfStb
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata ) sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit )
$topicList = ' . topic_stb_all $topicList = ' . topic_stb_all
...@@ -80,7 +89,7 @@ $topicList = $topicList . , ...@@ -80,7 +89,7 @@ $topicList = $topicList . ,
$topicList = $topicList . topic_stb_function $topicList = $topicList . topic_stb_function
$topicList = $topicList . ' $topicList = $topicList . '
$consumerId = 1 $consumerId = 1
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata ) sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit )
print == start consumer to pull msgs from stb print == start consumer to pull msgs from stb
print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $dbName -s start print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $dbName -s start
...@@ -158,7 +167,7 @@ sleep 500 ...@@ -158,7 +167,7 @@ sleep 500
sql use $cdbName sql use $cdbName
print == create consume info table and consume result table for ctb print == create consume info table and consume result table for ctb
sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int) sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)
sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int) sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)
sql show tables sql show tables
...@@ -179,14 +188,14 @@ $consumerId = 0 ...@@ -179,14 +188,14 @@ $consumerId = 0
$totalMsgOfOneTopic = $rowsPerCtb $totalMsgOfOneTopic = $rowsPerCtb
$totalMsgOfCtb = $totalMsgOfOneTopic * $topicNum $totalMsgOfCtb = $totalMsgOfOneTopic * $topicNum
$expectmsgcnt = $totalMsgOfCtb $expectmsgcnt = $totalMsgOfCtb
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata ) sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit )
$topicList = ' . topic_ctb_function $topicList = ' . topic_ctb_function
$topicList = $topicList . , $topicList = $topicList . ,
$topicList = $topicList . topic_ctb_all $topicList = $topicList . topic_ctb_all
$topicList = $topicList . ' $topicList = $topicList . '
$consumerId = 1 $consumerId = 1
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata ) sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit )
print == start consumer to pull msgs from ctb print == start consumer to pull msgs from ctb
print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $cdbName -s start print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $cdbName -s start
...@@ -249,7 +258,7 @@ sleep 500 ...@@ -249,7 +258,7 @@ sleep 500
sql use $cdbName sql use $cdbName
print == create consume info table and consume result table for ntb print == create consume info table and consume result table for ntb
sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int) sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)
sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int) sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)
sql show tables sql show tables
...@@ -270,7 +279,7 @@ $consumerId = 0 ...@@ -270,7 +279,7 @@ $consumerId = 0
$totalMsgOfOneTopic = $rowsPerCtb $totalMsgOfOneTopic = $rowsPerCtb
$totalMsgOfNtb = $totalMsgOfOneTopic * $topicNum $totalMsgOfNtb = $totalMsgOfOneTopic * $topicNum
$expectmsgcnt = $totalMsgOfNtb $expectmsgcnt = $totalMsgOfNtb
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata ) sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit )
$topicList = ' . topic_ntb_function $topicList = ' . topic_ntb_function
...@@ -278,7 +287,7 @@ $topicList = $topicList . , ...@@ -278,7 +287,7 @@ $topicList = $topicList . ,
$topicList = $topicList . topic_ntb_all $topicList = $topicList . topic_ntb_all
$topicList = $topicList . ' $topicList = $topicList . '
$consumerId = 1 $consumerId = 1
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata ) sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit )
print == start consumer to pull msgs from ntb print == start consumer to pull msgs from ntb
print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -s start print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -s start
......
#### test scenario, please refer to https://jira.taosdata.com:18090/pages/viewpage.action?pageId=135120406 #### test scenario, please refer to https://jira.taosdata.com:18090/pages/viewpage.action?pageId=135120406
#basic1.sim: vgroups=1, one topic for one consumer, firstly insert data, then start consume. Include six topics #basic1.sim: vgroups=1, one topic for one consumer, firstly insert data, then start consume. Include six topics
#basic2.sim: vgroups=1, multi topics for one consumer, firstly insert data, then start consume. Include six topics #basic2.sim: vgroups=1, multi topics for one consumer, firstly insert data, then start consume. Include six topics
#basic3.sim: vgroups=4, one topic for one consumer, firstly insert data, then start consume. Include six topics #basic3.sim: vgroups=4, one topic for one consumer, firstly insert data, then start consume. Include six topics
#basic4.sim: vgroups=4, multi topics for one consumer, firstly insert data, then start consume. Include six topics #basic4.sim: vgroups=4, multi topics for one consumer, firstly insert data, then start consume. Include six topics
# notes1: Scalar function: ABS/ACOS/ASIN/ATAN/CEIL/COS/FLOOR/LOG/POW/ROUND/SIN/SQRT/TAN # notes1: Scalar function: ABS/ACOS/ASIN/ATAN/CEIL/COS/FLOOR/LOG/POW/ROUND/SIN/SQRT/TAN
# The above use cases are combined with where filter conditions, such as: where ts > "2017-08-12 18:25:58.128Z" and sin(a) > 0.5; # The above use cases are combined with where filter conditions, such as: where ts > "2017-08-12 18:25:58.128Z" and sin(a) > 0.5;
# #
# notes2: not support aggregate functions(such as sum/count/min/max) and time-windows(interval). # notes2: not support aggregate functions(such as sum/count/min/max) and time-windows(interval).
# #
run tsim/tmq/prepareBasicEnv-4vgrp.sim run tsim/tmq/prepareBasicEnv-4vgrp.sim
#---- global parameters start ----# #---- global parameters start ----#
$dbName = db $dbName = db
$vgroups = 4 $vgroups = 4
$stbPrefix = stb $stbPrefix = stb
$ctbPrefix = ctb $ctbPrefix = ctb
$ntbPrefix = ntb $ntbPrefix = ntb
$stbNum = 1 $stbNum = 1
$ctbNum = 10 $ctbNum = 10
$ntbNum = 10 $ntbNum = 10
$rowsPerCtb = 10 $rowsPerCtb = 10
$tstart = 1640966400000 # 2022-01-01 00:00:00.000 $tstart = 1640966400000 # 2022-01-01 00:00:00.000
#---- global parameters end ----# #---- global parameters end ----#
$pullDelay = 3 $pullDelay = 3
$ifcheckdata = 1 $ifcheckdata = 1
$showMsg = 1 $ifmanualcommit = 1
$showRow = 0 $showMsg = 1
$showRow = 0
sql connect
sql use $dbName sql connect
sql use $dbName
print == create topics from super table
sql create topic topic_stb_column as select ts, c3 from stb print == create topics from super table
sql create topic topic_stb_all as select ts, c1, c2, c3 from stb sql create topic topic_stb_column as select ts, c3 from stb
sql create topic topic_stb_function as select ts, abs(c1), sin(c2) from stb sql create topic topic_stb_all as select ts, c1, c2, c3 from stb
sql create topic topic_stb_function as select ts, abs(c1), sin(c2) from stb
print == create topics from child table
sql create topic topic_ctb_column as select ts, c3 from ctb0 print == create topics from child table
sql create topic topic_ctb_all as select * from ctb0 sql create topic topic_ctb_column as select ts, c3 from ctb0
sql create topic topic_ctb_function as select ts, abs(c1), sin(c2) from ctb0 sql create topic topic_ctb_all as select * from ctb0
sql create topic topic_ctb_function as select ts, abs(c1), sin(c2) from ctb0
print == create topics from normal table
sql create topic topic_ntb_column as select ts, c3 from ntb0 print == create topics from normal table
sql create topic topic_ntb_all as select * from ntb0 sql create topic topic_ntb_column as select ts, c3 from ntb0
sql create topic topic_ntb_function as select ts, abs(c1), sin(c2) from ntb0 sql create topic topic_ntb_all as select * from ntb0
sql create topic topic_ntb_function as select ts, abs(c1), sin(c2) from ntb0
#sql show topics
#if $rows != 9 then #sql show topics
# return -1 #if $rows != 9 then
#endi # return -1
#endi
$keyList = ' . group.id:cgrp1
$keyList = $keyList . ' #'group.id:cgrp1,enable.auto.commit:false,auto.commit.interval.ms:6000,auto.offset.reset:earliest'
$keyList = ' . group.id:cgrp1
$cdb_index = 0 $keyList = $keyList . ,
#=============================== start consume =============================# $keyList = $keyList . enable.auto.commit:false
#$keyList = $keyList . ,
print ================ test consume from stb #$keyList = $keyList . auto.commit.interval.ms:6000
$loop_cnt = 0 #$keyList = $keyList . ,
loop_consume_diff_topic_from_stb: #$keyList = $keyList . auto.offset.reset:earliest
$keyList = $keyList . '
####################################################################################### print ========== key list: $keyList
# clear consume info and consume result
#run tsim/tmq/clearConsume.sim
# because drop table function no stable, so by create new db for consume info and result. Modify it later $cdb_index = 0
$cdb_index = $cdb_index + 1 #=============================== start consume =============================#
$cdbName = cdb . $cdb_index
sql create database $cdbName vgroups 1 print ================ test consume from stb
sleep 500 $loop_cnt = 0
sql use $cdbName loop_consume_diff_topic_from_stb:
print == create consume info table and consume result table #######################################################################################
sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int) # clear consume info and consume result
sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int) #run tsim/tmq/clearConsume.sim
# because drop table function no stable, so by create new db for consume info and result. Modify it later
sql show tables $cdb_index = $cdb_index + 1
if $rows != 2 then $cdbName = cdb . $cdb_index
return -1 sql create database $cdbName vgroups 1
endi sleep 500
####################################################################################### sql use $cdbName
if $loop_cnt == 0 then print == create consume info table and consume result table
print == scenario 1: topic_stb_column sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)
$topicList = ' . topic_stb_column sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)
$topicList = $topicList . '
elif $loop_cnt == 1 then sql show tables
print == scenario 2: topic_stb_all if $rows != 2 then
$topicList = ' . topic_stb_all return -1
$topicList = $topicList . ' endi
elif $loop_cnt == 2 then #######################################################################################
print == scenario 3: topic_stb_function
$topicList = ' . topic_stb_function if $loop_cnt == 0 then
$topicList = $topicList . ' print == scenario 1: topic_stb_column
else $topicList = ' . topic_stb_column
goto loop_consume_diff_topic_from_stb_end $topicList = $topicList . '
endi elif $loop_cnt == 1 then
print == scenario 2: topic_stb_all
$consumerId = 0 $topicList = ' . topic_stb_all
$totalMsgOfStb = $ctbNum * $rowsPerCtb $topicList = $topicList . '
$expectmsgcnt = $totalMsgOfStb elif $loop_cnt == 2 then
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata ) print == scenario 3: topic_stb_function
$topicList = ' . topic_stb_function
print == start consumer to pull msgs from stb $topicList = $topicList . '
print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $cdbName -s start else
system tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $cdbName -s start goto loop_consume_diff_topic_from_stb_end
endi
print == check consume result
wait_consumer_end_from_stb: $consumerId = 0
sql select * from consumeresult $totalMsgOfStb = $ctbNum * $rowsPerCtb
print ==> rows: $rows $expectmsgcnt = $totalMsgOfStb
print ==> rows[0]: $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6] sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit )
if $rows != 1 then
sleep 1000 print == start consumer to pull msgs from stb
goto wait_consumer_end_from_stb print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $cdbName -s start
endi system tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $cdbName -s start
if $data[0][1] != $consumerId then
return -1 print == check consume result
endi wait_consumer_end_from_stb:
if $data[0][2] != $expectmsgcnt then sql select * from consumeresult
return -1 print ==> rows: $rows
endi print ==> rows[0]: $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6]
if $data[0][3] != $expectmsgcnt then if $rows != 1 then
return -1 sleep 1000
endi goto wait_consumer_end_from_stb
$loop_cnt = $loop_cnt + 1 endi
goto loop_consume_diff_topic_from_stb if $data[0][1] != $consumerId then
loop_consume_diff_topic_from_stb_end: return -1
endi
print ================ test consume from ctb if $data[0][2] != $expectmsgcnt then
$loop_cnt = 0 return -1
loop_consume_diff_topic_from_ctb: endi
if $data[0][3] != $expectmsgcnt then
####################################################################################### return -1
# clear consume info and consume result endi
#run tsim/tmq/clearConsume.sim $loop_cnt = $loop_cnt + 1
# because drop table function no stable, so by create new db for consume info and result. Modify it later goto loop_consume_diff_topic_from_stb
$cdb_index = $cdb_index + 1 loop_consume_diff_topic_from_stb_end:
$cdbName = cdb . $cdb_index
sql create database $cdbName vgroups 1 print ================ test consume from ctb
sleep 500 $loop_cnt = 0
sql use $cdbName loop_consume_diff_topic_from_ctb:
print == create consume info table and consume result table #######################################################################################
sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int) # clear consume info and consume result
sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int) #run tsim/tmq/clearConsume.sim
# because drop table function no stable, so by create new db for consume info and result. Modify it later
sql show tables $cdb_index = $cdb_index + 1
if $rows != 2 then $cdbName = cdb . $cdb_index
return -1 sql create database $cdbName vgroups 1
endi sleep 500
####################################################################################### sql use $cdbName
if $loop_cnt == 0 then print == create consume info table and consume result table
print == scenario 1: topic_ctb_column sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)
$topicList = ' . topic_ctb_column sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)
$topicList = $topicList . '
elif $loop_cnt == 1 then sql show tables
print == scenario 2: topic_ctb_all if $rows != 2 then
$topicList = ' . topic_ctb_all return -1
$topicList = $topicList . ' endi
elif $loop_cnt == 2 then #######################################################################################
print == scenario 3: topic_ctb_function
$topicList = ' . topic_ctb_function if $loop_cnt == 0 then
$topicList = $topicList . ' print == scenario 1: topic_ctb_column
else $topicList = ' . topic_ctb_column
goto loop_consume_diff_topic_from_ctb_end $topicList = $topicList . '
endi elif $loop_cnt == 1 then
print == scenario 2: topic_ctb_all
$consumerId = 0 $topicList = ' . topic_ctb_all
$totalMsgOfCtb = $rowsPerCtb $topicList = $topicList . '
$expectmsgcnt = $totalMsgOfCtb elif $loop_cnt == 2 then
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata ) print == scenario 3: topic_ctb_function
$topicList = ' . topic_ctb_function
print == start consumer to pull msgs from ctb $topicList = $topicList . '
print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -s start else
system tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $cdbName -s start goto loop_consume_diff_topic_from_ctb_end
endi
print == check consume result
wait_consumer_end_from_ctb: $consumerId = 0
sql select * from consumeresult $totalMsgOfCtb = $rowsPerCtb
print ==> rows: $rows $expectmsgcnt = $totalMsgOfCtb
print ==> rows[0]: $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6] sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit )
if $rows != 1 then
sleep 1000 print == start consumer to pull msgs from ctb
goto wait_consumer_end_from_ctb print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -s start
endi system tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $cdbName -s start
if $data[0][1] != $consumerId then
return -1 print == check consume result
endi wait_consumer_end_from_ctb:
if $data[0][2] != $totalMsgOfCtb then sql select * from consumeresult
return -1 print ==> rows: $rows
endi print ==> rows[0]: $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6]
if $data[0][3] != $totalMsgOfCtb then if $rows != 1 then
return -1 sleep 1000
endi goto wait_consumer_end_from_ctb
$loop_cnt = $loop_cnt + 1 endi
goto loop_consume_diff_topic_from_ctb if $data[0][1] != $consumerId then
loop_consume_diff_topic_from_ctb_end: return -1
endi
print ================ test consume from ntb if $data[0][2] != $totalMsgOfCtb then
$loop_cnt = 0 return -1
loop_consume_diff_topic_from_ntb: endi
if $data[0][3] != $totalMsgOfCtb then
####################################################################################### return -1
# clear consume info and consume result endi
#run tsim/tmq/clearConsume.sim $loop_cnt = $loop_cnt + 1
# because drop table function no stable, so by create new db for consume info and result. Modify it later goto loop_consume_diff_topic_from_ctb
$cdb_index = $cdb_index + 1 loop_consume_diff_topic_from_ctb_end:
$cdbName = cdb . $cdb_index
sql create database $cdbName vgroups 1 print ================ test consume from ntb
sleep 500 $loop_cnt = 0
sql use $cdbName loop_consume_diff_topic_from_ntb:
print == create consume info table and consume result table #######################################################################################
sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int) # clear consume info and consume result
sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int) #run tsim/tmq/clearConsume.sim
# because drop table function no stable, so by create new db for consume info and result. Modify it later
sql show tables $cdb_index = $cdb_index + 1
if $rows != 2 then $cdbName = cdb . $cdb_index
return -1 sql create database $cdbName vgroups 1
endi sleep 500
####################################################################################### sql use $cdbName
if $loop_cnt == 0 then print == create consume info table and consume result table
print == scenario 1: topic_ntb_column sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)
$topicList = ' . topic_ntb_column sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)
$topicList = $topicList . '
elif $loop_cnt == 1 then sql show tables
print == scenario 2: topic_ntb_all if $rows != 2 then
$topicList = ' . topic_ntb_all return -1
$topicList = $topicList . ' endi
elif $loop_cnt == 2 then #######################################################################################
print == scenario 3: topic_ntb_function
$topicList = ' . topic_ntb_function if $loop_cnt == 0 then
$topicList = $topicList . ' print == scenario 1: topic_ntb_column
else $topicList = ' . topic_ntb_column
goto loop_consume_diff_topic_from_ntb_end $topicList = $topicList . '
endi elif $loop_cnt == 1 then
print == scenario 2: topic_ntb_all
$consumerId = 0 $topicList = ' . topic_ntb_all
$totalMsgOfNtb = $rowsPerCtb $topicList = $topicList . '
$expectmsgcnt = $totalMsgOfNtb elif $loop_cnt == 2 then
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata ) print == scenario 3: topic_ntb_function
$topicList = ' . topic_ntb_function
print == start consumer to pull msgs from ntb $topicList = $topicList . '
print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -s start else
system tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $cdbName -s start goto loop_consume_diff_topic_from_ntb_end
endi
print == check consume result from ntb
wait_consumer_end_from_ntb: $consumerId = 0
sql select * from consumeresult $totalMsgOfNtb = $rowsPerCtb
print ==> rows: $rows $expectmsgcnt = $totalMsgOfNtb
print ==> rows[0]: $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6] sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit )
if $rows != 1 then
sleep 1000 print == start consumer to pull msgs from ntb
goto wait_consumer_end_from_ntb print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -s start
endi system tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $cdbName -s start
if $data[0][1] != $consumerId then
return -1 print == check consume result from ntb
endi wait_consumer_end_from_ntb:
if $data[0][2] != $totalMsgOfNtb then sql select * from consumeresult
return -1 print ==> rows: $rows
endi print ==> rows[0]: $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6]
if $data[0][3] != $totalMsgOfNtb then if $rows != 1 then
return -1 sleep 1000
endi goto wait_consumer_end_from_ntb
$loop_cnt = $loop_cnt + 1 endi
goto loop_consume_diff_topic_from_ntb if $data[0][1] != $consumerId then
loop_consume_diff_topic_from_ntb_end: return -1
endi
#------ not need stop consumer, because it exit after pull msg overthan expect msg if $data[0][2] != $totalMsgOfNtb then
#system tsim/tmq/consume.sh -s stop -x SIGINT return -1
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT if $data[0][3] != $totalMsgOfNtb then
return -1
endi
$loop_cnt = $loop_cnt + 1
goto loop_consume_diff_topic_from_ntb
loop_consume_diff_topic_from_ntb_end:
#------ not need stop consumer, because it exit after pull msg overthan expect msg
#system tsim/tmq/consume.sh -s stop -x SIGINT
system sh/exec.sh -n dnode1 -s stop -x SIGINT
此差异已折叠。
此差异已折叠。
...@@ -22,18 +22,6 @@ class TDTestCase: ...@@ -22,18 +22,6 @@ class TDTestCase:
tdSql.init(conn.cursor(), logSql) tdSql.init(conn.cursor(), logSql)
def insertData(self, tb_name): def insertData(self, tb_name):
# insert_sql_list = [f'insert into {tb_name} values ("2021-01-01 12:00:00", 1, 1, 1, 3, 1.1, 1.1, "binary", "nchar", true, 1)',
# f'insert into {tb_name} values ("2021-01-05 12:00:00", 2, 2, 1, 3, 1.1, 1.1, "binary", "nchar", true, 2)',
# f'insert into {tb_name} values ("2021-01-07 12:00:00", 1, 3, 1, 2, 1.1, 1.1, "binary", "nchar", true, 3)',
# f'insert into {tb_name} values ("2021-01-09 12:00:00", 1, 2, 4, 3, 1.1, 1.1, "binary", "nchar", true, 4)',
# f'insert into {tb_name} values ("2021-01-11 12:00:00", 1, 2, 5, 5, 1.1, 1.1, "binary", "nchar", true, 5)',
# f'insert into {tb_name} values ("2021-01-13 12:00:00", 1, 2, 1, 3, 6.6, 1.1, "binary", "nchar", true, 6)',
# f'insert into {tb_name} values ("2021-01-15 12:00:00", 1, 2, 1, 3, 1.1, 7.7, "binary", "nchar", true, 7)',
# f'insert into {tb_name} values ("2021-01-17 12:00:00", 1, 2, 1, 3, 1.1, 1.1, "binary8", "nchar", true, 8)',
# f'insert into {tb_name} values ("2021-01-19 12:00:00", 1, 2, 1, 3, 1.1, 1.1, "binary", "nchar9", true, 9)',
# f'insert into {tb_name} values ("2021-01-21 12:00:00", 1, 2, 1, 3, 1.1, 1.1, "binary", "nchar", false, 10)',
# f'insert into {tb_name} values ("2021-01-23 12:00:00", 1, 3, 1, 3, 1.1, 1.1, Null, Null, false, 11)'
# ]
insert_sql_list = [f'insert into {tb_name} values ("2021-01-01 12:00:00", 1, 1, 1, 3, 1.1, 1.1, "binary", "nchar", true, 1, 2, 3, 4)', insert_sql_list = [f'insert into {tb_name} values ("2021-01-01 12:00:00", 1, 1, 1, 3, 1.1, 1.1, "binary", "nchar", true, 1, 2, 3, 4)',
f'insert into {tb_name} values ("2021-01-05 12:00:00", 2, 2, 1, 3, 1.1, 1.1, "binary", "nchar", true, 2, 3, 4, 5)', f'insert into {tb_name} values ("2021-01-05 12:00:00", 2, 2, 1, 3, 1.1, 1.1, "binary", "nchar", true, 2, 3, 4, 5)',
f'insert into {tb_name} values ("2021-01-07 12:00:00", 1, 3, 1, 2, 1.1, 1.1, "binary", "nchar", true, 3, 4, 5, 6)', f'insert into {tb_name} values ("2021-01-07 12:00:00", 1, 3, 1, 2, 1.1, 1.1, "binary", "nchar", true, 3, 4, 5, 6)',
...@@ -54,7 +42,6 @@ class TDTestCase: ...@@ -54,7 +42,6 @@ class TDTestCase:
tb_name = tdCom.getLongName(8, "letters") tb_name = tdCom.getLongName(8, "letters")
tdSql.execute( tdSql.execute(
f"CREATE TABLE {tb_name} (ts timestamp, c1 tinyint, c2 smallint, c3 int, c4 bigint, c5 float, c6 double, c7 binary(100), c8 nchar(200), c9 bool, c10 tinyint unsigned, c11 smallint unsigned, c12 int unsigned, c13 bigint unsigned)") f"CREATE TABLE {tb_name} (ts timestamp, c1 tinyint, c2 smallint, c3 int, c4 bigint, c5 float, c6 double, c7 binary(100), c8 nchar(200), c9 bool, c10 tinyint unsigned, c11 smallint unsigned, c12 int unsigned, c13 bigint unsigned)")
# f"CREATE TABLE {tb_name} (ts timestamp, c1 tinyint, c2 smallint, c3 int, c4 bigint, c5 float, c6 double, c7 binary(100), c8 nchar(200), c9 bool, c10 int)")
self.insertData(tb_name) self.insertData(tb_name)
return tb_name return tb_name
...@@ -95,6 +82,31 @@ class TDTestCase: ...@@ -95,6 +82,31 @@ class TDTestCase:
def queryTsCol(self, tb_name, check_elm=None): def queryTsCol(self, tb_name, check_elm=None):
select_elm = "*" if check_elm is None else check_elm select_elm = "*" if check_elm is None else check_elm
# ts in
query_sql = f'select {select_elm} from {tb_name} where ts in ("2021-01-11 12:00:00", "2021-01-13 12:00:00")'
tdSql.query(query_sql)
tdSql.checkRows(2)
tdSql.checkEqual(self.queryLastC10(query_sql), 6) if select_elm == "*" else False
# ts not in
query_sql = f'select {select_elm} from {tb_name} where ts not in ("2021-01-11 12:00:00", "2021-01-13 12:00:00")'
tdSql.query(query_sql)
tdSql.checkRows(9)
tdSql.checkEqual(self.queryLastC10(query_sql), 11) if select_elm == "*" else False
# ts not null
query_sql = f'select {select_elm} from {tb_name} where ts is not Null'
tdSql.query(query_sql)
tdSql.checkRows(11)
tdSql.checkEqual(self.queryLastC10(query_sql), 11) if select_elm == "*" else False
# ts null
query_sql = f'select {select_elm} from {tb_name} where ts is Null'
tdSql.query(query_sql)
tdSql.checkRows(0)
# not support like not like match nmatch
tdSql.error(f'select {select_elm} from {tb_name} where ts like ("2021-01-11 12:00:00%")')
tdSql.error(f'select {select_elm} from {tb_name} where ts not like ("2021-01-11 12:00:0_")')
tdSql.error(f'select {select_elm} from {tb_name} where ts match "2021-01-11 12:00:00%"')
tdSql.error(f'select {select_elm} from {tb_name} where ts nmatch "2021-01-11 12:00:00%"')
# ts and ts # ts and ts
query_sql = f'select {select_elm} from {tb_name} where ts > "2021-01-11 12:00:00" or ts < "2021-01-13 12:00:00"' query_sql = f'select {select_elm} from {tb_name} where ts > "2021-01-11 12:00:00" or ts < "2021-01-13 12:00:00"'
tdSql.query(query_sql) tdSql.query(query_sql)
...@@ -1422,9 +1434,9 @@ class TDTestCase: ...@@ -1422,9 +1434,9 @@ class TDTestCase:
tdSql.query(query_sql) tdSql.query(query_sql)
tdSql.checkRows(11) tdSql.checkRows(11)
tdSql.checkEqual(self.queryLastC10(query_sql), 11) if select_elm == "*" else False tdSql.checkEqual(self.queryLastC10(query_sql), 11) if select_elm == "*" else False
query_sql = f'select c1,c2,c3,c4,c5,c6,c7,c8,c9,c10,c11,c12,c13 from {tb_name} where c9 > "binary" and c9 >= "binary8" or c9 < "binary9" and c9 <= "binary" and c9 != 2 and c9 <> 2 and c9 = 4 or c9 is not null and c9 between 2 and 4 and c9 not between 1 and 2 and c9 in (2,4) and c9 not in (1,2) or c9 match "binary[28]" or c9 nmatch "binary"' query_sql = f'select c1,c2,c3,c4,c5,c6,c7,c8,c9,c10,c11,c12,c13 from {tb_name} where c9 > "binary" and c9 >= "binary8" or c9 < "binary9" and c9 <= "binary" and c9 != 2 and c9 <> 2 and c9 = 4 or c9 is not null and c9 between 2 and 4 and c9 not between 1 and 2 and c9 in (2,4) and c9 not in (1,2)'
tdSql.query(query_sql) tdSql.query(query_sql)
tdSql.checkRows(11) tdSql.checkRows(9)
def queryFullColType(self, tb_name, check_elm=None): def queryFullColType(self, tb_name, check_elm=None):
select_elm = "*" if check_elm is None else check_elm select_elm = "*" if check_elm is None else check_elm
......
此差异已折叠。
此差异已折叠。
...@@ -53,6 +53,7 @@ python3 ./test.py -f 2-query/tan.py ...@@ -53,6 +53,7 @@ python3 ./test.py -f 2-query/tan.py
python3 ./test.py -f 2-query/arcsin.py python3 ./test.py -f 2-query/arcsin.py
python3 ./test.py -f 2-query/arccos.py python3 ./test.py -f 2-query/arccos.py
python3 ./test.py -f 2-query/arctan.py python3 ./test.py -f 2-query/arctan.py
# python3 ./test.py -f 2-query/query_cols_tags_and_or.py python3 ./test.py -f 2-query/query_cols_tags_and_or.py
python3 ./test.py -f 2-query/nestedQuery.py
python3 ./test.py -f 7-tmq/basic5.py python3 ./test.py -f 7-tmq/basic5.py
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册