提交 4150eb32 编写于 作者: H Hongze Cheng

Merge branch '3.0' of https://github.com/taosdata/TDengine into feature/data_format

......@@ -14,25 +14,6 @@ MESSAGE(STATUS "Project binary files output path: " ${PROJECT_BINARY_DIR})
MESSAGE(STATUS "Project executable files output path: " ${EXECUTABLE_OUTPUT_PATH})
MESSAGE(STATUS "Project library files output path: " ${LIBRARY_OUTPUT_PATH})
find_package(Git QUIET)
if(GIT_FOUND AND EXISTS "${TD_SOURCE_DIR}/.git")
# Update submodules as needed
option(GIT_SUBMODULE "Check submodules during build" ON)
if(GIT_SUBMODULE)
message(STATUS "Submodule update")
execute_process(COMMAND cd ${TD_SOURCE_DIR} && ${GIT_EXECUTABLE} submodule update --init --recursive
WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}
RESULT_VARIABLE GIT_SUBMOD_RESULT)
if(NOT GIT_SUBMOD_RESULT EQUAL "0")
message(WARNING "git submodule update --init --recursive failed with ${GIT_SUBMOD_RESULT}, please checkout submodules")
endif()
endif()
endif()
if(NOT EXISTS "${TD_SOURCE_DIR}/tools/taos-tools/CMakeLists.txt")
message(WARNING "The submodules were not downloaded! GIT_SUBMODULE was turned off or failed. Please update submodules manually if you need build them.")
endif()
if (NOT DEFINED TD_GRANT)
SET(TD_GRANT FALSE)
endif()
......
......@@ -70,7 +70,7 @@ typedef uint16_t tmsg_t;
#define TSDB_IE_TYPE_DNODE_EXT 6
#define TSDB_IE_TYPE_DNODE_STATE 7
enum { CONN_TYPE__QUERY = 1, CONN_TYPE__TMQ, CONN_TYPE__MAX };
enum { CONN_TYPE__QUERY = 1, CONN_TYPE__TMQ, CONN_TYPE__UDFD, CONN_TYPE__MAX };
enum {
HEARTBEAT_KEY_USER_AUTHINFO = 1,
......
......@@ -642,6 +642,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_PAR_INVALID_INTERNAL_PK TAOS_DEF_ERROR_CODE(0, 0x2646)
#define TSDB_CODE_PAR_INVALID_TIMELINE_FUNC TAOS_DEF_ERROR_CODE(0, 0x2647)
#define TSDB_CODE_PAR_INVALID_PASSWD TAOS_DEF_ERROR_CODE(0, 0x2648)
#define TSDB_CODE_PAR_INVALID_ALTER_TABLE TAOS_DEF_ERROR_CODE(0, 0x2649)
//planner
#define TSDB_CODE_PLAN_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x2700)
......
......@@ -1007,8 +1007,6 @@ static int32_t smlParseTelnetTags(const char* data, SArray *cols, SHashObj *dump
while(*sql != '\0') {
// parse value
if (*sql == SPACE) {
valueLen = sql - value;
sql++;
break;
}
if (*sql == EQUAL) {
......@@ -1017,9 +1015,7 @@ static int32_t smlParseTelnetTags(const char* data, SArray *cols, SHashObj *dump
}
sql++;
}
if(valueLen == 0){
valueLen = sql - value;
}
valueLen = sql - value;
if(valueLen == 0){
smlBuildInvalidDataMsg(msg, "invalid value", value);
......@@ -1365,7 +1361,7 @@ static void smlDestroySTableMeta(SSmlSTableMeta *meta){
static void smlDestroyCols(SArray *cols) {
if (!cols) return;
for (int i = 0; i < taosArrayGetSize(cols); ++i) {
void *kv = taosArrayGet(cols, i);
void *kv = taosArrayGetP(cols, i);
taosMemoryFree(kv);
}
}
......@@ -2077,12 +2073,16 @@ static int32_t smlParseTelnetLine(SSmlHandle* info, void *data) {
if(ret != TSDB_CODE_SUCCESS){
uError("SML:0x%"PRIx64" smlParseTelnetLine failed", info->id);
smlDestroyTableInfo(info, tinfo);
smlDestroyCols(cols);
taosArrayDestroy(cols);
return ret;
}
if(taosArrayGetSize(tinfo->tags) <= 0 || taosArrayGetSize(tinfo->tags) > TSDB_MAX_TAGS){
smlBuildInvalidDataMsg(&info->msgBuf, "invalidate tags length:[1,128]", NULL);
smlDestroyTableInfo(info, tinfo);
smlDestroyCols(cols);
taosArrayDestroy(cols);
return TSDB_CODE_SML_INVALID_DATA;
}
taosHashClear(info->dumplicateKey);
......
......@@ -516,8 +516,8 @@ TEST(testCase, smlProcess_influx_Test) {
int ret = smlProcess(info, (char**)sql, sizeof(sql)/sizeof(sql[0]));
ASSERT_EQ(ret, 0);
TAOS_RES *res = taos_query(taos, "select * from t_6885c584b98481584ee13dac399e173d");
ASSERT_NE(res, nullptr);
// TAOS_RES *res = taos_query(taos, "select * from t_6885c584b98481584ee13dac399e173d");
// ASSERT_NE(res, nullptr);
// int fieldNum = taos_field_count(res);
// ASSERT_EQ(fieldNum, 5);
// int rowNum = taos_affected_rows(res);
......@@ -525,7 +525,7 @@ TEST(testCase, smlProcess_influx_Test) {
// for (int i = 0; i < rowNum; ++i) {
// TAOS_ROW rows = taos_fetch_row(res);
// }
taos_free_result(res);
// taos_free_result(res);
destroyRequest(request);
smlDestroyInfo(info);
}
......@@ -605,8 +605,8 @@ TEST(testCase, smlProcess_telnet_Test) {
int ret = smlProcess(info, (char**)sql, sizeof(sql)/sizeof(sql[0]));
ASSERT_EQ(ret, 0);
TAOS_RES *res = taos_query(taos, "select * from t_8c30283b3c4131a071d1e16cf6d7094a");
ASSERT_NE(res, nullptr);
// TAOS_RES *res = taos_query(taos, "select * from t_8c30283b3c4131a071d1e16cf6d7094a");
// ASSERT_NE(res, nullptr);
// int fieldNum = taos_field_count(res);
// ASSERT_EQ(fieldNum, 2);
// int rowNum = taos_affected_rows(res);
......@@ -614,7 +614,7 @@ TEST(testCase, smlProcess_telnet_Test) {
// for (int i = 0; i < rowNum; ++i) {
// TAOS_ROW rows = taos_fetch_row(res);
// }
taos_free_result(res);
// taos_free_result(res);
// res = taos_query(taos, "select * from t_6931529054e5637ca92c78a1ad441961");
// ASSERT_NE(res, nullptr);
......@@ -670,16 +670,16 @@ TEST(testCase, smlProcess_json1_Test) {
int ret = smlProcess(info, (char **)(&sql), -1);
ASSERT_EQ(ret, 0);
TAOS_RES *res = taos_query(taos, "select * from t_cb27a7198d637b4f1c6464bd73f756a7");
ASSERT_NE(res, nullptr);
int fieldNum = taos_field_count(res);
ASSERT_EQ(fieldNum, 2);
// TAOS_RES *res = taos_query(taos, "select * from t_cb27a7198d637b4f1c6464bd73f756a7");
// ASSERT_NE(res, nullptr);
// int fieldNum = taos_field_count(res);
// ASSERT_EQ(fieldNum, 2);
// int rowNum = taos_affected_rows(res);
// ASSERT_EQ(rowNum, 1);
// for (int i = 0; i < rowNum; ++i) {
// TAOS_ROW rows = taos_fetch_row(res);
// }
taos_free_result(res);
// taos_free_result(res);
destroyRequest(request);
smlDestroyInfo(info);
}
......
......@@ -4217,7 +4217,6 @@ int32_t tDecodeSVAlterTbReq(SDecoder *pDecoder, SVAlterTbReq *pReq) {
if (tDecodeCStr(pDecoder, &pReq->tbName) < 0) return -1;
if (tDecodeI8(pDecoder, &pReq->action) < 0) return -1;
if (tDecodeCStr(pDecoder, &pReq->colName) < 0) return -1;
switch (pReq->action) {
case TSDB_ALTER_TABLE_ADD_COLUMN:
if (tDecodeCStr(pDecoder, &pReq->colName) < 0) return -1;
......
......@@ -214,7 +214,23 @@ static void vmProcessSyncQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOf
// todo
SRpcMsg *pRsp = NULL;
(void)vnodeProcessSyncReq(pVnode->pImpl, &pMsg->rpcMsg, &pRsp);
int32_t ret = vnodeProcessSyncReq(pVnode->pImpl, &pMsg->rpcMsg, &pRsp);
if (ret != 0) {
// if leader, send response
if (pMsg->rpcMsg.handle != NULL) {
SRpcMsg rsp;
rsp.pCont = NULL;
rsp.contLen = 0;
rsp.code = terrno;
dTrace("vmProcessSyncQueue error, code:%d", terrno);
rsp.ahandle = pMsg->rpcMsg.ahandle;
rsp.handle = pMsg->rpcMsg.handle;
rsp.refId = pMsg->rpcMsg.refId;
tmsgSendRsp(&rsp);
}
}
rpcFreeCont(pMsg->rpcMsg.pCont);
taosFreeQitem(pMsg);
......
......@@ -198,6 +198,8 @@ void smaHandleRes(void *pVnode, int64_t smaId, const SArray *data) {
}
int vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
int32_t ret = TAOS_SYNC_PROPOSE_OTHER_ERROR;
if (syncEnvIsStart()) {
SSyncNode *pSyncNode = syncNodeAcquire(pVnode->sync);
assert(pSyncNode != NULL);
......@@ -219,67 +221,70 @@ int vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
SyncTimeout *pSyncMsg = syncTimeoutFromRpcMsg2(pRpcMsg);
assert(pSyncMsg != NULL);
syncNodeOnTimeoutCb(pSyncNode, pSyncMsg);
ret = syncNodeOnTimeoutCb(pSyncNode, pSyncMsg);
syncTimeoutDestroy(pSyncMsg);
} else if (pRpcMsg->msgType == TDMT_VND_SYNC_PING) {
SyncPing *pSyncMsg = syncPingFromRpcMsg2(pRpcMsg);
assert(pSyncMsg != NULL);
syncNodeOnPingCb(pSyncNode, pSyncMsg);
ret = syncNodeOnPingCb(pSyncNode, pSyncMsg);
syncPingDestroy(pSyncMsg);
} else if (pRpcMsg->msgType == TDMT_VND_SYNC_PING_REPLY) {
SyncPingReply *pSyncMsg = syncPingReplyFromRpcMsg2(pRpcMsg);
assert(pSyncMsg != NULL);
syncNodeOnPingReplyCb(pSyncNode, pSyncMsg);
ret = syncNodeOnPingReplyCb(pSyncNode, pSyncMsg);
syncPingReplyDestroy(pSyncMsg);
} else if (pRpcMsg->msgType == TDMT_VND_SYNC_CLIENT_REQUEST) {
SyncClientRequest *pSyncMsg = syncClientRequestFromRpcMsg2(pRpcMsg);
assert(pSyncMsg != NULL);
syncNodeOnClientRequestCb(pSyncNode, pSyncMsg);
ret = syncNodeOnClientRequestCb(pSyncNode, pSyncMsg);
syncClientRequestDestroy(pSyncMsg);
} else if (pRpcMsg->msgType == TDMT_VND_SYNC_REQUEST_VOTE) {
SyncRequestVote *pSyncMsg = syncRequestVoteFromRpcMsg2(pRpcMsg);
assert(pSyncMsg != NULL);
syncNodeOnRequestVoteCb(pSyncNode, pSyncMsg);
ret = syncNodeOnRequestVoteCb(pSyncNode, pSyncMsg);
syncRequestVoteDestroy(pSyncMsg);
} else if (pRpcMsg->msgType == TDMT_VND_SYNC_REQUEST_VOTE_REPLY) {
SyncRequestVoteReply *pSyncMsg = syncRequestVoteReplyFromRpcMsg2(pRpcMsg);
assert(pSyncMsg != NULL);
syncNodeOnRequestVoteReplyCb(pSyncNode, pSyncMsg);
ret = syncNodeOnRequestVoteReplyCb(pSyncNode, pSyncMsg);
syncRequestVoteReplyDestroy(pSyncMsg);
} else if (pRpcMsg->msgType == TDMT_VND_SYNC_APPEND_ENTRIES) {
SyncAppendEntries *pSyncMsg = syncAppendEntriesFromRpcMsg2(pRpcMsg);
assert(pSyncMsg != NULL);
syncNodeOnAppendEntriesCb(pSyncNode, pSyncMsg);
ret = syncNodeOnAppendEntriesCb(pSyncNode, pSyncMsg);
syncAppendEntriesDestroy(pSyncMsg);
} else if (pRpcMsg->msgType == TDMT_VND_SYNC_APPEND_ENTRIES_REPLY) {
SyncAppendEntriesReply *pSyncMsg = syncAppendEntriesReplyFromRpcMsg2(pRpcMsg);
assert(pSyncMsg != NULL);
syncNodeOnAppendEntriesReplyCb(pSyncNode, pSyncMsg);
ret = syncNodeOnAppendEntriesReplyCb(pSyncNode, pSyncMsg);
syncAppendEntriesReplyDestroy(pSyncMsg);
} else {
vError("==vnodeProcessSyncReq== error msg type:%d", pRpcMsg->msgType);
ret = TAOS_SYNC_PROPOSE_OTHER_ERROR;
}
syncNodeRelease(pSyncNode);
} else {
vError("==vnodeProcessSyncReq== error syncEnv stop");
ret = TAOS_SYNC_PROPOSE_OTHER_ERROR;
}
return 0;
return ret;
}
static int vnodeProcessCreateStbReq(SVnode *pVnode, int64_t version, void *pReq, int len, SRpcMsg *pRsp) {
......
......@@ -2576,12 +2576,6 @@ int32_t catalogGetHandle(uint64_t clusterId, SCatalog** catalogHandle) {
CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
}
SHashObj *metaCache = taosHashInit(gCtgMgmt.cfg.maxTblCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
if (NULL == metaCache) {
qError("taosHashInit failed, num:%d", gCtgMgmt.cfg.maxTblCacheNum);
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
}
code = taosHashPut(gCtgMgmt.pCluster, &clusterId, sizeof(clusterId), &clusterCtg, POINTER_BYTES);
if (code) {
if (HASH_NODE_EXIST(code)) {
......
......@@ -658,6 +658,7 @@ void initExecTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pQueryWin
void cleanupAggSup(SAggSupporter* pAggSup);
void destroyBasicOperatorInfo(void* param, int32_t numOfOutput);
void appendOneRowToDataBlock(SSDataBlock* pBlock, STupleHandle* pTupleHandle);
void setTbNameColData(void* pMeta, const SSDataBlock* pBlock, SColumnInfoData* pColInfoData, int32_t functionId);
SInterval extractIntervalInfo(const STableScanPhysiNode* pTableScanNode);
SColumn extractColumnFromColumnNode(SColumnNode* pColNode);
......
......@@ -2055,6 +2055,11 @@ void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const int8_t* rowR
SColumnInfoData* pDst = taosArrayGet(px->pDataBlock, i);
SColumnInfoData* pSrc = taosArrayGet(pBlock->pDataBlock, i);
// it is a reserved column for scalar function, and no data in this column yet.
if (pSrc->pData == NULL) {
continue;
}
int32_t numOfRows = 0;
for (int32_t j = 0; j < totalRows; ++j) {
if (rowRes[j] == 0) {
......@@ -4372,9 +4377,9 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SExprInfo* p
goto _error;
}
pInfo->limit = *pLimit;
pInfo->slimit = *pSlimit;
pInfo->curOffset = pLimit->offset;
pInfo->limit = *pLimit;
pInfo->slimit = *pSlimit;
pInfo->curOffset = pLimit->offset;
pInfo->curSOffset = pSlimit->offset;
pInfo->binfo.pRes = pResBlock;
......
......@@ -291,20 +291,7 @@ void addTagPseudoColumnData(STableScanInfo* pTableScanInfo, SSDataBlock* pBlock)
// this is to handle the tbname
if (fmIsScanPseudoColumnFunc(functionId)) {
struct SScalarFuncExecFuncs fpSet = {0};
fmGetScalarFuncExecFuncs(functionId, &fpSet);
SColumnInfoData infoData = {0};
infoData.info.type = TSDB_DATA_TYPE_BIGINT;
infoData.info.bytes = sizeof(uint64_t);
colInfoDataEnsureCapacity(&infoData, 0, 1);
colDataAppendInt64(&infoData, 0, &pBlock->info.uid);
SScalarParam srcParam = {
.numOfRows = pBlock->info.rows, .param = pTableScanInfo->readHandle.meta, .columnData = &infoData};
SScalarParam param = {.columnData = pColInfoData};
fpSet.process(&srcParam, 1, &param);
setTbNameColData(pTableScanInfo->readHandle.meta, pBlock, pColInfoData, functionId);
} else { // these are tags
const char* p = metaGetTableTagVal(&mr.me, pExpr->base.pParam[0].pCol->colId);
for (int32_t i = 0; i < pBlock->info.rows; ++i) {
......@@ -316,6 +303,23 @@ void addTagPseudoColumnData(STableScanInfo* pTableScanInfo, SSDataBlock* pBlock)
metaReaderClear(&mr);
}
void setTbNameColData(void* pMeta, const SSDataBlock* pBlock, SColumnInfoData* pColInfoData, int32_t functionId) {
struct SScalarFuncExecFuncs fpSet = {0};
fmGetScalarFuncExecFuncs(functionId, &fpSet);
SColumnInfoData infoData = {0};
infoData.info.type = TSDB_DATA_TYPE_BIGINT;
infoData.info.bytes = sizeof(uint64_t);
colInfoDataEnsureCapacity(&infoData, 0, 1);
colDataAppendInt64(&infoData, 0, (int64_t*) &pBlock->info.uid);
SScalarParam srcParam = {
.numOfRows = pBlock->info.rows, .param = pMeta, .columnData = &infoData};
SScalarParam param = {.columnData = pColInfoData};
fpSet.process(&srcParam, 1, &param);
}
static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) {
STableScanInfo* pTableScanInfo = pOperator->info;
SSDataBlock* pBlock = pTableScanInfo->pResBlock;
......
......@@ -105,7 +105,12 @@ int32_t mavgFunction(SqlFunctionCtx* pCtx);
bool getSampleFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
bool sampleFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
int32_t sampleFunction(SqlFunctionCtx* pCtx);
int32_t sampleFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
//int32_t sampleFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
bool getTailFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
bool tailFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
int32_t tailFunction(SqlFunctionCtx* pCtx);
int32_t tailFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
bool getSelectivityFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv);
......
......@@ -386,6 +386,35 @@ static int32_t translateSample(SFunctionNode* pFunc, char* pErrBuf, int32_t len)
return TSDB_CODE_SUCCESS;
}
static int32_t translateTail(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
int32_t numOfParams = LIST_LENGTH(pFunc->pParameterList);
if (2 != numOfParams && 3 != numOfParams) {
return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName);
}
SNode* pPara = nodesListGetNode(pFunc->pParameterList, 0);
if (QUERY_NODE_COLUMN != nodeType(pPara)) {
return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR,
"The input parameter of TAIL function can only be column");
}
for (int32_t i = 1; i < numOfParams; ++i) {
uint8_t paraType = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, i))->resType.type;
if (!IS_INTEGER_TYPE(paraType)) {
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
}
}
SExprNode* pCol = (SExprNode*)nodesListGetNode(pFunc->pParameterList, 0);
uint8_t colType = pCol->resType.type;
if (IS_VAR_DATA_TYPE(colType)) {
pFunc->node.resType = (SDataType){.bytes = pCol->resType.bytes, .type = colType};
} else {
pFunc->node.resType = (SDataType){.bytes = tDataTypes[colType].bytes, .type = colType};
}
return TSDB_CODE_SUCCESS;
}
static int32_t translateLastRow(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
// todo
return TSDB_CODE_SUCCESS;
......@@ -850,6 +879,16 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.processFunc = sampleFunction,
.finalizeFunc = NULL
},
{
.name = "tail",
.type = FUNCTION_TYPE_TAIL,
.classification = FUNC_MGT_NONSTANDARD_SQL_FUNC | FUNC_MGT_TIMELINE_FUNC,
.translateFunc = translateTail,
.getEnvFunc = getTailFuncEnv,
.initFunc = tailFunctionSetup,
.processFunc = tailFunction,
.finalizeFunc = tailFinalize
},
{
.name = "abs",
.type = FUNCTION_TYPE_ABS,
......
......@@ -18,12 +18,15 @@
#include "function.h"
#include "querynodes.h"
#include "taggfunction.h"
#include "tcompare.h"
#include "tdatablock.h"
#include "tpercentile.h"
#define HISTOGRAM_MAX_BINS_NUM 1000
#define MAVG_MAX_POINTS_NUM 1000
#define SAMPLE_MAX_POINTS_NUM 1000
#define TAIL_MAX_POINTS_NUM 100
#define TAIL_MAX_OFFSET 100
typedef struct SSumRes {
union {
......@@ -161,6 +164,21 @@ typedef struct SSampleInfo {
int64_t *timestamp;
} SSampleInfo;
typedef struct STailItem {
int64_t timestamp;
bool isNull;
char data[];
} STailItem;
typedef struct STailInfo {
int32_t numOfPoints;
int32_t numAdded;
int32_t offset;
uint8_t colType;
int16_t colBytes;
STailItem **pItems;
} STailInfo;
#define SET_VAL(_info, numOfElem, res) \
do { \
if ((numOfElem) <= 0) { \
......@@ -3107,7 +3125,7 @@ int32_t sampleFunction(SqlFunctionCtx* pCtx) {
int32_t startOffset = pCtx->offset;
for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; i += 1) {
if (colDataIsNull_f(pInputCol->nullbitmap, i)) {
if (colDataIsNull_s(pInputCol, i)) {
//colDataAppendNULL(pOutput, i);
continue;
}
......@@ -3141,3 +3159,132 @@ int32_t sampleFunction(SqlFunctionCtx* pCtx) {
//
// return pResInfo->numOfRes;
//}
bool getTailFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
SColumnNode* pCol = (SColumnNode*)nodesListGetNode(pFunc->pParameterList, 0);
SValueNode* pVal = (SValueNode*)nodesListGetNode(pFunc->pParameterList, 1);
int32_t numOfPoints = pVal->datum.i;
pEnv->calcMemSize = sizeof(STailInfo) + numOfPoints * (POINTER_BYTES + sizeof(STailItem) + pCol->node.resType.bytes);
return true;
}
bool tailFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo *pResultInfo) {
if (!functionSetup(pCtx, pResultInfo)) {
return false;
}
STailInfo *pInfo = GET_ROWCELL_INTERBUF(pResultInfo);
pInfo->numAdded = 0;
pInfo->numOfPoints = pCtx->param[1].param.i;
if (pCtx->numOfParams == 4) {
pInfo->offset = pCtx->param[2].param.i;
} else {
pInfo->offset = 0;
}
pInfo->colType = pCtx->resDataInfo.type;
pInfo->colBytes = pCtx->resDataInfo.bytes;
if ((pInfo->numOfPoints < 1 || pInfo->numOfPoints > TAIL_MAX_POINTS_NUM) ||
(pInfo->numOfPoints < 0 || pInfo->numOfPoints > TAIL_MAX_OFFSET)) {
return false;
}
pInfo->pItems = (STailItem **)((char *)pInfo + sizeof(STailInfo));
char *pItem = (char *)pInfo->pItems + pInfo->numOfPoints * POINTER_BYTES;
size_t unitSize = sizeof(STailItem) + pInfo->colBytes;
for (int32_t i = 0; i < pInfo->numOfPoints; ++i) {
pInfo->pItems[i] = (STailItem *)(pItem + i * unitSize);
pInfo->pItems[i]->isNull = false;
}
return true;
}
static void tailAssignResult(STailItem* pItem, char *data, int32_t colBytes, TSKEY ts, bool isNull) {
pItem->timestamp = ts;
if (isNull) {
pItem->isNull = true;
} else {
memcpy(pItem->data, data, colBytes);
}
}
static int32_t tailCompFn(const void *p1, const void *p2, const void *param) {
STailItem *d1 = *(STailItem **)p1;
STailItem *d2 = *(STailItem **)p2;
return compareInt64Val(&d1->timestamp, &d2->timestamp);
}
static void doTailAdd(STailInfo* pInfo, char *data, TSKEY ts, bool isNull) {
STailItem **pList = pInfo->pItems;
if (pInfo->numAdded < pInfo->numOfPoints) {
tailAssignResult(pList[pInfo->numAdded], data, pInfo->colBytes, ts, isNull);
taosheapsort((void *)pList, sizeof(STailItem **), pInfo->numAdded + 1, NULL, tailCompFn, 0);
pInfo->numAdded++;
} else if (pList[0]->timestamp < ts) {
tailAssignResult(pList[0], data, pInfo->colBytes, ts, isNull);
taosheapadjust((void *)pList, sizeof(STailItem **), 0, pInfo->numOfPoints - 1, NULL, tailCompFn, NULL, 0);
}
}
int32_t tailFunction(SqlFunctionCtx* pCtx) {
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
STailInfo* pInfo = GET_ROWCELL_INTERBUF(pResInfo);
SInputColumnInfoData* pInput = &pCtx->input;
TSKEY* tsList = (int64_t*)pInput->pPTS->pData;
SColumnInfoData* pInputCol = pInput->pData[0];
SColumnInfoData* pTsOutput = pCtx->pTsOutput;
SColumnInfoData* pOutput = (SColumnInfoData*)pCtx->pOutput;
int32_t startOffset = pCtx->offset;
if (pInfo->offset >= pInput->numOfRows) {
return 0;
} else {
pInfo->numOfPoints = MIN(pInfo->numOfPoints, pInput->numOfRows - pInfo->offset);
}
for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex - pInfo->offset; i += 1) {
char* data = colDataGetData(pInputCol, i);
doTailAdd(pInfo, data, tsList[i], colDataIsNull_s(pInputCol, i));
}
taosqsort(pInfo->pItems, pInfo->numOfPoints, POINTER_BYTES, NULL, tailCompFn);
for (int32_t i = 0; i < pInfo->numOfPoints; ++i) {
int32_t pos = startOffset + i;
STailItem *pItem = pInfo->pItems[i];
if (pItem->isNull) {
colDataAppendNULL(pOutput, pos);
} else {
colDataAppend(pOutput, pos, pItem->data, false);
}
}
return pInfo->numOfPoints;
}
int32_t tailFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx);
STailInfo* pInfo = GET_ROWCELL_INTERBUF(pEntryInfo);
pEntryInfo->complete = true;
int32_t type = pCtx->input.pData[0]->info.type;
int32_t slotId = pCtx->pExpr->base.resSchema.slotId;
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId);
// todo assign the tag value and the corresponding row data
int32_t currentRow = pBlock->info.rows;
for (int32_t i = 0; i < pEntryInfo->numOfRes; ++i) {
STailItem *pItem = pInfo->pItems[i];
colDataAppend(pCol, currentRow, pItem->data, false);
//setSelectivityValue(pCtx, pBlock, &pInfo->pItems[i].tuplePos, currentRow);
currentRow += 1;
}
return pEntryInfo->numOfRes;
}
......@@ -312,6 +312,7 @@ typedef struct SUdfcFuncStub {
char udfName[TSDB_FUNC_NAME_LEN];
UdfcFuncHandle handle;
int32_t refCount;
int64_t lastRefTime;
} SUdfcFuncStub;
typedef struct SUdfcProxy {
......@@ -1446,6 +1447,7 @@ int32_t accquireUdfFuncHandle(char* udfName, UdfcFuncHandle* pHandle) {
uv_mutex_unlock(&gUdfdProxy.udfStubsMutex);
*pHandle = foundStub->handle;
++foundStub->refCount;
foundStub->lastRefTime = taosGetTimestampUs();
return 0;
}
*pHandle = NULL;
......@@ -1455,6 +1457,7 @@ int32_t accquireUdfFuncHandle(char* udfName, UdfcFuncHandle* pHandle) {
strcpy(stub.udfName, udfName);
stub.handle = *pHandle;
++stub.refCount;
stub.lastRefTime = taosGetTimestampUs();
taosArrayPush(gUdfdProxy.udfStubs, &stub);
taosArraySort(gUdfdProxy.udfStubs, compareUdfcFuncSub);
} else {
......@@ -1662,7 +1665,8 @@ int32_t cleanUpUdfs() {
fnInfo("tear down udf. udf name: %s, handle: %p", stub->udfName, stub->handle);
doTeardownUdf(stub->handle);
} else {
fnInfo("udf still in use. udf name: %s, ref count: %d, handle: %p", stub->udfName, stub->refCount, stub->handle);
fnInfo("udf still in use. udf name: %s, ref count: %d, last ref time: %"PRId64", handle: %p",
stub->udfName, stub->refCount, stub->lastRefTime, stub->handle);
taosArrayPush(udfStubs, stub);
}
++i;
......
......@@ -485,7 +485,146 @@ void udfdIntrSignalHandler(uv_signal_t *handle, int signum) {
uv_stop(global.loop);
}
void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) { return; }
typedef enum EUdfdRpcReqRspType {
UDFD_RPC_MNODE_CONNECT = 0,
UDFD_RPC_RETRIVE_FUNC,
} EUdfdRpcReqRspType;
typedef struct SUdfdRpcSendRecvInfo {
EUdfdRpcReqRspType rpcType;
int32_t code;
void* param;
uv_sem_t resultSem;
} SUdfdRpcSendRecvInfo;
void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) {
SUdfdRpcSendRecvInfo *msgInfo = (SUdfdRpcSendRecvInfo *)pMsg->ahandle;
ASSERT(pMsg->ahandle != NULL);
if (pEpSet) {
if (!isEpsetEqual(&global.mgmtEp.epSet, pEpSet)) {
updateEpSet_s(&global.mgmtEp, pEpSet);
}
}
if (pMsg->code != TSDB_CODE_SUCCESS) {
fnError("udfd rpc error. code: %s", tstrerror(pMsg->code));
msgInfo->code = pMsg->code;
goto _return;
}
if (msgInfo->rpcType == UDFD_RPC_MNODE_CONNECT) {
SConnectRsp connectRsp = {0};
tDeserializeSConnectRsp(pMsg->pCont, pMsg->contLen, &connectRsp);
if (connectRsp.epSet.numOfEps == 0) {
msgInfo->code = TSDB_CODE_MND_APP_ERROR;
goto _return;
}
if (connectRsp.dnodeNum > 1 && !isEpsetEqual(&global.mgmtEp.epSet, &connectRsp.epSet)) {
updateEpSet_s(&global.mgmtEp, &connectRsp.epSet);
}
msgInfo->code = 0;
} else if (msgInfo->rpcType == UDFD_RPC_RETRIVE_FUNC) {
SRetrieveFuncRsp retrieveRsp = {0};
tDeserializeSRetrieveFuncRsp(pMsg->pCont, pMsg->contLen, &retrieveRsp);
SFuncInfo *pFuncInfo = (SFuncInfo *)taosArrayGet(retrieveRsp.pFuncInfos, 0);
SUdf* udf = msgInfo->param;
udf->funcType = pFuncInfo->funcType;
udf->scriptType = pFuncInfo->scriptType;
udf->outputType = pFuncInfo->funcType;
udf->outputLen = pFuncInfo->outputLen;
udf->bufSize = pFuncInfo->bufSize;
char path[PATH_MAX] = {0};
snprintf(path, sizeof(path), "%s/lib%s.so", "/tmp", pFuncInfo->name);
TdFilePtr file = taosOpenFile(path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_READ | TD_FILE_TRUNC | TD_FILE_AUTO_DEL);
// TODO check for failure of flush to disk
taosWriteFile(file, pFuncInfo->pCode, pFuncInfo->codeSize);
taosCloseFile(&file);
strncpy(udf->path, path, strlen(path));
taosArrayDestroy(retrieveRsp.pFuncInfos);
msgInfo->code = 0;
}
_return:
rpcFreeCont(pMsg->pCont);
uv_sem_post(&msgInfo->resultSem);
return;
}
int32_t udfdConnectToMNode() {
SConnectReq connReq = {0};
connReq.connType = CONN_TYPE__UDFD;
tstrncpy(connReq.app, "udfd",sizeof(connReq.app));
tstrncpy(connReq.user, TSDB_DEFAULT_USER, sizeof(connReq.user));
char pass[TSDB_PASSWORD_LEN + 1] = {0};
taosEncryptPass_c((uint8_t *)(TSDB_DEFAULT_PASS), strlen(TSDB_DEFAULT_PASS), pass);
tstrncpy(connReq.passwd, pass, sizeof(connReq.passwd));
connReq.pid = htonl(taosGetPId());
connReq.startTime = htobe64(taosGetTimestampMs());
int32_t contLen = tSerializeSConnectReq(NULL, 0, &connReq);
void* pReq = rpcMallocCont(contLen);
tSerializeSConnectReq(pReq, contLen, &connReq);
SUdfdRpcSendRecvInfo *msgInfo = taosMemoryCalloc(1, sizeof(SUdfdRpcSendRecvInfo));
msgInfo->rpcType = UDFD_RPC_MNODE_CONNECT;
uv_sem_init(&msgInfo->resultSem, 0);
SRpcMsg rpcMsg = {0};
rpcMsg.msgType = TDMT_MND_CONNECT;
rpcMsg.pCont = pReq;
rpcMsg.contLen = contLen;
rpcMsg.ahandle = msgInfo;
rpcSendRequest(global.clientRpc, &global.mgmtEp.epSet, &rpcMsg, NULL);
uv_sem_wait(&msgInfo->resultSem);
int32_t code = msgInfo->code;
uv_sem_destroy(&msgInfo->resultSem);
taosMemoryFree(msgInfo);
return code;
}
int32_t udfdFillUdfInfoFromMNode(void *clientRpc, char *udfName, SUdf *udf) {
SRetrieveFuncReq retrieveReq = {0};
retrieveReq.numOfFuncs = 1;
retrieveReq.pFuncNames = taosArrayInit(1, TSDB_FUNC_NAME_LEN);
taosArrayPush(retrieveReq.pFuncNames, udfName);
int32_t contLen = tSerializeSRetrieveFuncReq(NULL, 0, &retrieveReq);
void *pReq = rpcMallocCont(contLen);
tSerializeSRetrieveFuncReq(pReq, contLen, &retrieveReq);
taosArrayDestroy(retrieveReq.pFuncNames);
SUdfdRpcSendRecvInfo* msgInfo = taosMemoryCalloc(1, sizeof(SUdfdRpcSendRecvInfo));
msgInfo->rpcType = UDFD_RPC_RETRIVE_FUNC;
msgInfo->param = udf;
uv_sem_init(&msgInfo->resultSem, 0);
SRpcMsg rpcMsg = {0};
rpcMsg.pCont = pReq;
rpcMsg.contLen = contLen;
rpcMsg.msgType = TDMT_MND_RETRIEVE_FUNC;
rpcMsg.ahandle = msgInfo;
rpcSendRequest(clientRpc, &global.mgmtEp.epSet, &rpcMsg, NULL);
uv_sem_wait(&msgInfo->resultSem);
uv_sem_destroy(&msgInfo->resultSem);
int32_t code = msgInfo->code;
taosMemoryFree(msgInfo);
return code;
}
static bool udfdRpcRfp(int32_t code) {
if (code == TSDB_CODE_RPC_REDIRECT) {
return true;
} else {
return false;
}
}
int initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSet) {
pEpSet->version = 0;
......@@ -528,69 +667,30 @@ int initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSe
return 0;
}
int32_t udfdFillUdfInfoFromMNode(void *clientRpc, char *udfName, SUdf *udf) {
SRetrieveFuncReq retrieveReq = {0};
retrieveReq.numOfFuncs = 1;
retrieveReq.pFuncNames = taosArrayInit(1, TSDB_FUNC_NAME_LEN);
taosArrayPush(retrieveReq.pFuncNames, udfName);
int32_t contLen = tSerializeSRetrieveFuncReq(NULL, 0, &retrieveReq);
void *pReq = rpcMallocCont(contLen);
tSerializeSRetrieveFuncReq(pReq, contLen, &retrieveReq);
taosArrayDestroy(retrieveReq.pFuncNames);
SRpcMsg rpcMsg = {0};
rpcMsg.pCont = pReq;
rpcMsg.contLen = contLen;
rpcMsg.msgType = TDMT_MND_RETRIEVE_FUNC;
SRpcMsg rpcRsp = {0};
rpcSendRecv(clientRpc, &global.mgmtEp.epSet, &rpcMsg, &rpcRsp);
SRetrieveFuncRsp retrieveRsp = {0};
tDeserializeSRetrieveFuncRsp(rpcRsp.pCont, rpcRsp.contLen, &retrieveRsp);
SFuncInfo *pFuncInfo = (SFuncInfo *)taosArrayGet(retrieveRsp.pFuncInfos, 0);
udf->funcType = pFuncInfo->funcType;
udf->scriptType = pFuncInfo->scriptType;
udf->outputType = pFuncInfo->funcType;
udf->outputLen = pFuncInfo->outputLen;
udf->bufSize = pFuncInfo->bufSize;
char path[PATH_MAX] = {0};
snprintf(path, sizeof(path), "%s/lib%s.so", "/tmp", udfName);
TdFilePtr file = taosOpenFile(path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_READ | TD_FILE_TRUNC | TD_FILE_AUTO_DEL);
// TODO check for failure of flush to disk
taosWriteFile(file, pFuncInfo->pCode, pFuncInfo->codeSize);
taosCloseFile(&file);
strncpy(udf->path, path, strlen(path));
taosArrayDestroy(retrieveRsp.pFuncInfos);
rpcFreeCont(rpcRsp.pCont);
return 0;
}
int32_t udfdOpenClientRpc() {
char *pass = "taosdata";
char *user = "root";
char secretEncrypt[TSDB_PASSWORD_LEN + 1] = {0};
taosEncryptPass_c((uint8_t *)pass, strlen(pass), secretEncrypt);
SRpcInit rpcInit = {0};
rpcInit.label = (char *)"UDFD";
rpcInit.label = "UDFD";
rpcInit.numOfThreads = 1;
rpcInit.cfp = udfdProcessRpcRsp;
rpcInit.cfp = (RpcCfp)udfdProcessRpcRsp;
rpcInit.sessions = 1024;
rpcInit.connType = TAOS_CONN_CLIENT;
rpcInit.idleTime = 30 * 1000;
rpcInit.idleTime = tsShellActivityTimer * 1000;
rpcInit.user = TSDB_DEFAULT_USER;
rpcInit.ckey = "key";
rpcInit.spi = 1;
rpcInit.parent = &global;
rpcInit.rfp = udfdRpcRfp;
rpcInit.user = (char *)user;
rpcInit.ckey = (char *)"key";
rpcInit.secret = (char *)secretEncrypt;
rpcInit.spi = 1;
char pass[TSDB_PASSWORD_LEN + 1] = {0};
taosEncryptPass_c((uint8_t *)(TSDB_DEFAULT_PASS), strlen(TSDB_DEFAULT_PASS), pass);
rpcInit.secret = pass;
global.clientRpc = rpcOpen(&rpcInit);
if (global.clientRpc == NULL) {
fnError("failed to init dnode rpc client");
return -1;
}
return 0;
}
......@@ -700,12 +800,6 @@ static int32_t udfdRun() {
global.udfsHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
uv_mutex_init(&global.udfsMutex);
// TOOD: client rpc to fetch udf function info from mnode
if (udfdOpenClientRpc() != 0) {
fnError("open rpc connection to mnode failure");
return -1;
}
if (udfdUvInit() != 0) {
fnError("uv init failure");
return -2;
......@@ -717,7 +811,6 @@ static int32_t udfdRun() {
int codeClose = uv_loop_close(global.loop);
fnDebug("uv loop close. result: %s", uv_err_name(codeClose));
removeListeningPipe();
udfdCloseClientRpc();
uv_mutex_destroy(&global.udfsMutex);
taosHashCleanup(global.udfsHash);
return 0;
......@@ -746,9 +839,22 @@ int main(int argc, char *argv[]) {
if (taosInitCfg(configDir, NULL, NULL, NULL, NULL, 0) != 0) {
fnError("failed to start since read config error");
return -1;
return -2;
}
initEpSetFromCfg(tsFirst, tsSecond, &global.mgmtEp);
return udfdRun();
if (udfdOpenClientRpc() != 0) {
fnError("open rpc connection to mnode failure");
return -3;
}
if (udfdConnectToMNode() != 0) {
fnError("failed to start since can not connect to mnode");
return -4;
}
udfdRun();
udfdCloseClientRpc();
}
......@@ -241,7 +241,7 @@ alter_table_clause(A) ::=
alter_table_clause(A) ::=
full_table_name(B) RENAME TAG column_name(C) column_name(D). { A = createAlterTableRenameCol(pCxt, B, TSDB_ALTER_TABLE_UPDATE_TAG_NAME, &C, &D); }
alter_table_clause(A) ::=
full_table_name(B) SET TAG column_name(C) NK_EQ literal(D). { A = createAlterTableSetTag(pCxt, B, &C, D); }
full_table_name(B) SET TAG column_name(C) NK_EQ literal(D). { A = createAlterTableSetTag(pCxt, B, &C, releaseRawExprNode(pCxt, D)); }
%type multi_create_clause { SNodeList* }
%destructor multi_create_clause { nodesDestroyList($$); }
......
......@@ -1041,18 +1041,6 @@ static void destroyInsertParseContextForTable(SInsertParseContext* pCxt) {
destroyCreateSubTbReq(&pCxt->createTblReq);
}
static void destroyDataBlock(STableDataBlocks* pDataBlock) {
if (pDataBlock == NULL) {
return;
}
taosMemoryFreeClear(pDataBlock->pData);
if (!pDataBlock->cloned) {
destroyBoundColumnInfo(&pDataBlock->boundColumnInfo);
}
taosMemoryFreeClear(pDataBlock);
}
static void destroyInsertParseContext(SInsertParseContext* pCxt) {
destroyInsertParseContextForTable(pCxt);
taosHashCleanup(pCxt->pVgroupsHashObj);
......@@ -1301,6 +1289,7 @@ int32_t qBuildStmtOutput(SQuery* pQuery, SHashObj* pVgHash, SHashObj* pBlockHash
CHECK_CODE(buildOutput(&insertCtx));
destroyBlockArrayList(insertCtx.pVgDataBlocks);
return TSDB_CODE_SUCCESS;
}
......@@ -1580,16 +1569,25 @@ int32_t qBuildStmtColFields(void* pBlock, int32_t* fieldNum, TAOS_FIELD** fields
// schemaless logic start
typedef struct SmlExecHandle {
SHashObj* pBlockHash;
typedef struct SmlExecTableHandle {
SParsedDataColInfo tags; // each table
SKVRowBuilder tagsBuilder; // each table
SVCreateTbReq createTblReq; // each table
} SmlExecTableHandle;
SQuery* pQuery;
typedef struct SmlExecHandle {
SHashObj* pBlockHash;
SmlExecTableHandle tableExecHandle;
SQuery *pQuery;
} SSmlExecHandle;
static void smlDestroyTableHandle(void* pHandle) {
SmlExecTableHandle* handle = (SmlExecTableHandle*)pHandle;
tdDestroyKVRowBuilder(&handle->tagsBuilder);
destroyBoundColumnInfo(&handle->tags);
destroyCreateSubTbReq(&handle->createTblReq);
}
static int32_t smlBoundColumnData(SArray* cols, SParsedDataColInfo* pColList, SSchema* pSchema) {
col_id_t nCols = pColList->numOfCols;
......@@ -1692,25 +1690,26 @@ int32_t smlBindData(void *handle, SArray *tags, SArray *colsSchema, SArray *cols
SMsgBuf pBuf = {.buf = msgBuf, .len = msgBufLen};
SSmlExecHandle* smlHandle = (SSmlExecHandle*)handle;
smlDestroyTableHandle(&smlHandle->tableExecHandle); // free for each table
SSchema* pTagsSchema = getTableTagSchema(pTableMeta);
setBoundColumnInfo(&smlHandle->tags, pTagsSchema, getNumOfTags(pTableMeta));
int ret = smlBoundColumnData(tags, &smlHandle->tags, pTagsSchema);
setBoundColumnInfo(&smlHandle->tableExecHandle.tags, pTagsSchema, getNumOfTags(pTableMeta));
int ret = smlBoundColumnData(tags, &smlHandle->tableExecHandle.tags, pTagsSchema);
if (ret != TSDB_CODE_SUCCESS) {
buildInvalidOperationMsg(&pBuf, "bound tags error");
return ret;
}
SKVRow row = NULL;
ret = smlBuildTagRow(tags, &smlHandle->tagsBuilder, &smlHandle->tags, pTagsSchema, &row, &pBuf);
ret = smlBuildTagRow(tags, &smlHandle->tableExecHandle.tagsBuilder, &smlHandle->tableExecHandle.tags, pTagsSchema, &row, &pBuf);
if (ret != TSDB_CODE_SUCCESS) {
return ret;
}
buildCreateTbReq(&smlHandle->createTblReq, tableName, row, pTableMeta->suid);
buildCreateTbReq(&smlHandle->tableExecHandle.createTblReq, tableName, row, pTableMeta->suid);
STableDataBlocks* pDataBlock = NULL;
ret = getDataBlockFromList(smlHandle->pBlockHash, &pTableMeta->uid, sizeof(pTableMeta->uid),
TSDB_DEFAULT_PAYLOAD_SIZE, sizeof(SSubmitBlk), getTableInfo(pTableMeta).rowSize,
pTableMeta, &pDataBlock, NULL, &smlHandle->createTblReq);
pTableMeta, &pDataBlock, NULL, &smlHandle->tableExecHandle.createTblReq);
if (ret != TSDB_CODE_SUCCESS) {
buildInvalidOperationMsg(&pBuf, "create data block error");
return ret;
......@@ -1826,9 +1825,7 @@ void smlDestroyHandle(void* pHandle) {
if (!pHandle) return;
SSmlExecHandle* handle = (SSmlExecHandle*)pHandle;
destroyBlockHashmap(handle->pBlockHash);
tdDestroyKVRowBuilder(&handle->tagsBuilder);
destroyBoundColumnInfo(&handle->tags);
destroyCreateSubTbReq(&handle->createTblReq);
smlDestroyTableHandle(&handle->tableExecHandle);
taosMemoryFree(handle);
}
......
......@@ -237,9 +237,7 @@ static void destroyDataBlock(STableDataBlocks* pDataBlock) {
taosMemoryFreeClear(pDataBlock->pData);
if (!pDataBlock->cloned) {
// free the refcount for metermeta
if (pDataBlock->pTableMeta != NULL) {
taosMemoryFreeClear(pDataBlock->pTableMeta);
}
taosMemoryFreeClear(pDataBlock->pTableMeta);
destroyBoundColumnInfo(&pDataBlock->boundColumnInfo);
}
......
......@@ -2949,8 +2949,8 @@ static int32_t translateCreateIndex(STranslateContext* pCxt, SCreateIndexStmt* p
}
static int32_t translateDropIndex(STranslateContext* pCxt, SDropIndexStmt* pStmt) {
SEncoder encoder = {0};
int32_t contLen = 0;
SEncoder encoder = {0};
int32_t contLen = 0;
SVDropTSmaReq dropSmaReq = {0};
strcpy(dropSmaReq.indexName, pStmt->indexName);
......@@ -2958,7 +2958,7 @@ static int32_t translateDropIndex(STranslateContext* pCxt, SDropIndexStmt* pStmt
if (NULL == pCxt->pCmdMsg) {
return TSDB_CODE_OUT_OF_MEMORY;
}
int32_t ret = 0;
tEncodeSize(tEncodeSVDropTSmaReq, &dropSmaReq, contLen, ret);
if (ret < 0) {
......@@ -3800,7 +3800,7 @@ static void destroyCreateTbReqBatch(SVgroupCreateTableBatch* pTbBatch) {
taosArrayDestroy(pTbBatch->req.pArray);
}
static int32_t rewriteToVnodeModifOpStmt(SQuery* pQuery, SArray* pBufArray) {
static int32_t rewriteToVnodeModifyOpStmt(SQuery* pQuery, SArray* pBufArray) {
SVnodeModifOpStmt* pNewStmt = nodesMakeNode(QUERY_NODE_VNODE_MODIF_STMT);
if (pNewStmt == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
......@@ -3855,7 +3855,7 @@ static int32_t rewriteCreateTable(STranslateContext* pCxt, SQuery* pQuery) {
code = buildCreateTableDataBlock(pCxt->pParseCxt->acctId, pStmt, &info, &pBufArray);
}
if (TSDB_CODE_SUCCESS == code) {
code = rewriteToVnodeModifOpStmt(pQuery, pBufArray);
code = rewriteToVnodeModifyOpStmt(pQuery, pBufArray);
if (TSDB_CODE_SUCCESS != code) {
destroyCreateTbReqArray(pBufArray);
}
......@@ -4111,7 +4111,7 @@ static int32_t rewriteCreateMultiTable(STranslateContext* pCxt, SQuery* pQuery)
return TSDB_CODE_OUT_OF_MEMORY;
}
return rewriteToVnodeModifOpStmt(pQuery, pBufArray);
return rewriteToVnodeModifyOpStmt(pQuery, pBufArray);
}
typedef struct SVgroupDropTableBatch {
......@@ -4251,14 +4251,162 @@ static int32_t rewriteDropTable(STranslateContext* pCxt, SQuery* pQuery) {
return TSDB_CODE_OUT_OF_MEMORY;
}
return rewriteToVnodeModifOpStmt(pQuery, pBufArray);
return rewriteToVnodeModifyOpStmt(pQuery, pBufArray);
}
static int32_t rewriteAlterTable(STranslateContext* pCxt, SQuery* pQuery) {
// todo
static int32_t buildAlterTbReq(STranslateContext* pCxt, SAlterTableStmt* pStmt, SVAlterTbReq* pReq) {
pReq->tbName = strdup(pStmt->tableName);
if (NULL == pReq->tbName) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pReq->action = pStmt->alterType;
switch (pStmt->alterType) {
case TSDB_ALTER_TABLE_ADD_TAG:
case TSDB_ALTER_TABLE_DROP_TAG:
case TSDB_ALTER_TABLE_UPDATE_TAG_NAME:
case TSDB_ALTER_TABLE_UPDATE_TAG_BYTES:
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_ALTER_TABLE);
case TSDB_ALTER_TABLE_UPDATE_TAG_VAL:
pReq->tagName = strdup(pStmt->colName);
if (NULL == pReq->tagName) {
return TSDB_CODE_OUT_OF_MEMORY;
}
if (DEAL_RES_ERROR == translateValue(pCxt, pStmt->pVal)) {
return pCxt->errCode;
}
pReq->isNull = (TSDB_DATA_TYPE_NULL == pStmt->pVal->node.resType.type);
pReq->nTagVal = pStmt->pVal->node.resType.bytes;
char* pVal = nodesGetValueFromNode(pStmt->pVal);
pReq->pTagVal = IS_VAR_DATA_TYPE(pStmt->pVal->node.resType.type) ? pVal + VARSTR_HEADER_SIZE : pVal;
break;
case TSDB_ALTER_TABLE_ADD_COLUMN:
case TSDB_ALTER_TABLE_DROP_COLUMN:
pReq->colName = strdup(pStmt->colName);
if (NULL == pReq->colName) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pReq->type = pStmt->dataType.type;
pReq->flags = COL_SMA_ON;
pReq->bytes = pStmt->dataType.bytes;
break;
case TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES:
pReq->colName = strdup(pStmt->colName);
if (NULL == pReq->colName) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pReq->colModBytes = calcTypeBytes(pStmt->dataType);
break;
case TSDB_ALTER_TABLE_UPDATE_OPTIONS:
if (-1 != pStmt->pOptions->ttl) {
pReq->updateTTL = true;
pReq->newTTL = pStmt->pOptions->ttl;
}
if ('\0' != pStmt->pOptions->comment[0]) {
pReq->updateComment = true;
pReq->newComment = strdup(pStmt->pOptions->comment);
if (NULL == pReq->newComment) {
return TSDB_CODE_OUT_OF_MEMORY;
}
}
break;
case TSDB_ALTER_TABLE_UPDATE_COLUMN_NAME:
pReq->colName = strdup(pStmt->colName);
pReq->colNewName = strdup(pStmt->newColName);
if (NULL == pReq->colName || NULL == pReq->colNewName) {
return TSDB_CODE_OUT_OF_MEMORY;
}
break;
default:
break;
}
return TSDB_CODE_SUCCESS;
}
static int32_t serializeAlterTbReq(STranslateContext* pCxt, SAlterTableStmt* pStmt, SVAlterTbReq* pReq,
SArray* pArray) {
SVgroupInfo vg = {0};
int32_t code = getTableHashVgroup(pCxt, pStmt->dbName, pStmt->tableName, &vg);
int tlen = 0;
if (TSDB_CODE_SUCCESS == code) {
tEncodeSize(tEncodeSVAlterTbReq, pReq, tlen, code);
}
if (TSDB_CODE_SUCCESS == code) {
tlen += sizeof(SMsgHead);
void* pMsg = taosMemoryMalloc(tlen);
if (NULL == pMsg) {
return TSDB_CODE_OUT_OF_MEMORY;
}
((SMsgHead*)pMsg)->vgId = htonl(vg.vgId);
((SMsgHead*)pMsg)->contLen = htonl(tlen);
void* pBuf = POINTER_SHIFT(pMsg, sizeof(SMsgHead));
SEncoder coder = {0};
tEncoderInit(&coder, pBuf, tlen - sizeof(SMsgHead));
tEncodeSVAlterTbReq(&coder, pReq);
tEncoderClear(&coder);
SVgDataBlocks* pVgData = taosMemoryCalloc(1, sizeof(SVgDataBlocks));
if (NULL == pVgData) {
taosMemoryFree(pMsg);
return TSDB_CODE_OUT_OF_MEMORY;
}
pVgData->vg = vg;
pVgData->pData = pMsg;
pVgData->size = tlen;
pVgData->numOfTables = 1;
taosArrayPush(pArray, &pVgData);
}
return code;
}
static int32_t buildModifyVnodeArray(STranslateContext* pCxt, SAlterTableStmt* pStmt, SVAlterTbReq* pReq,
SArray** pArray) {
SArray* pTmpArray = taosArrayInit(1, sizeof(void*));
if (NULL == pTmpArray) {
return TSDB_CODE_OUT_OF_MEMORY;
}
int32_t code = serializeAlterTbReq(pCxt, pStmt, pReq, pTmpArray);
if (TSDB_CODE_SUCCESS == code) {
*pArray = pTmpArray;
} else {
taosArrayDestroy(pTmpArray);
}
return code;
}
static int32_t rewriteAlterTable(STranslateContext* pCxt, SQuery* pQuery) {
SAlterTableStmt* pStmt = (SAlterTableStmt*)pQuery->pRoot;
STableMeta* pTableMeta = NULL;
int32_t code = getTableMeta(pCxt, pStmt->dbName, pStmt->tableName, &pTableMeta);
if (TSDB_CODE_SUCCESS != code) {
return code;
}
if (TSDB_SUPER_TABLE == pTableMeta->tableType) {
return TSDB_CODE_SUCCESS;
} else if (TSDB_CHILD_TABLE != pTableMeta->tableType && TSDB_NORMAL_TABLE != pTableMeta->tableType) {
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_ALTER_TABLE);
}
SVAlterTbReq req = {0};
code = buildAlterTbReq(pCxt, pStmt, &req);
SArray* pArray = NULL;
if (TSDB_CODE_SUCCESS == code) {
code = buildModifyVnodeArray(pCxt, pStmt, &req, &pArray);
}
if (TSDB_CODE_SUCCESS == code) {
code = rewriteToVnodeModifyOpStmt(pQuery, pArray);
}
return code;
}
static int32_t rewriteQuery(STranslateContext* pCxt, SQuery* pQuery) {
int32_t code = TSDB_CODE_SUCCESS;
switch (nodeType(pQuery->pRoot)) {
......@@ -4296,9 +4444,7 @@ static int32_t rewriteQuery(STranslateContext* pCxt, SQuery* pQuery) {
code = rewriteDropTable(pCxt, pQuery);
break;
case QUERY_NODE_ALTER_TABLE_STMT:
if (TSDB_ALTER_TABLE_UPDATE_TAG_VAL == ((SAlterTableStmt*)pQuery->pRoot)->alterType) {
code = rewriteAlterTable(pCxt, pQuery);
}
code = rewriteAlterTable(pCxt, pQuery);
break;
default:
break;
......
......@@ -152,6 +152,8 @@ static char* getSyntaxErrFormat(int32_t errCode) {
return "Invalid timeline function";
case TSDB_CODE_PAR_INVALID_PASSWD:
return "Invalid password";
case TSDB_CODE_PAR_INVALID_ALTER_TABLE:
return "Invalid alter table statement";
case TSDB_CODE_OUT_OF_MEMORY:
return "Out of memory";
default:
......
......@@ -3500,7 +3500,7 @@ static YYACTIONTYPE yy_reduce(
yymsp[-4].minor.yy172 = yylhsminor.yy172;
break;
case 121: /* alter_table_clause ::= full_table_name SET TAG column_name NK_EQ literal */
{ yylhsminor.yy172 = createAlterTableSetTag(pCxt, yymsp[-5].minor.yy172, &yymsp[-2].minor.yy105, yymsp[0].minor.yy172); }
{ yylhsminor.yy172 = createAlterTableSetTag(pCxt, yymsp[-5].minor.yy172, &yymsp[-2].minor.yy105, releaseRawExprNode(pCxt, yymsp[0].minor.yy172)); }
yymsp[-5].minor.yy172 = yylhsminor.yy172;
break;
case 123: /* multi_create_clause ::= multi_create_clause create_subtable_clause */
......
......@@ -69,7 +69,7 @@ TEST_F(ParserInitialATest, alterDatabase) {
* | COMMENT 'string_value'
* }
*/
TEST_F(ParserInitialATest, alterTable) {
TEST_F(ParserInitialATest, alterSTable) {
useDb("root", "test");
SMAlterStbReq expect = {0};
......@@ -119,7 +119,7 @@ TEST_F(ParserInitialATest, alterTable) {
setCheckDdlFunc([&](const SQuery* pQuery, ParserStage stage) {
ASSERT_EQ(nodeType(pQuery->pRoot), QUERY_NODE_ALTER_TABLE_STMT);
SMAlterStbReq req = {0};
ASSERT_TRUE(TSDB_CODE_SUCCESS == tDeserializeSMAlterStbReq(pQuery->pCmdMsg->pMsg, pQuery->pCmdMsg->msgLen, &req));
ASSERT_EQ(tDeserializeSMAlterStbReq(pQuery->pCmdMsg->pMsg, pQuery->pCmdMsg->msgLen, &req), TSDB_CODE_SUCCESS);
ASSERT_EQ(std::string(req.name), std::string(expect.name));
ASSERT_EQ(req.alterType, expect.alterType);
ASSERT_EQ(req.numOfFields, expect.numOfFields);
......@@ -139,24 +139,24 @@ TEST_F(ParserInitialATest, alterTable) {
}
});
setAlterStbReqFunc("t1", TSDB_ALTER_TABLE_UPDATE_OPTIONS, 0, nullptr, 0, 0, nullptr, nullptr, 10);
run("ALTER TABLE t1 TTL 10");
setAlterStbReqFunc("st1", TSDB_ALTER_TABLE_UPDATE_OPTIONS, 0, nullptr, 0, 0, nullptr, nullptr, 10);
run("ALTER TABLE st1 TTL 10");
setAlterStbReqFunc("t1", TSDB_ALTER_TABLE_UPDATE_OPTIONS, 0, nullptr, 0, 0, nullptr, "test");
run("ALTER TABLE t1 COMMENT 'test'");
setAlterStbReqFunc("st1", TSDB_ALTER_TABLE_UPDATE_OPTIONS, 0, nullptr, 0, 0, nullptr, "test");
run("ALTER TABLE st1 COMMENT 'test'");
setAlterStbReqFunc("t1", TSDB_ALTER_TABLE_ADD_COLUMN, 1, "cc1", TSDB_DATA_TYPE_BIGINT);
run("ALTER TABLE t1 ADD COLUMN cc1 BIGINT");
setAlterStbReqFunc("st1", TSDB_ALTER_TABLE_ADD_COLUMN, 1, "cc1", TSDB_DATA_TYPE_BIGINT);
run("ALTER TABLE st1 ADD COLUMN cc1 BIGINT");
setAlterStbReqFunc("t1", TSDB_ALTER_TABLE_DROP_COLUMN, 1, "c1");
run("ALTER TABLE t1 DROP COLUMN c1");
setAlterStbReqFunc("st1", TSDB_ALTER_TABLE_DROP_COLUMN, 1, "c1");
run("ALTER TABLE st1 DROP COLUMN c1");
setAlterStbReqFunc("t1", TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES, 1, "c1", TSDB_DATA_TYPE_VARCHAR,
setAlterStbReqFunc("st1", TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES, 1, "c1", TSDB_DATA_TYPE_VARCHAR,
20 + VARSTR_HEADER_SIZE);
run("ALTER TABLE t1 MODIFY COLUMN c1 VARCHAR(20)");
run("ALTER TABLE st1 MODIFY COLUMN c1 VARCHAR(20)");
setAlterStbReqFunc("t1", TSDB_ALTER_TABLE_UPDATE_COLUMN_NAME, 2, "c1", 0, 0, "cc1");
run("ALTER TABLE t1 RENAME COLUMN c1 cc1");
setAlterStbReqFunc("st1", TSDB_ALTER_TABLE_UPDATE_COLUMN_NAME, 2, "c1", 0, 0, "cc1");
run("ALTER TABLE st1 RENAME COLUMN c1 cc1");
setAlterStbReqFunc("st1", TSDB_ALTER_TABLE_ADD_TAG, 1, "tag11", TSDB_DATA_TYPE_BIGINT);
run("ALTER TABLE st1 ADD TAG tag11 BIGINT");
......@@ -171,7 +171,127 @@ TEST_F(ParserInitialATest, alterTable) {
setAlterStbReqFunc("st1", TSDB_ALTER_TABLE_UPDATE_TAG_NAME, 2, "tag1", 0, 0, "tag11");
run("ALTER TABLE st1 RENAME TAG tag1 tag11");
// run("ALTER TABLE st1s1 SET TAG tag1=10");
// todo
// ADD {FULLTEXT | SMA} INDEX index_name (col_name [, col_name] ...) [index_option]
}
TEST_F(ParserInitialATest, alterTable) {
useDb("root", "test");
SVAlterTbReq expect = {0};
auto setAlterColFunc = [&](const char* pTbname, int8_t alterType, const char* pColName, int8_t dataType = 0,
int32_t dataBytes = 0, const char* pNewColName = nullptr) {
memset(&expect, 0, sizeof(SVAlterTbReq));
expect.tbName = strdup(pTbname);
expect.action = alterType;
expect.colName = strdup(pColName);
switch (alterType) {
case TSDB_ALTER_TABLE_ADD_COLUMN:
expect.type = dataType;
expect.flags = COL_SMA_ON;
expect.bytes = dataBytes > 0 ? dataBytes : (dataType > 0 ? tDataTypes[dataType].bytes : 0);
break;
case TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES:
expect.colModBytes = dataBytes;
break;
case TSDB_ALTER_TABLE_UPDATE_COLUMN_NAME:
expect.colNewName = strdup(pNewColName);
break;
default:
break;
}
};
auto setAlterTagFunc = [&](const char* pTbname, const char* pTagName, const uint8_t* pNewVal, uint32_t bytes) {
memset(&expect, 0, sizeof(SVAlterTbReq));
expect.tbName = strdup(pTbname);
expect.action = TSDB_ALTER_TABLE_UPDATE_TAG_VAL;
expect.tagName = strdup(pTagName);
expect.isNull = (nullptr == pNewVal);
expect.nTagVal = bytes;
expect.pTagVal = pNewVal;
};
auto setAlterOptionsFunc = [&](const char* pTbname, int32_t ttl, const char* pComment = nullptr) {
memset(&expect, 0, sizeof(SVAlterTbReq));
expect.tbName = strdup(pTbname);
expect.action = TSDB_ALTER_TABLE_UPDATE_OPTIONS;
if (-1 != ttl) {
expect.updateTTL = true;
expect.newTTL = ttl;
}
if (nullptr != pComment) {
expect.updateComment = true;
expect.newComment = pComment;
}
};
setCheckDdlFunc([&](const SQuery* pQuery, ParserStage stage) {
ASSERT_EQ(nodeType(pQuery->pRoot), QUERY_NODE_VNODE_MODIF_STMT);
SVnodeModifOpStmt* pStmt = (SVnodeModifOpStmt*)pQuery->pRoot;
ASSERT_EQ(pStmt->sqlNodeType, QUERY_NODE_ALTER_TABLE_STMT);
ASSERT_NE(pStmt->pDataBlocks, nullptr);
ASSERT_EQ(taosArrayGetSize(pStmt->pDataBlocks), 1);
SVgDataBlocks* pVgData = (SVgDataBlocks*)taosArrayGetP(pStmt->pDataBlocks, 0);
void* pBuf = POINTER_SHIFT(pVgData->pData, sizeof(SMsgHead));
SVAlterTbReq req = {0};
SDecoder coder = {0};
tDecoderInit(&coder, (const uint8_t*)pBuf, pVgData->size);
ASSERT_EQ(tDecodeSVAlterTbReq(&coder, &req), TSDB_CODE_SUCCESS);
ASSERT_EQ(std::string(req.tbName), std::string(expect.tbName));
ASSERT_EQ(req.action, expect.action);
if (nullptr != expect.colName) {
ASSERT_EQ(std::string(req.colName), std::string(expect.colName));
}
ASSERT_EQ(req.type, expect.type);
ASSERT_EQ(req.flags, expect.flags);
ASSERT_EQ(req.bytes, expect.bytes);
ASSERT_EQ(req.colModBytes, expect.colModBytes);
if (nullptr != expect.colNewName) {
ASSERT_EQ(std::string(req.colNewName), std::string(expect.colNewName));
}
if (nullptr != expect.tagName) {
ASSERT_EQ(std::string(req.tagName), std::string(expect.tagName));
}
ASSERT_EQ(req.isNull, expect.isNull);
ASSERT_EQ(req.nTagVal, expect.nTagVal);
ASSERT_EQ(memcmp(req.pTagVal, expect.pTagVal, expect.nTagVal), 0);
ASSERT_EQ(req.updateTTL, expect.updateTTL);
ASSERT_EQ(req.newTTL, expect.newTTL);
ASSERT_EQ(req.updateComment, expect.updateComment);
if (nullptr != expect.newComment) {
ASSERT_EQ(std::string(req.newComment), std::string(expect.newComment));
}
tDecoderClear(&coder);
});
setAlterOptionsFunc("t1", 10, nullptr);
run("ALTER TABLE t1 TTL 10");
setAlterOptionsFunc("t1", -1, "test");
run("ALTER TABLE t1 COMMENT 'test'");
setAlterColFunc("t1", TSDB_ALTER_TABLE_ADD_COLUMN, "cc1", TSDB_DATA_TYPE_BIGINT);
run("ALTER TABLE t1 ADD COLUMN cc1 BIGINT");
setAlterColFunc("t1", TSDB_ALTER_TABLE_DROP_COLUMN, "c1");
run("ALTER TABLE t1 DROP COLUMN c1");
setAlterColFunc("t1", TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES, "c1", TSDB_DATA_TYPE_VARCHAR, 20 + VARSTR_HEADER_SIZE);
run("ALTER TABLE t1 MODIFY COLUMN c1 VARCHAR(20)");
setAlterColFunc("t1", TSDB_ALTER_TABLE_UPDATE_COLUMN_NAME, "c1", 0, 0, "cc1");
run("ALTER TABLE t1 RENAME COLUMN c1 cc1");
int64_t val = 10;
setAlterTagFunc("st1s1", "tag1", (const uint8_t*)&val, sizeof(val));
run("ALTER TABLE st1s1 SET TAG tag1=10");
// todo
// ADD {FULLTEXT | SMA} INDEX index_name (col_name [, col_name] ...) [index_option]
......
......@@ -985,6 +985,8 @@ static int32_t getMsgType(ENodeType sqlType) {
return TDMT_VND_CREATE_TABLE;
case QUERY_NODE_DROP_TABLE_STMT:
return TDMT_VND_DROP_TABLE;
case QUERY_NODE_ALTER_TABLE_STMT:
return TDMT_VND_ALTER_TABLE;
default:
break;
}
......
......@@ -256,6 +256,7 @@ int32_t schValidateTaskReceivedMsgType(SSchJob *pJob, SSchTask *pTask, int32_t m
return TSDB_CODE_SUCCESS;
case TDMT_VND_CREATE_TABLE_RSP:
case TDMT_VND_DROP_TABLE_RSP:
case TDMT_VND_ALTER_TABLE_RSP:
case TDMT_VND_SUBMIT_RSP:
break;
default:
......@@ -1131,6 +1132,24 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch
SCH_ERR_RET(schProcessOnTaskSuccess(pJob, pTask));
break;
}
case TDMT_VND_ALTER_TABLE_RSP: {
SVAlterTbRsp rsp = {0};
if (msg) {
SDecoder coder = {0};
tDecoderInit(&coder, msg, msgSize);
code = tDecodeSVAlterTbRsp(&coder, &rsp);
tDecoderClear(&coder);
SCH_ERR_JRET(code);
SCH_ERR_JRET(rsp.code);
}
SCH_ERR_JRET(rspCode);
if (NULL == msg) {
SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
}
break;
}
case TDMT_VND_SUBMIT_RSP: {
SCH_ERR_JRET(rspCode);
......@@ -1391,6 +1410,10 @@ int32_t schHandleDropTableCallback(void *param, const SDataBuf *pMsg, int32_t co
return schHandleCallback(param, pMsg, TDMT_VND_DROP_TABLE_RSP, code);
}
int32_t schHandleAlterTableCallback(void *param, const SDataBuf *pMsg, int32_t code) {
return schHandleCallback(param, pMsg, TDMT_VND_ALTER_TABLE_RSP, code);
}
int32_t schHandleQueryCallback(void *param, const SDataBuf *pMsg, int32_t code) {
return schHandleCallback(param, pMsg, TDMT_VND_QUERY_RSP, code);
}
......@@ -1490,6 +1513,9 @@ int32_t schGetCallbackFp(int32_t msgType, __async_send_cb_fn_t *fp) {
case TDMT_VND_DROP_TABLE:
*fp = schHandleDropTableCallback;
break;
case TDMT_VND_ALTER_TABLE:
*fp = schHandleAlterTableCallback;
break;
case TDMT_VND_SUBMIT:
*fp = schHandleSubmitCallback;
break;
......@@ -2010,6 +2036,7 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
switch (msgType) {
case TDMT_VND_CREATE_TABLE:
case TDMT_VND_DROP_TABLE:
case TDMT_VND_ALTER_TABLE:
case TDMT_VND_SUBMIT: {
msgSize = pTask->msgLen;
msg = taosMemoryCalloc(1, msgSize);
......
......@@ -7,7 +7,7 @@ system sh/cfg.sh -n dnode1 -c udf -v 1
print ========= start dnode1 as LEADER
system sh/exec.sh -n dnode1 -s start
sleep 2000
sleep 1000
sql connect
print ======== step1 udf
......
......@@ -220,7 +220,6 @@ if $data[0][4] == LEADER then
print ---- vgroup $data[0][0] leader switch to dnode $data[0][3]
elif $data[0][6] == LEADER then
print ---- vgroup $data[0][0] leader switch to dnode $data[0][5]
endi
elif $data[0][8] == LEADER then
print ---- vgroup $data[0][0] leader switch to dnode $data[0][7]
else
......@@ -342,7 +341,6 @@ elif $data[0][6] == LEADER then
goto check_vg_ready_3
endi
print ---- vgroup $data[0][0] leader locating dnode $data[0][7]
endi
elif $data[0][8] == LEADER then
if $data[0][4] == LEADER then
goto check_vg_ready_3
......
......@@ -420,7 +420,6 @@ elif $data[0][6] == LEADER then
goto check_vg_ready_3
endi
print ---- vgroup $data[0][0] leader locating dnode $data[0][7]
endi
elif $data[0][8] == LEADER then
if $data[0][4] == LEADER then
goto check_vg_ready_3
......
#### test scenario, please refer to https://jira.taosdata.com:18090/pages/viewpage.action?pageId=135120406
#basic1.sim: vgroups=1, one topic for one consumer, firstly insert data, then start consume. Include six topics
#basic2.sim: vgroups=1, multi topics for one consumer, firstly insert data, then start consume. Include six topics
#basic3.sim: vgroups=4, one topic for one consumer, firstly insert data, then start consume. Include six topics
#basic4.sim: vgroups=4, multi topics for one consumer, firstly insert data, then start consume. Include six topics
# notes1: Scalar function: ABS/ACOS/ASIN/ATAN/CEIL/COS/FLOOR/LOG/POW/ROUND/SIN/SQRT/TAN
# The above use cases are combined with where filter conditions, such as: where ts > "2017-08-12 18:25:58.128Z" and sin(a) > 0.5;
#
# notes2: not support aggregate functions(such as sum/count/min/max) and time-windows(interval).
#
run tsim/tmq/prepareBasicEnv-1vgrp.sim
#---- global parameters start ----#
$dbName = db
$vgroups = 1
$stbPrefix = stb
$ctbPrefix = ctb
$ntbPrefix = ntb
$stbNum = 1
$ctbNum = 10
$ntbNum = 10
$rowsPerCtb = 10
$tstart = 1640966400000 # 2022-01-01 00:00:00.000
#---- global parameters end ----#
$pullDelay = 3
$ifcheckdata = 1
$showMsg = 1
$showRow = 0
sql connect
sql use $dbName
print == create topics from super table
sql create topic topic_stb_column as select ts, c3 from stb
sql create topic topic_stb_all as select ts, c1, c2, c3 from stb
sql create topic topic_stb_function as select ts, abs(c1), sin(c2) from stb
print == create topics from child table
sql create topic topic_ctb_column as select ts, c3 from ctb0
sql create topic topic_ctb_all as select * from ctb0
sql create topic topic_ctb_function as select ts, abs(c1), sin(c2) from ctb0
print == create topics from normal table
sql create topic topic_ntb_column as select ts, c3 from ntb0
sql create topic topic_ntb_all as select * from ntb0
sql create topic topic_ntb_function as select ts, abs(c1), sin(c2) from ntb0
#sql show topics
#if $rows != 9 then
# return -1
#endi
$keyList = ' . group.id:cgrp1
$keyList = $keyList . '
$cdb_index = 0
#=============================== start consume =============================#
print ================ test consume from stb
$loop_cnt = 0
loop_consume_diff_topic_from_stb:
#######################################################################################
# clear consume info and consume result
#run tsim/tmq/clearConsume.sim
# because drop table function no stable, so by create new db for consume info and result. Modify it later
$cdb_index = $cdb_index + 1
$cdbName = cdb . $cdb_index
sql create database $cdbName vgroups 1
sleep 500
sql use $cdbName
print == create consume info table and consume result table
sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int)
sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)
sql show tables
if $rows != 2 then
return -1
endi
#######################################################################################
if $loop_cnt == 0 then
print == scenario 1: topic_stb_column
$topicList = ' . topic_stb_column
$topicList = $topicList . '
elif $loop_cnt == 1 then
print == scenario 2: topic_stb_all
$topicList = ' . topic_stb_all
$topicList = $topicList . '
elif $loop_cnt == 2 then
print == scenario 3: topic_stb_function
$topicList = ' . topic_stb_function
$topicList = $topicList . '
else
goto loop_consume_diff_topic_from_stb_end
endi
$consumerId = 0
$totalMsgOfStb = $ctbNum * $rowsPerCtb
$expectmsgcnt = $totalMsgOfStb
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata )
print == start consumer to pull msgs from stb
print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $cdbName -s start
system tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $cdbName -s start
print == check consume result
wait_consumer_end_from_stb:
sql select * from consumeresult
print ==> rows: $rows
print ==> rows[0]: $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6]
if $rows != 1 then
sleep 1000
goto wait_consumer_end_from_stb
endi
if $data[0][1] != $consumerId then
return -1
endi
if $data[0][2] != $expectmsgcnt then
return -1
endi
if $data[0][3] != $expectmsgcnt then
return -1
endi
$loop_cnt = $loop_cnt + 1
goto loop_consume_diff_topic_from_stb
loop_consume_diff_topic_from_stb_end:
print ================ test consume from ctb
$loop_cnt = 0
loop_consume_diff_topic_from_ctb:
#######################################################################################
# clear consume info and consume result
#run tsim/tmq/clearConsume.sim
# because drop table function no stable, so by create new db for consume info and result. Modify it later
$cdb_index = $cdb_index + 1
$cdbName = cdb . $cdb_index
sql create database $cdbName vgroups 1
sleep 500
sql use $cdbName
print == create consume info table and consume result table
sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int)
sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)
sql show tables
if $rows != 2 then
return -1
endi
#######################################################################################
if $loop_cnt == 0 then
print == scenario 1: topic_ctb_column
$topicList = ' . topic_ctb_column
$topicList = $topicList . '
elif $loop_cnt == 1 then
print == scenario 2: topic_ctb_all
$topicList = ' . topic_ctb_all
$topicList = $topicList . '
elif $loop_cnt == 2 then
print == scenario 3: topic_ctb_function
$topicList = ' . topic_ctb_function
$topicList = $topicList . '
else
goto loop_consume_diff_topic_from_ctb_end
endi
$consumerId = 0
$totalMsgOfCtb = $rowsPerCtb
$expectmsgcnt = $totalMsgOfCtb
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata )
print == start consumer to pull msgs from ctb
print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -s start
system tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $cdbName -s start
print == check consume result
wait_consumer_end_from_ctb:
sql select * from consumeresult
print ==> rows: $rows
print ==> rows[0]: $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6]
if $rows != 1 then
sleep 1000
goto wait_consumer_end_from_ctb
endi
if $data[0][1] != $consumerId then
return -1
endi
if $data[0][2] != $totalMsgOfCtb then
return -1
endi
if $data[0][3] != $totalMsgOfCtb then
return -1
endi
$loop_cnt = $loop_cnt + 1
goto loop_consume_diff_topic_from_ctb
loop_consume_diff_topic_from_ctb_end:
print ================ test consume from ntb
$loop_cnt = 0
loop_consume_diff_topic_from_ntb:
#######################################################################################
# clear consume info and consume result
#run tsim/tmq/clearConsume.sim
# because drop table function no stable, so by create new db for consume info and result. Modify it later
$cdb_index = $cdb_index + 1
$cdbName = cdb . $cdb_index
sql create database $cdbName vgroups 1
sleep 500
sql use $cdbName
print == create consume info table and consume result table
sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int)
sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)
sql show tables
if $rows != 2 then
return -1
endi
#######################################################################################
if $loop_cnt == 0 then
print == scenario 1: topic_ntb_column
$topicList = ' . topic_ntb_column
$topicList = $topicList . '
elif $loop_cnt == 1 then
print == scenario 2: topic_ntb_all
$topicList = ' . topic_ntb_all
$topicList = $topicList . '
elif $loop_cnt == 2 then
print == scenario 3: topic_ntb_function
$topicList = ' . topic_ntb_function
$topicList = $topicList . '
else
goto loop_consume_diff_topic_from_ntb_end
endi
$consumerId = 0
$totalMsgOfNtb = $rowsPerCtb
$expectmsgcnt = $totalMsgOfNtb
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata )
print == start consumer to pull msgs from ntb
print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -s start
system tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $cdbName -s start
print == check consume result from ntb
wait_consumer_end_from_ntb:
sql select * from consumeresult
print ==> rows: $rows
print ==> rows[0]: $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6]
if $rows != 1 then
sleep 1000
goto wait_consumer_end_from_ntb
endi
if $data[0][1] != $consumerId then
return -1
endi
if $data[0][2] != $totalMsgOfNtb then
return -1
endi
if $data[0][3] != $totalMsgOfNtb then
return -1
endi
$loop_cnt = $loop_cnt + 1
goto loop_consume_diff_topic_from_ntb
loop_consume_diff_topic_from_ntb_end:
#------ not need stop consumer, because it exit after pull msg overthan expect msg
#system tsim/tmq/consume.sh -s stop -x SIGINT
system sh/exec.sh -n dnode1 -s stop -x SIGINT
#### test scenario, please refer to https://jira.taosdata.com:18090/pages/viewpage.action?pageId=135120406
#basic1.sim: vgroups=1, one topic for one consumer, firstly insert data, then start consume. Include six topics
#basic2.sim: vgroups=1, multi topics for one consumer, firstly insert data, then start consume. Include six topics
#basic3.sim: vgroups=4, one topic for one consumer, firstly insert data, then start consume. Include six topics
#basic4.sim: vgroups=4, multi topics for one consumer, firstly insert data, then start consume. Include six topics
# notes1: Scalar function: ABS/ACOS/ASIN/ATAN/CEIL/COS/FLOOR/LOG/POW/ROUND/SIN/SQRT/TAN
# The above use cases are combined with where filter conditions, such as: where ts > "2017-08-12 18:25:58.128Z" and sin(a) > 0.5;
#
# notes2: not support aggregate functions(such as sum/count/min/max) and time-windows(interval).
#
run tsim/tmq/prepareBasicEnv-1vgrp.sim
#---- global parameters start ----#
$dbName = db
$vgroups = 1
$stbPrefix = stb
$ctbPrefix = ctb
$ntbPrefix = ntb
$stbNum = 1
$ctbNum = 10
$ntbNum = 10
$rowsPerCtb = 10
$tstart = 1640966400000 # 2022-01-01 00:00:00.000
#---- global parameters end ----#
$pullDelay = 3
$ifcheckdata = 1
$ifmanualcommit = 1
$showMsg = 1
$showRow = 0
sql connect
sql use $dbName
print == create topics from super table
sql create topic topic_stb_column as select ts, c3 from stb
sql create topic topic_stb_all as select ts, c1, c2, c3 from stb
sql create topic topic_stb_function as select ts, abs(c1), sin(c2) from stb
print == create topics from child table
sql create topic topic_ctb_column as select ts, c3 from ctb0
sql create topic topic_ctb_all as select * from ctb0
sql create topic topic_ctb_function as select ts, abs(c1), sin(c2) from ctb0
print == create topics from normal table
sql create topic topic_ntb_column as select ts, c3 from ntb0
sql create topic topic_ntb_all as select * from ntb0
sql create topic topic_ntb_function as select ts, abs(c1), sin(c2) from ntb0
#sql show topics
#if $rows != 9 then
# return -1
#endi
#'group.id:cgrp1,enable.auto.commit:false,auto.commit.interval.ms:6000,auto.offset.reset:earliest'
$keyList = ' . group.id:cgrp1
$keyList = $keyList . ,
$keyList = $keyList . enable.auto.commit:false
#$keyList = $keyList . ,
#$keyList = $keyList . auto.commit.interval.ms:6000
#$keyList = $keyList . ,
#$keyList = $keyList . auto.offset.reset:earliest
$keyList = $keyList . '
print ========== key list: $keyList
$cdb_index = 0
#=============================== start consume =============================#
print ================ test consume from stb
$loop_cnt = 0
loop_consume_diff_topic_from_stb:
#######################################################################################
# clear consume info and consume result
#run tsim/tmq/clearConsume.sim
# because drop table function no stable, so by create new db for consume info and result. Modify it later
$cdb_index = $cdb_index + 1
$cdbName = cdb . $cdb_index
sql create database $cdbName vgroups 1
sleep 500
sql use $cdbName
print == create consume info table and consume result table
sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)
sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)
sql show tables
if $rows != 2 then
return -1
endi
#######################################################################################
if $loop_cnt == 0 then
print == scenario 1: topic_stb_column
$topicList = ' . topic_stb_column
$topicList = $topicList . '
elif $loop_cnt == 1 then
print == scenario 2: topic_stb_all
$topicList = ' . topic_stb_all
$topicList = $topicList . '
elif $loop_cnt == 2 then
print == scenario 3: topic_stb_function
$topicList = ' . topic_stb_function
$topicList = $topicList . '
else
goto loop_consume_diff_topic_from_stb_end
endi
$consumerId = 0
$totalMsgOfStb = $ctbNum * $rowsPerCtb
$expectmsgcnt = $totalMsgOfStb
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit )
print == start consumer to pull msgs from stb
print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $cdbName -s start
system tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $cdbName -s start
print == check consume result
wait_consumer_end_from_stb:
sql select * from consumeresult
print ==> rows: $rows
print ==> rows[0]: $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6]
if $rows != 1 then
sleep 1000
goto wait_consumer_end_from_stb
endi
if $data[0][1] != $consumerId then
return -1
endi
if $data[0][2] != $expectmsgcnt then
return -1
endi
if $data[0][3] != $expectmsgcnt then
return -1
endi
$loop_cnt = $loop_cnt + 1
goto loop_consume_diff_topic_from_stb
loop_consume_diff_topic_from_stb_end:
print ================ test consume from ctb
$loop_cnt = 0
loop_consume_diff_topic_from_ctb:
#######################################################################################
# clear consume info and consume result
#run tsim/tmq/clearConsume.sim
# because drop table function no stable, so by create new db for consume info and result. Modify it later
$cdb_index = $cdb_index + 1
$cdbName = cdb . $cdb_index
sql create database $cdbName vgroups 1
sleep 500
sql use $cdbName
print == create consume info table and consume result table
sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)
sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)
sql show tables
if $rows != 2 then
return -1
endi
#######################################################################################
if $loop_cnt == 0 then
print == scenario 1: topic_ctb_column
$topicList = ' . topic_ctb_column
$topicList = $topicList . '
elif $loop_cnt == 1 then
print == scenario 2: topic_ctb_all
$topicList = ' . topic_ctb_all
$topicList = $topicList . '
elif $loop_cnt == 2 then
print == scenario 3: topic_ctb_function
$topicList = ' . topic_ctb_function
$topicList = $topicList . '
else
goto loop_consume_diff_topic_from_ctb_end
endi
$consumerId = 0
$totalMsgOfCtb = $rowsPerCtb
$expectmsgcnt = $totalMsgOfCtb
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit )
print == start consumer to pull msgs from ctb
print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -s start
system tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $cdbName -s start
print == check consume result
wait_consumer_end_from_ctb:
sql select * from consumeresult
print ==> rows: $rows
print ==> rows[0]: $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6]
if $rows != 1 then
sleep 1000
goto wait_consumer_end_from_ctb
endi
if $data[0][1] != $consumerId then
return -1
endi
if $data[0][2] != $totalMsgOfCtb then
return -1
endi
if $data[0][3] != $totalMsgOfCtb then
return -1
endi
$loop_cnt = $loop_cnt + 1
goto loop_consume_diff_topic_from_ctb
loop_consume_diff_topic_from_ctb_end:
print ================ test consume from ntb
$loop_cnt = 0
loop_consume_diff_topic_from_ntb:
#######################################################################################
# clear consume info and consume result
#run tsim/tmq/clearConsume.sim
# because drop table function no stable, so by create new db for consume info and result. Modify it later
$cdb_index = $cdb_index + 1
$cdbName = cdb . $cdb_index
sql create database $cdbName vgroups 1
sleep 500
sql use $cdbName
print == create consume info table and consume result table
sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)
sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)
sql show tables
if $rows != 2 then
return -1
endi
#######################################################################################
if $loop_cnt == 0 then
print == scenario 1: topic_ntb_column
$topicList = ' . topic_ntb_column
$topicList = $topicList . '
elif $loop_cnt == 1 then
print == scenario 2: topic_ntb_all
$topicList = ' . topic_ntb_all
$topicList = $topicList . '
elif $loop_cnt == 2 then
print == scenario 3: topic_ntb_function
$topicList = ' . topic_ntb_function
$topicList = $topicList . '
else
goto loop_consume_diff_topic_from_ntb_end
endi
$consumerId = 0
$totalMsgOfNtb = $rowsPerCtb
$expectmsgcnt = $totalMsgOfNtb
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit )
print == start consumer to pull msgs from ntb
print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -s start
system tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $cdbName -s start
print == check consume result from ntb
wait_consumer_end_from_ntb:
sql select * from consumeresult
print ==> rows: $rows
print ==> rows[0]: $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6]
if $rows != 1 then
sleep 1000
goto wait_consumer_end_from_ntb
endi
if $data[0][1] != $consumerId then
return -1
endi
if $data[0][2] != $totalMsgOfNtb then
return -1
endi
if $data[0][3] != $totalMsgOfNtb then
return -1
endi
$loop_cnt = $loop_cnt + 1
goto loop_consume_diff_topic_from_ntb
loop_consume_diff_topic_from_ntb_end:
#------ not need stop consumer, because it exit after pull msg overthan expect msg
#system tsim/tmq/consume.sh -s stop -x SIGINT
system sh/exec.sh -n dnode1 -s stop -x SIGINT
#### test scenario, please refer to https://jira.taosdata.com:18090/pages/viewpage.action?pageId=135120406
#basic1.sim: vgroups=1, one topic for one consumer, firstly insert data, then start consume. Include six topics
#basic2.sim: vgroups=1, multi topics for one consumer, firstly insert data, then start consume. Include six topics
#basic3.sim: vgroups=4, one topic for one consumer, firstly insert data, then start consume. Include six topics
#basic4.sim: vgroups=4, multi topics for one consumer, firstly insert data, then start consume. Include six topics
# notes1: Scalar function: ABS/ACOS/ASIN/ATAN/CEIL/COS/FLOOR/LOG/POW/ROUND/SIN/SQRT/TAN
# The above use cases are combined with where filter conditions, such as: where ts > "2017-08-12 18:25:58.128Z" and sin(a) > 0.5;
#
# notes2: not support aggregate functions(such as sum/count/min/max) and time-windows(interval).
#
run tsim/tmq/prepareBasicEnv-1vgrp.sim
#---- global parameters start ----#
$dbName = db
$vgroups = 1
$stbPrefix = stb
$ctbPrefix = ctb
$ntbPrefix = ntb
$stbNum = 1
$ctbNum = 10
$ntbNum = 10
$rowsPerCtb = 10
$tstart = 1640966400000 # 2022-01-01 00:00:00.000
#---- global parameters end ----#
$pullDelay = 3
$ifcheckdata = 1
$showMsg = 1
$showRow = 0
sql connect
sql use $dbName
print == create topics from super table
sql create topic topic_stb_column as select ts, c3 from stb
sql create topic topic_stb_all as select ts, c1, c2, c3 from stb
sql create topic topic_stb_function as select ts, abs(c1), sin(c2) from stb
print == create topics from child table
sql create topic topic_ctb_column as select ts, c3 from ctb0
sql create topic topic_ctb_all as select * from ctb0
sql create topic topic_ctb_function as select ts, abs(c1), sin(c2) from ctb0
print == create topics from normal table
sql create topic topic_ntb_column as select ts, c3 from ntb0
sql create topic topic_ntb_all as select * from ntb0
sql create topic topic_ntb_function as select ts, abs(c1), sin(c2) from ntb0
#sql show topics
#if $rows != 9 then
# return -1
#endi
$keyList = ' . group.id:cgrp1
$keyList = $keyList . '
$topicNum = 3
#=============================== start consume =============================#
print ================ test consume from stb
print == multi toipcs: topic_stb_column + topic_stb_all + topic_stb_function
$topicList = ' . topic_stb_column
$topicList = $topicList . ,
$topicList = $topicList . topic_stb_all
$topicList = $topicList . ,
$topicList = $topicList . topic_stb_function
$topicList = $topicList . '
$consumerId = 0
$totalMsgOfStb = $ctbNum * $rowsPerCtb
$totalMsgOfStb = $totalMsgOfStb * $topicNum
$expectmsgcnt = $totalMsgOfStb
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata )
print == start consumer to pull msgs from stb
print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $dbName -s start
system tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $dbName -s start
print == check consume result
wait_consumer_end_from_stb:
sql select * from consumeresult
print ==> rows: $rows
print ==> rows[0]: $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6]
if $rows != 1 then
sleep 1000
goto wait_consumer_end_from_stb
endi
if $data[0][1] != $consumerId then
return -1
endi
if $data[0][2] != $expectmsgcnt then
return -1
endi
if $data[0][3] != $expectmsgcnt then
return -1
endi
#######################################################################################
# clear consume info and consume result
#run tsim/tmq/clearConsume.sim
# because drop table function no stable, so by create new db for consume info and result. Modify it later
$cdbName = cdb1
sql create database $cdbName vgroups 1
sleep 500
sql use $cdbName
print == create consume info table and consume result table
sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int)
sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)
sql show tables
if $rows != 2 then
return -1
endi
#######################################################################################
print ================ test consume from ctb
print == multi toipcs: topic_ctb_column + topic_ctb_all + topic_ctb_function
$topicList = ' . topic_ctb_column
$topicList = $topicList . ,
$topicList = $topicList . topic_ctb_all
$topicList = $topicList . ,
$topicList = $topicList . topic_ctb_function
$topicList = $topicList . '
$consumerId = 0
$totalMsgOfCtb = $rowsPerCtb * $topicNum
$expectmsgcnt = $totalMsgOfCtb
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata )
print == start consumer to pull msgs from ctb
print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -s start
system tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $cdbName -s start
print == check consume result
wait_consumer_end_from_ctb:
sql select * from consumeresult
print ==> rows: $rows
print ==> rows[0]: $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6]
if $rows != 1 then
sleep 1000
goto wait_consumer_end_from_ctb
endi
if $data[0][1] != $consumerId then
return -1
endi
if $data[0][2] != $totalMsgOfCtb then
return -1
endi
if $data[0][3] != $totalMsgOfCtb then
return -1
endi
#######################################################################################
# clear consume info and consume result
#run tsim/tmq/clearConsume.sim
# because drop table function no stable, so by create new db for consume info and result. Modify it later
$cdbName = cdb2
sql create database $cdbName vgroups 1
sleep 500
sql use $cdbName
print == create consume info table and consume result table
sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int)
sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)
sql show tables
if $rows != 2 then
return -1
endi
#######################################################################################
print ================ test consume from ntb
print == multi toipcs: topic_ntb_column + topic_ntb_all + topic_ntb_function
$topicList = ' . topic_ntb_column
$topicList = $topicList . ,
$topicList = $topicList . topic_ntb_all
$topicList = $topicList . ,
$topicList = $topicList . topic_ntb_function
$topicList = $topicList . '
$consumerId = 0
$totalMsgOfNtb = $rowsPerCtb * $topicNum
$expectmsgcnt = $totalMsgOfNtb
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata )
print == start consumer to pull msgs from ntb
print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -s start
system tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $cdbName -s start
print == check consume result from ntb
wait_consumer_end_from_ntb:
sql select * from consumeresult
print ==> rows: $rows
print ==> rows[0]: $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6]
if $rows != 1 then
sleep 1000
goto wait_consumer_end_from_ntb
endi
if $data[0][1] != $consumerId then
return -1
endi
if $data[0][2] != $totalMsgOfNtb then
return -1
endi
if $data[0][3] != $totalMsgOfNtb then
return -1
endi
#------ not need stop consumer, because it exit after pull msg overthan expect msg
#system tsim/tmq/consume.sh -s stop -x SIGINT
system sh/exec.sh -n dnode1 -s stop -x SIGINT
#### test scenario, please refer to https://jira.taosdata.com:18090/pages/viewpage.action?pageId=135120406
#basic1.sim: vgroups=1, one topic for one consumer, firstly insert data, then start consume. Include six topics
#basic2.sim: vgroups=1, multi topics for one consumer, firstly insert data, then start consume. Include six topics
#basic3.sim: vgroups=4, one topic for one consumer, firstly insert data, then start consume. Include six topics
#basic4.sim: vgroups=4, multi topics for one consumer, firstly insert data, then start consume. Include six topics
# notes1: Scalar function: ABS/ACOS/ASIN/ATAN/CEIL/COS/FLOOR/LOG/POW/ROUND/SIN/SQRT/TAN
# The above use cases are combined with where filter conditions, such as: where ts > "2017-08-12 18:25:58.128Z" and sin(a) > 0.5;
#
# notes2: not support aggregate functions(such as sum/count/min/max) and time-windows(interval).
#
run tsim/tmq/prepareBasicEnv-1vgrp.sim
#---- global parameters start ----#
$dbName = db
$vgroups = 1
$stbPrefix = stb
$ctbPrefix = ctb
$ntbPrefix = ntb
$stbNum = 1
$ctbNum = 10
$ntbNum = 10
$rowsPerCtb = 10
$tstart = 1640966400000 # 2022-01-01 00:00:00.000
#---- global parameters end ----#
$pullDelay = 3
$ifcheckdata = 1
$ifmanualcommit = 1
$showMsg = 1
$showRow = 0
sql connect
sql use $dbName
print == create topics from super table
sql create topic topic_stb_column as select ts, c3 from stb
sql create topic topic_stb_all as select ts, c1, c2, c3 from stb
sql create topic topic_stb_function as select ts, abs(c1), sin(c2) from stb
print == create topics from child table
sql create topic topic_ctb_column as select ts, c3 from ctb0
sql create topic topic_ctb_all as select * from ctb0
sql create topic topic_ctb_function as select ts, abs(c1), sin(c2) from ctb0
print == create topics from normal table
sql create topic topic_ntb_column as select ts, c3 from ntb0
sql create topic topic_ntb_all as select * from ntb0
sql create topic topic_ntb_function as select ts, abs(c1), sin(c2) from ntb0
#sql show topics
#if $rows != 9 then
# return -1
#endi
#'group.id:cgrp1,enable.auto.commit:false,auto.commit.interval.ms:6000,auto.offset.reset:earliest'
$keyList = ' . group.id:cgrp1
$keyList = $keyList . ,
$keyList = $keyList . enable.auto.commit:false
#$keyList = $keyList . ,
#$keyList = $keyList . auto.commit.interval.ms:6000
#$keyList = $keyList . ,
#$keyList = $keyList . auto.offset.reset:earliest
$keyList = $keyList . '
print ========== key list: $keyList
$topicNum = 3
#=============================== start consume =============================#
print ================ test consume from stb
print == multi toipcs: topic_stb_column + topic_stb_all + topic_stb_function
$topicList = ' . topic_stb_column
$topicList = $topicList . ,
$topicList = $topicList . topic_stb_all
$topicList = $topicList . ,
$topicList = $topicList . topic_stb_function
$topicList = $topicList . '
$consumerId = 0
$totalMsgOfStb = $ctbNum * $rowsPerCtb
$totalMsgOfStb = $totalMsgOfStb * $topicNum
$expectmsgcnt = $totalMsgOfStb
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit )
print == start consumer to pull msgs from stb
print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $dbName -s start
system tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $dbName -s start
print == check consume result
wait_consumer_end_from_stb:
sql select * from consumeresult
print ==> rows: $rows
print ==> rows[0]: $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6]
if $rows != 1 then
sleep 1000
goto wait_consumer_end_from_stb
endi
if $data[0][1] != $consumerId then
return -1
endi
if $data[0][2] != $expectmsgcnt then
return -1
endi
if $data[0][3] != $expectmsgcnt then
return -1
endi
#######################################################################################
# clear consume info and consume result
#run tsim/tmq/clearConsume.sim
# because drop table function no stable, so by create new db for consume info and result. Modify it later
$cdbName = cdb1
sql create database $cdbName vgroups 1
sleep 500
sql use $cdbName
print == create consume info table and consume result table
sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)
sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)
sql show tables
if $rows != 2 then
return -1
endi
#######################################################################################
print ================ test consume from ctb
print == multi toipcs: topic_ctb_column + topic_ctb_all + topic_ctb_function
$topicList = ' . topic_ctb_column
$topicList = $topicList . ,
$topicList = $topicList . topic_ctb_all
$topicList = $topicList . ,
$topicList = $topicList . topic_ctb_function
$topicList = $topicList . '
$consumerId = 0
$totalMsgOfCtb = $rowsPerCtb * $topicNum
$expectmsgcnt = $totalMsgOfCtb
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit )
print == start consumer to pull msgs from ctb
print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -s start
system tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $cdbName -s start
print == check consume result
wait_consumer_end_from_ctb:
sql select * from consumeresult
print ==> rows: $rows
print ==> rows[0]: $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6]
if $rows != 1 then
sleep 1000
goto wait_consumer_end_from_ctb
endi
if $data[0][1] != $consumerId then
return -1
endi
if $data[0][2] != $totalMsgOfCtb then
return -1
endi
if $data[0][3] != $totalMsgOfCtb then
return -1
endi
#######################################################################################
# clear consume info and consume result
#run tsim/tmq/clearConsume.sim
# because drop table function no stable, so by create new db for consume info and result. Modify it later
$cdbName = cdb2
sql create database $cdbName vgroups 1
sleep 500
sql use $cdbName
print == create consume info table and consume result table
sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)
sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)
sql show tables
if $rows != 2 then
return -1
endi
#######################################################################################
print ================ test consume from ntb
print == multi toipcs: topic_ntb_column + topic_ntb_all + topic_ntb_function
$topicList = ' . topic_ntb_column
$topicList = $topicList . ,
$topicList = $topicList . topic_ntb_all
$topicList = $topicList . ,
$topicList = $topicList . topic_ntb_function
$topicList = $topicList . '
$consumerId = 0
$totalMsgOfNtb = $rowsPerCtb * $topicNum
$expectmsgcnt = $totalMsgOfNtb
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit )
print == start consumer to pull msgs from ntb
print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -s start
system tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $cdbName -s start
print == check consume result from ntb
wait_consumer_end_from_ntb:
sql select * from consumeresult
print ==> rows: $rows
print ==> rows[0]: $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6]
if $rows != 1 then
sleep 1000
goto wait_consumer_end_from_ntb
endi
if $data[0][1] != $consumerId then
return -1
endi
if $data[0][2] != $totalMsgOfNtb then
return -1
endi
if $data[0][3] != $totalMsgOfNtb then
return -1
endi
#------ not need stop consumer, because it exit after pull msg overthan expect msg
#system tsim/tmq/consume.sh -s stop -x SIGINT
system sh/exec.sh -n dnode1 -s stop -x SIGINT
......@@ -27,6 +27,7 @@ $tstart = 1640966400000 # 2022-01-01 00:00:00.000
$pullDelay = 5
$ifcheckdata = 1
$ifmanualcommit = 1
$showMsg = 1
$showRow = 0
......@@ -53,8 +54,16 @@ sql create topic topic_ntb_function as select ts, abs(c1), sin(c2) from ntb0
# return -1
#endi
#'group.id:cgrp1,enable.auto.commit:false,auto.commit.interval.ms:6000,auto.offset.reset:earliest'
$keyList = ' . group.id:cgrp1
$keyList = $keyList . ,
$keyList = $keyList . enable.auto.commit:false
#$keyList = $keyList . ,
#$keyList = $keyList . auto.commit.interval.ms:6000
#$keyList = $keyList . ,
#$keyList = $keyList . auto.offset.reset:earliest
$keyList = $keyList . '
print ========== key list: $keyList
$topicNum = 2
......@@ -72,7 +81,7 @@ $consumerId = 0
$totalMsgOfOneTopic = $ctbNum * $rowsPerCtb
$totalMsgOfStb = $totalMsgOfOneTopic * $topicNum
$expectmsgcnt = $totalMsgOfStb
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata )
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit )
$topicList = ' . topic_stb_all
......@@ -80,7 +89,7 @@ $topicList = $topicList . ,
$topicList = $topicList . topic_stb_function
$topicList = $topicList . '
$consumerId = 1
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata )
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit )
print == start consumer to pull msgs from stb
print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $dbName -s start
......@@ -158,7 +167,7 @@ sleep 500
sql use $cdbName
print == create consume info table and consume result table for ctb
sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int)
sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)
sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)
sql show tables
......@@ -179,14 +188,14 @@ $consumerId = 0
$totalMsgOfOneTopic = $rowsPerCtb
$totalMsgOfCtb = $totalMsgOfOneTopic * $topicNum
$expectmsgcnt = $totalMsgOfCtb
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata )
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit )
$topicList = ' . topic_ctb_function
$topicList = $topicList . ,
$topicList = $topicList . topic_ctb_all
$topicList = $topicList . '
$consumerId = 1
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata )
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit )
print == start consumer to pull msgs from ctb
print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $cdbName -s start
......@@ -249,7 +258,7 @@ sleep 500
sql use $cdbName
print == create consume info table and consume result table for ntb
sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int)
sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)
sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)
sql show tables
......@@ -270,7 +279,7 @@ $consumerId = 0
$totalMsgOfOneTopic = $rowsPerCtb
$totalMsgOfNtb = $totalMsgOfOneTopic * $topicNum
$expectmsgcnt = $totalMsgOfNtb
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata )
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit )
$topicList = ' . topic_ntb_function
......@@ -278,7 +287,7 @@ $topicList = $topicList . ,
$topicList = $topicList . topic_ntb_all
$topicList = $topicList . '
$consumerId = 1
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata )
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit )
print == start consumer to pull msgs from ntb
print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -s start
......
#### test scenario, please refer to https://jira.taosdata.com:18090/pages/viewpage.action?pageId=135120406
#basic1.sim: vgroups=1, one topic for one consumer, firstly insert data, then start consume. Include six topics
#basic2.sim: vgroups=1, multi topics for one consumer, firstly insert data, then start consume. Include six topics
#basic3.sim: vgroups=4, one topic for one consumer, firstly insert data, then start consume. Include six topics
#basic4.sim: vgroups=4, multi topics for one consumer, firstly insert data, then start consume. Include six topics
# notes1: Scalar function: ABS/ACOS/ASIN/ATAN/CEIL/COS/FLOOR/LOG/POW/ROUND/SIN/SQRT/TAN
# The above use cases are combined with where filter conditions, such as: where ts > "2017-08-12 18:25:58.128Z" and sin(a) > 0.5;
#
# notes2: not support aggregate functions(such as sum/count/min/max) and time-windows(interval).
#
run tsim/tmq/prepareBasicEnv-4vgrp.sim
#---- global parameters start ----#
$dbName = db
$vgroups = 4
$stbPrefix = stb
$ctbPrefix = ctb
$ntbPrefix = ntb
$stbNum = 1
$ctbNum = 10
$ntbNum = 10
$rowsPerCtb = 10
$tstart = 1640966400000 # 2022-01-01 00:00:00.000
#---- global parameters end ----#
$pullDelay = 3
$ifcheckdata = 1
$showMsg = 1
$showRow = 0
sql connect
sql use $dbName
print == create topics from super table
sql create topic topic_stb_column as select ts, c3 from stb
sql create topic topic_stb_all as select ts, c1, c2, c3 from stb
sql create topic topic_stb_function as select ts, abs(c1), sin(c2) from stb
print == create topics from child table
sql create topic topic_ctb_column as select ts, c3 from ctb0
sql create topic topic_ctb_all as select * from ctb0
sql create topic topic_ctb_function as select ts, abs(c1), sin(c2) from ctb0
print == create topics from normal table
sql create topic topic_ntb_column as select ts, c3 from ntb0
sql create topic topic_ntb_all as select * from ntb0
sql create topic topic_ntb_function as select ts, abs(c1), sin(c2) from ntb0
#sql show topics
#if $rows != 9 then
# return -1
#endi
$keyList = ' . group.id:cgrp1
$keyList = $keyList . '
$cdb_index = 0
#=============================== start consume =============================#
print ================ test consume from stb
$loop_cnt = 0
loop_consume_diff_topic_from_stb:
#######################################################################################
# clear consume info and consume result
#run tsim/tmq/clearConsume.sim
# because drop table function no stable, so by create new db for consume info and result. Modify it later
$cdb_index = $cdb_index + 1
$cdbName = cdb . $cdb_index
sql create database $cdbName vgroups 1
sleep 500
sql use $cdbName
print == create consume info table and consume result table
sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int)
sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)
sql show tables
if $rows != 2 then
return -1
endi
#######################################################################################
if $loop_cnt == 0 then
print == scenario 1: topic_stb_column
$topicList = ' . topic_stb_column
$topicList = $topicList . '
elif $loop_cnt == 1 then
print == scenario 2: topic_stb_all
$topicList = ' . topic_stb_all
$topicList = $topicList . '
elif $loop_cnt == 2 then
print == scenario 3: topic_stb_function
$topicList = ' . topic_stb_function
$topicList = $topicList . '
else
goto loop_consume_diff_topic_from_stb_end
endi
$consumerId = 0
$totalMsgOfStb = $ctbNum * $rowsPerCtb
$expectmsgcnt = $totalMsgOfStb
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata )
print == start consumer to pull msgs from stb
print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $cdbName -s start
system tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $cdbName -s start
print == check consume result
wait_consumer_end_from_stb:
sql select * from consumeresult
print ==> rows: $rows
print ==> rows[0]: $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6]
if $rows != 1 then
sleep 1000
goto wait_consumer_end_from_stb
endi
if $data[0][1] != $consumerId then
return -1
endi
if $data[0][2] != $expectmsgcnt then
return -1
endi
if $data[0][3] != $expectmsgcnt then
return -1
endi
$loop_cnt = $loop_cnt + 1
goto loop_consume_diff_topic_from_stb
loop_consume_diff_topic_from_stb_end:
print ================ test consume from ctb
$loop_cnt = 0
loop_consume_diff_topic_from_ctb:
#######################################################################################
# clear consume info and consume result
#run tsim/tmq/clearConsume.sim
# because drop table function no stable, so by create new db for consume info and result. Modify it later
$cdb_index = $cdb_index + 1
$cdbName = cdb . $cdb_index
sql create database $cdbName vgroups 1
sleep 500
sql use $cdbName
print == create consume info table and consume result table
sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int)
sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)
sql show tables
if $rows != 2 then
return -1
endi
#######################################################################################
if $loop_cnt == 0 then
print == scenario 1: topic_ctb_column
$topicList = ' . topic_ctb_column
$topicList = $topicList . '
elif $loop_cnt == 1 then
print == scenario 2: topic_ctb_all
$topicList = ' . topic_ctb_all
$topicList = $topicList . '
elif $loop_cnt == 2 then
print == scenario 3: topic_ctb_function
$topicList = ' . topic_ctb_function
$topicList = $topicList . '
else
goto loop_consume_diff_topic_from_ctb_end
endi
$consumerId = 0
$totalMsgOfCtb = $rowsPerCtb
$expectmsgcnt = $totalMsgOfCtb
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata )
print == start consumer to pull msgs from ctb
print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -s start
system tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $cdbName -s start
print == check consume result
wait_consumer_end_from_ctb:
sql select * from consumeresult
print ==> rows: $rows
print ==> rows[0]: $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6]
if $rows != 1 then
sleep 1000
goto wait_consumer_end_from_ctb
endi
if $data[0][1] != $consumerId then
return -1
endi
if $data[0][2] != $totalMsgOfCtb then
return -1
endi
if $data[0][3] != $totalMsgOfCtb then
return -1
endi
$loop_cnt = $loop_cnt + 1
goto loop_consume_diff_topic_from_ctb
loop_consume_diff_topic_from_ctb_end:
print ================ test consume from ntb
$loop_cnt = 0
loop_consume_diff_topic_from_ntb:
#######################################################################################
# clear consume info and consume result
#run tsim/tmq/clearConsume.sim
# because drop table function no stable, so by create new db for consume info and result. Modify it later
$cdb_index = $cdb_index + 1
$cdbName = cdb . $cdb_index
sql create database $cdbName vgroups 1
sleep 500
sql use $cdbName
print == create consume info table and consume result table
sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int)
sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)
sql show tables
if $rows != 2 then
return -1
endi
#######################################################################################
if $loop_cnt == 0 then
print == scenario 1: topic_ntb_column
$topicList = ' . topic_ntb_column
$topicList = $topicList . '
elif $loop_cnt == 1 then
print == scenario 2: topic_ntb_all
$topicList = ' . topic_ntb_all
$topicList = $topicList . '
elif $loop_cnt == 2 then
print == scenario 3: topic_ntb_function
$topicList = ' . topic_ntb_function
$topicList = $topicList . '
else
goto loop_consume_diff_topic_from_ntb_end
endi
$consumerId = 0
$totalMsgOfNtb = $rowsPerCtb
$expectmsgcnt = $totalMsgOfNtb
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata )
print == start consumer to pull msgs from ntb
print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -s start
system tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $cdbName -s start
print == check consume result from ntb
wait_consumer_end_from_ntb:
sql select * from consumeresult
print ==> rows: $rows
print ==> rows[0]: $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6]
if $rows != 1 then
sleep 1000
goto wait_consumer_end_from_ntb
endi
if $data[0][1] != $consumerId then
return -1
endi
if $data[0][2] != $totalMsgOfNtb then
return -1
endi
if $data[0][3] != $totalMsgOfNtb then
return -1
endi
$loop_cnt = $loop_cnt + 1
goto loop_consume_diff_topic_from_ntb
loop_consume_diff_topic_from_ntb_end:
#------ not need stop consumer, because it exit after pull msg overthan expect msg
#system tsim/tmq/consume.sh -s stop -x SIGINT
system sh/exec.sh -n dnode1 -s stop -x SIGINT
#### test scenario, please refer to https://jira.taosdata.com:18090/pages/viewpage.action?pageId=135120406
#basic1.sim: vgroups=1, one topic for one consumer, firstly insert data, then start consume. Include six topics
#basic2.sim: vgroups=1, multi topics for one consumer, firstly insert data, then start consume. Include six topics
#basic3.sim: vgroups=4, one topic for one consumer, firstly insert data, then start consume. Include six topics
#basic4.sim: vgroups=4, multi topics for one consumer, firstly insert data, then start consume. Include six topics
# notes1: Scalar function: ABS/ACOS/ASIN/ATAN/CEIL/COS/FLOOR/LOG/POW/ROUND/SIN/SQRT/TAN
# The above use cases are combined with where filter conditions, such as: where ts > "2017-08-12 18:25:58.128Z" and sin(a) > 0.5;
#
# notes2: not support aggregate functions(such as sum/count/min/max) and time-windows(interval).
#
run tsim/tmq/prepareBasicEnv-4vgrp.sim
#---- global parameters start ----#
$dbName = db
$vgroups = 4
$stbPrefix = stb
$ctbPrefix = ctb
$ntbPrefix = ntb
$stbNum = 1
$ctbNum = 10
$ntbNum = 10
$rowsPerCtb = 10
$tstart = 1640966400000 # 2022-01-01 00:00:00.000
#---- global parameters end ----#
$pullDelay = 3
$ifcheckdata = 1
$ifmanualcommit = 1
$showMsg = 1
$showRow = 0
sql connect
sql use $dbName
print == create topics from super table
sql create topic topic_stb_column as select ts, c3 from stb
sql create topic topic_stb_all as select ts, c1, c2, c3 from stb
sql create topic topic_stb_function as select ts, abs(c1), sin(c2) from stb
print == create topics from child table
sql create topic topic_ctb_column as select ts, c3 from ctb0
sql create topic topic_ctb_all as select * from ctb0
sql create topic topic_ctb_function as select ts, abs(c1), sin(c2) from ctb0
print == create topics from normal table
sql create topic topic_ntb_column as select ts, c3 from ntb0
sql create topic topic_ntb_all as select * from ntb0
sql create topic topic_ntb_function as select ts, abs(c1), sin(c2) from ntb0
#sql show topics
#if $rows != 9 then
# return -1
#endi
#'group.id:cgrp1,enable.auto.commit:false,auto.commit.interval.ms:6000,auto.offset.reset:earliest'
$keyList = ' . group.id:cgrp1
$keyList = $keyList . ,
$keyList = $keyList . enable.auto.commit:false
#$keyList = $keyList . ,
#$keyList = $keyList . auto.commit.interval.ms:6000
#$keyList = $keyList . ,
#$keyList = $keyList . auto.offset.reset:earliest
$keyList = $keyList . '
print ========== key list: $keyList
$cdb_index = 0
#=============================== start consume =============================#
print ================ test consume from stb
$loop_cnt = 0
loop_consume_diff_topic_from_stb:
#######################################################################################
# clear consume info and consume result
#run tsim/tmq/clearConsume.sim
# because drop table function no stable, so by create new db for consume info and result. Modify it later
$cdb_index = $cdb_index + 1
$cdbName = cdb . $cdb_index
sql create database $cdbName vgroups 1
sleep 500
sql use $cdbName
print == create consume info table and consume result table
sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)
sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)
sql show tables
if $rows != 2 then
return -1
endi
#######################################################################################
if $loop_cnt == 0 then
print == scenario 1: topic_stb_column
$topicList = ' . topic_stb_column
$topicList = $topicList . '
elif $loop_cnt == 1 then
print == scenario 2: topic_stb_all
$topicList = ' . topic_stb_all
$topicList = $topicList . '
elif $loop_cnt == 2 then
print == scenario 3: topic_stb_function
$topicList = ' . topic_stb_function
$topicList = $topicList . '
else
goto loop_consume_diff_topic_from_stb_end
endi
$consumerId = 0
$totalMsgOfStb = $ctbNum * $rowsPerCtb
$expectmsgcnt = $totalMsgOfStb
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit )
print == start consumer to pull msgs from stb
print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $cdbName -s start
system tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $cdbName -s start
print == check consume result
wait_consumer_end_from_stb:
sql select * from consumeresult
print ==> rows: $rows
print ==> rows[0]: $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6]
if $rows != 1 then
sleep 1000
goto wait_consumer_end_from_stb
endi
if $data[0][1] != $consumerId then
return -1
endi
if $data[0][2] != $expectmsgcnt then
return -1
endi
if $data[0][3] != $expectmsgcnt then
return -1
endi
$loop_cnt = $loop_cnt + 1
goto loop_consume_diff_topic_from_stb
loop_consume_diff_topic_from_stb_end:
print ================ test consume from ctb
$loop_cnt = 0
loop_consume_diff_topic_from_ctb:
#######################################################################################
# clear consume info and consume result
#run tsim/tmq/clearConsume.sim
# because drop table function no stable, so by create new db for consume info and result. Modify it later
$cdb_index = $cdb_index + 1
$cdbName = cdb . $cdb_index
sql create database $cdbName vgroups 1
sleep 500
sql use $cdbName
print == create consume info table and consume result table
sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)
sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)
sql show tables
if $rows != 2 then
return -1
endi
#######################################################################################
if $loop_cnt == 0 then
print == scenario 1: topic_ctb_column
$topicList = ' . topic_ctb_column
$topicList = $topicList . '
elif $loop_cnt == 1 then
print == scenario 2: topic_ctb_all
$topicList = ' . topic_ctb_all
$topicList = $topicList . '
elif $loop_cnt == 2 then
print == scenario 3: topic_ctb_function
$topicList = ' . topic_ctb_function
$topicList = $topicList . '
else
goto loop_consume_diff_topic_from_ctb_end
endi
$consumerId = 0
$totalMsgOfCtb = $rowsPerCtb
$expectmsgcnt = $totalMsgOfCtb
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit )
print == start consumer to pull msgs from ctb
print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -s start
system tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $cdbName -s start
print == check consume result
wait_consumer_end_from_ctb:
sql select * from consumeresult
print ==> rows: $rows
print ==> rows[0]: $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6]
if $rows != 1 then
sleep 1000
goto wait_consumer_end_from_ctb
endi
if $data[0][1] != $consumerId then
return -1
endi
if $data[0][2] != $totalMsgOfCtb then
return -1
endi
if $data[0][3] != $totalMsgOfCtb then
return -1
endi
$loop_cnt = $loop_cnt + 1
goto loop_consume_diff_topic_from_ctb
loop_consume_diff_topic_from_ctb_end:
print ================ test consume from ntb
$loop_cnt = 0
loop_consume_diff_topic_from_ntb:
#######################################################################################
# clear consume info and consume result
#run tsim/tmq/clearConsume.sim
# because drop table function no stable, so by create new db for consume info and result. Modify it later
$cdb_index = $cdb_index + 1
$cdbName = cdb . $cdb_index
sql create database $cdbName vgroups 1
sleep 500
sql use $cdbName
print == create consume info table and consume result table
sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)
sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)
sql show tables
if $rows != 2 then
return -1
endi
#######################################################################################
if $loop_cnt == 0 then
print == scenario 1: topic_ntb_column
$topicList = ' . topic_ntb_column
$topicList = $topicList . '
elif $loop_cnt == 1 then
print == scenario 2: topic_ntb_all
$topicList = ' . topic_ntb_all
$topicList = $topicList . '
elif $loop_cnt == 2 then
print == scenario 3: topic_ntb_function
$topicList = ' . topic_ntb_function
$topicList = $topicList . '
else
goto loop_consume_diff_topic_from_ntb_end
endi
$consumerId = 0
$totalMsgOfNtb = $rowsPerCtb
$expectmsgcnt = $totalMsgOfNtb
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit )
print == start consumer to pull msgs from ntb
print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -s start
system tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $cdbName -s start
print == check consume result from ntb
wait_consumer_end_from_ntb:
sql select * from consumeresult
print ==> rows: $rows
print ==> rows[0]: $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6]
if $rows != 1 then
sleep 1000
goto wait_consumer_end_from_ntb
endi
if $data[0][1] != $consumerId then
return -1
endi
if $data[0][2] != $totalMsgOfNtb then
return -1
endi
if $data[0][3] != $totalMsgOfNtb then
return -1
endi
$loop_cnt = $loop_cnt + 1
goto loop_consume_diff_topic_from_ntb
loop_consume_diff_topic_from_ntb_end:
#------ not need stop consumer, because it exit after pull msg overthan expect msg
#system tsim/tmq/consume.sh -s stop -x SIGINT
system sh/exec.sh -n dnode1 -s stop -x SIGINT
#### test scenario, please refer to https://jira.taosdata.com:18090/pages/viewpage.action?pageId=135120406
#basic1.sim: vgroups=1, one topic for one consumer, firstly insert data, then start consume. Include six topics
#basic2.sim: vgroups=1, multi topics for one consumer, firstly insert data, then start consume. Include six topics
#basic3.sim: vgroups=4, one topic for one consumer, firstly insert data, then start consume. Include six topics
#basic4.sim: vgroups=4, multi topics for one consumer, firstly insert data, then start consume. Include six topics
# notes1: Scalar function: ABS/ACOS/ASIN/ATAN/CEIL/COS/FLOOR/LOG/POW/ROUND/SIN/SQRT/TAN
# The above use cases are combined with where filter conditions, such as: where ts > "2017-08-12 18:25:58.128Z" and sin(a) > 0.5;
#
# notes2: not support aggregate functions(such as sum/count/min/max) and time-windows(interval).
#
run tsim/tmq/prepareBasicEnv-4vgrp.sim
#---- global parameters start ----#
$dbName = db
$vgroups = 4
$stbPrefix = stb
$ctbPrefix = ctb
$ntbPrefix = ntb
$stbNum = 1
$ctbNum = 10
$ntbNum = 10
$rowsPerCtb = 10
$tstart = 1640966400000 # 2022-01-01 00:00:00.000
#---- global parameters end ----#
$pullDelay = 3
$ifcheckdata = 1
$showMsg = 1
$showRow = 0
sql connect
sql use $dbName
print == create topics from super table
sql create topic topic_stb_column as select ts, c3 from stb
sql create topic topic_stb_all as select ts, c1, c2, c3 from stb
sql create topic topic_stb_function as select ts, abs(c1), sin(c2) from stb
print == create topics from child table
sql create topic topic_ctb_column as select ts, c3 from ctb0
sql create topic topic_ctb_all as select * from ctb0
sql create topic topic_ctb_function as select ts, abs(c1), sin(c2) from ctb0
print == create topics from normal table
sql create topic topic_ntb_column as select ts, c3 from ntb0
sql create topic topic_ntb_all as select * from ntb0
sql create topic topic_ntb_function as select ts, abs(c1), sin(c2) from ntb0
#sql show topics
#if $rows != 9 then
# return -1
#endi
$keyList = ' . group.id:cgrp1
$keyList = $keyList . '
$topicNum = 3
print ================ test consume from stb
print == multi toipcs: topic_stb_column + topic_stb_all + topic_stb_function
$topicList = ' . topic_stb_column
$topicList = $topicList . ,
$topicList = $topicList . topic_stb_all
$topicList = $topicList . ,
$topicList = $topicList . topic_stb_function
$topicList = $topicList . '
$consumerId = 0
$totalMsgOfStb = $ctbNum * $rowsPerCtb
$totalMsgOfStb = $totalMsgOfStb * $topicNum
$expectmsgcnt = $totalMsgOfStb
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata )
print == start consumer to pull msgs from stb
print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $dbName -s start
system tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $dbName -s start
print == check consume result
wait_consumer_end_from_stb:
sql select * from consumeresult
print ==> rows: $rows
print ==> rows[0]: $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6]
if $rows != 1 then
sleep 1000
goto wait_consumer_end_from_stb
endi
if $data[0][1] != $consumerId then
return -1
endi
if $data[0][2] != $expectmsgcnt then
return -1
endi
if $data[0][3] != $expectmsgcnt then
return -1
endi
#######################################################################################
# clear consume info and consume result
#run tsim/tmq/clearConsume.sim
# because drop table function no stable, so by create new db for consume info and result. Modify it later
$cdbName = cdb1
sql create database $cdbName vgroups 1
sleep 500
sql use $cdbName
print == create consume info table and consume result table
sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int)
sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)
sql show tables
if $rows != 2 then
return -1
endi
#######################################################################################
print ================ test consume from ctb
print == multi toipcs: topic_ctb_column + topic_ctb_all + topic_ctb_function
$topicList = ' . topic_ctb_column
$topicList = $topicList . ,
$topicList = $topicList . topic_ctb_all
$topicList = $topicList . ,
$topicList = $topicList . topic_ctb_function
$topicList = $topicList . '
$consumerId = 0
$totalMsgOfCtb = $rowsPerCtb * $topicNum
$expectmsgcnt = $totalMsgOfCtb
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata )
print == start consumer to pull msgs from ctb
print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -s start
system tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $cdbName -s start
print == check consume result
wait_consumer_end_from_ctb:
sql select * from consumeresult
print ==> rows: $rows
print ==> rows[0]: $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6]
if $rows != 1 then
sleep 1000
goto wait_consumer_end_from_ctb
endi
if $data[0][1] != $consumerId then
return -1
endi
if $data[0][2] != $totalMsgOfCtb then
return -1
endi
if $data[0][3] != $totalMsgOfCtb then
return -1
endi
#######################################################################################
# clear consume info and consume result
#run tsim/tmq/clearConsume.sim
# because drop table function no stable, so by create new db for consume info and result. Modify it later
$cdbName = cdb2
sql create database $cdbName vgroups 1
sleep 500
sql use $cdbName
print == create consume info table and consume result table
sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int)
sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)
sql show tables
if $rows != 2 then
return -1
endi
#######################################################################################
print ================ test consume from ntb
print == multi toipcs: topic_ntb_column + topic_ntb_all + topic_ntb_function
$topicList = ' . topic_ntb_column
$topicList = $topicList . ,
$topicList = $topicList . topic_ntb_all
$topicList = $topicList . ,
$topicList = $topicList . topic_ntb_function
$topicList = $topicList . '
$consumerId = 0
$totalMsgOfNtb = $rowsPerCtb * $topicNum
$expectmsgcnt = $totalMsgOfNtb
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata )
print == start consumer to pull msgs from ntb
print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -s start
system tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $cdbName -s start
print == check consume result from ntb
wait_consumer_end_from_ntb:
sql select * from consumeresult
print ==> rows: $rows
print ==> rows[0]: $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6]
if $rows != 1 then
sleep 1000
goto wait_consumer_end_from_ntb
endi
if $data[0][1] != $consumerId then
return -1
endi
if $data[0][2] != $totalMsgOfNtb then
return -1
endi
if $data[0][3] != $totalMsgOfNtb then
return -1
endi
#------ not need stop consumer, because it exit after pull msg overthan expect msg
#system tsim/tmq/consume.sh -s stop -x SIGINT
system sh/exec.sh -n dnode1 -s stop -x SIGINT
#### test scenario, please refer to https://jira.taosdata.com:18090/pages/viewpage.action?pageId=135120406
#basic1.sim: vgroups=1, one topic for one consumer, firstly insert data, then start consume. Include six topics
#basic2.sim: vgroups=1, multi topics for one consumer, firstly insert data, then start consume. Include six topics
#basic3.sim: vgroups=4, one topic for one consumer, firstly insert data, then start consume. Include six topics
#basic4.sim: vgroups=4, multi topics for one consumer, firstly insert data, then start consume. Include six topics
# notes1: Scalar function: ABS/ACOS/ASIN/ATAN/CEIL/COS/FLOOR/LOG/POW/ROUND/SIN/SQRT/TAN
# The above use cases are combined with where filter conditions, such as: where ts > "2017-08-12 18:25:58.128Z" and sin(a) > 0.5;
#
# notes2: not support aggregate functions(such as sum/count/min/max) and time-windows(interval).
#
run tsim/tmq/prepareBasicEnv-4vgrp.sim
#---- global parameters start ----#
$dbName = db
$vgroups = 4
$stbPrefix = stb
$ctbPrefix = ctb
$ntbPrefix = ntb
$stbNum = 1
$ctbNum = 10
$ntbNum = 10
$rowsPerCtb = 10
$tstart = 1640966400000 # 2022-01-01 00:00:00.000
#---- global parameters end ----#
$pullDelay = 3
$ifcheckdata = 1
$ifmanualcommit = 1
$showMsg = 1
$showRow = 0
sql connect
sql use $dbName
print == create topics from super table
sql create topic topic_stb_column as select ts, c3 from stb
sql create topic topic_stb_all as select ts, c1, c2, c3 from stb
sql create topic topic_stb_function as select ts, abs(c1), sin(c2) from stb
print == create topics from child table
sql create topic topic_ctb_column as select ts, c3 from ctb0
sql create topic topic_ctb_all as select * from ctb0
sql create topic topic_ctb_function as select ts, abs(c1), sin(c2) from ctb0
print == create topics from normal table
sql create topic topic_ntb_column as select ts, c3 from ntb0
sql create topic topic_ntb_all as select * from ntb0
sql create topic topic_ntb_function as select ts, abs(c1), sin(c2) from ntb0
#sql show topics
#if $rows != 9 then
# return -1
#endi
#'group.id:cgrp1,enable.auto.commit:false,auto.commit.interval.ms:6000,auto.offset.reset:earliest'
$keyList = ' . group.id:cgrp1
$keyList = $keyList . ,
$keyList = $keyList . enable.auto.commit:false
#$keyList = $keyList . ,
#$keyList = $keyList . auto.commit.interval.ms:6000
#$keyList = $keyList . ,
#$keyList = $keyList . auto.offset.reset:earliest
$keyList = $keyList . '
print ========== key list: $keyList
$topicNum = 3
print ================ test consume from stb
print == multi toipcs: topic_stb_column + topic_stb_all + topic_stb_function
$topicList = ' . topic_stb_column
$topicList = $topicList . ,
$topicList = $topicList . topic_stb_all
$topicList = $topicList . ,
$topicList = $topicList . topic_stb_function
$topicList = $topicList . '
$consumerId = 0
$totalMsgOfStb = $ctbNum * $rowsPerCtb
$totalMsgOfStb = $totalMsgOfStb * $topicNum
$expectmsgcnt = $totalMsgOfStb
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit )
print == start consumer to pull msgs from stb
print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $dbName -s start
system tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $dbName -s start
print == check consume result
wait_consumer_end_from_stb:
sql select * from consumeresult
print ==> rows: $rows
print ==> rows[0]: $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6]
if $rows != 1 then
sleep 1000
goto wait_consumer_end_from_stb
endi
if $data[0][1] != $consumerId then
return -1
endi
if $data[0][2] != $expectmsgcnt then
return -1
endi
if $data[0][3] != $expectmsgcnt then
return -1
endi
#######################################################################################
# clear consume info and consume result
#run tsim/tmq/clearConsume.sim
# because drop table function no stable, so by create new db for consume info and result. Modify it later
$cdbName = cdb1
sql create database $cdbName vgroups 1
sleep 500
sql use $cdbName
print == create consume info table and consume result table
sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)
sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)
sql show tables
if $rows != 2 then
return -1
endi
#######################################################################################
print ================ test consume from ctb
print == multi toipcs: topic_ctb_column + topic_ctb_all + topic_ctb_function
$topicList = ' . topic_ctb_column
$topicList = $topicList . ,
$topicList = $topicList . topic_ctb_all
$topicList = $topicList . ,
$topicList = $topicList . topic_ctb_function
$topicList = $topicList . '
$consumerId = 0
$totalMsgOfCtb = $rowsPerCtb * $topicNum
$expectmsgcnt = $totalMsgOfCtb
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit )
print == start consumer to pull msgs from ctb
print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -s start
system tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $cdbName -s start
print == check consume result
wait_consumer_end_from_ctb:
sql select * from consumeresult
print ==> rows: $rows
print ==> rows[0]: $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6]
if $rows != 1 then
sleep 1000
goto wait_consumer_end_from_ctb
endi
if $data[0][1] != $consumerId then
return -1
endi
if $data[0][2] != $totalMsgOfCtb then
return -1
endi
if $data[0][3] != $totalMsgOfCtb then
return -1
endi
#######################################################################################
# clear consume info and consume result
#run tsim/tmq/clearConsume.sim
# because drop table function no stable, so by create new db for consume info and result. Modify it later
$cdbName = cdb2
sql create database $cdbName vgroups 1
sleep 500
sql use $cdbName
print == create consume info table and consume result table
sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)
sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)
sql show tables
if $rows != 2 then
return -1
endi
#######################################################################################
print ================ test consume from ntb
print == multi toipcs: topic_ntb_column + topic_ntb_all + topic_ntb_function
$topicList = ' . topic_ntb_column
$topicList = $topicList . ,
$topicList = $topicList . topic_ntb_all
$topicList = $topicList . ,
$topicList = $topicList . topic_ntb_function
$topicList = $topicList . '
$consumerId = 0
$totalMsgOfNtb = $rowsPerCtb * $topicNum
$expectmsgcnt = $totalMsgOfNtb
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit )
print == start consumer to pull msgs from ntb
print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -s start
system tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $cdbName -s start
print == check consume result from ntb
wait_consumer_end_from_ntb:
sql select * from consumeresult
print ==> rows: $rows
print ==> rows[0]: $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6]
if $rows != 1 then
sleep 1000
goto wait_consumer_end_from_ntb
endi
if $data[0][1] != $consumerId then
return -1
endi
if $data[0][2] != $totalMsgOfNtb then
return -1
endi
if $data[0][3] != $totalMsgOfNtb then
return -1
endi
#------ not need stop consumer, because it exit after pull msg overthan expect msg
#system tsim/tmq/consume.sh -s stop -x SIGINT
system sh/exec.sh -n dnode1 -s stop -x SIGINT
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册