提交 0b981808 编写于 作者: P plum-lihui

Merge branch '3.0' of github.com:taosdata/TDengine into 3.0

......@@ -3355,7 +3355,7 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
break;
}
if (status == PROJECT_RETRIEVE_CONTINUE) {
if (status == PROJECT_RETRIEVE_CONTINUE || pInfo->pRes->info.rows == 0) {
continue;
} else if (status == PROJECT_RETRIEVE_DONE) {
break;
......
......@@ -369,6 +369,8 @@ static void destroyPhysiNode(SPhysiNode* pNode) {
nodesDestroyList(pNode->pChildren);
nodesDestroyNode(pNode->pConditions);
nodesDestroyNode((SNode*)pNode->pOutputDataBlockDesc);
nodesDestroyNode(pNode->pLimit);
nodesDestroyNode(pNode->pSlimit);
}
static void destroyWinodwPhysiNode(SWinodwPhysiNode* pNode) {
......@@ -389,11 +391,16 @@ static void destroyDataSinkNode(SDataSinkNode* pNode) { nodesDestroyNode((SNode*
static void destroyExprNode(SExprNode* pExpr) { taosArrayDestroy(pExpr->pAssociation); }
static void nodesDestroyNodePointer(void* node) {
SNode* pNode = *(SNode**)node;
nodesDestroyNode(pNode);
static void destroyTableCfg(STableCfg* pCfg) {
taosArrayDestroy(pCfg->pFuncs);
taosMemoryFree(pCfg->pComment);
taosMemoryFree(pCfg->pSchemas);
taosMemoryFree(pCfg->pTags);
taosMemoryFree(pCfg);
}
static void destroySmaIndex(void* pIndex) { taosMemoryFree(((STableIndexInfo*)pIndex)->expr); }
void nodesDestroyNode(SNode* pNode) {
if (NULL == pNode) {
return;
......@@ -431,6 +438,7 @@ void nodesDestroyNode(SNode* pNode) {
SRealTableNode* pReal = (SRealTableNode*)pNode;
taosMemoryFreeClear(pReal->pMeta);
taosMemoryFreeClear(pReal->pVgroupList);
taosArrayDestroyEx(pReal->pSmaIndexes, destroySmaIndex);
break;
}
case QUERY_NODE_TEMP_TABLE:
......@@ -451,9 +459,12 @@ void nodesDestroyNode(SNode* pNode) {
break;
case QUERY_NODE_LIMIT: // no pointer field
break;
case QUERY_NODE_STATE_WINDOW:
nodesDestroyNode(((SStateWindowNode*)pNode)->pExpr);
case QUERY_NODE_STATE_WINDOW: {
SStateWindowNode* pState = (SStateWindowNode*)pNode;
nodesDestroyNode(pState->pCol);
nodesDestroyNode(pState->pExpr);
break;
}
case QUERY_NODE_SESSION_WINDOW: {
SSessionWindowNode* pSession = (SSessionWindowNode*)pNode;
nodesDestroyNode((SNode*)pSession->pCol);
......@@ -500,8 +511,10 @@ void nodesDestroyNode(SNode* pNode) {
}
case QUERY_NODE_TABLE_OPTIONS: {
STableOptions* pOptions = (STableOptions*)pNode;
nodesDestroyList(pOptions->pSma);
nodesDestroyList(pOptions->pMaxDelay);
nodesDestroyList(pOptions->pWatermark);
nodesDestroyList(pOptions->pRollupFuncs);
nodesDestroyList(pOptions->pSma);
break;
}
case QUERY_NODE_INDEX_OPTIONS: {
......@@ -510,17 +523,22 @@ void nodesDestroyNode(SNode* pNode) {
nodesDestroyNode(pOptions->pInterval);
nodesDestroyNode(pOptions->pOffset);
nodesDestroyNode(pOptions->pSliding);
nodesDestroyNode(pOptions->pStreamOptions);
break;
}
case QUERY_NODE_EXPLAIN_OPTIONS: // no pointer field
break;
case QUERY_NODE_STREAM_OPTIONS:
nodesDestroyNode(((SStreamOptions*)pNode)->pWatermark);
case QUERY_NODE_STREAM_OPTIONS: {
SStreamOptions* pOptions = (SStreamOptions*)pNode;
nodesDestroyNode(pOptions->pDelay);
nodesDestroyNode(pOptions->pWatermark);
break;
}
case QUERY_NODE_LEFT_VALUE: // no pointer field
break;
case QUERY_NODE_SET_OPERATOR: {
SSetOperator* pStmt = (SSetOperator*)pNode;
nodesDestroyList(pStmt->pProjectionList);
nodesDestroyNode(pStmt->pLeft);
nodesDestroyNode(pStmt->pRight);
nodesDestroyList(pStmt->pOrderByList);
......@@ -582,7 +600,8 @@ void nodesDestroyNode(SNode* pNode) {
break;
case QUERY_NODE_DROP_SUPER_TABLE_STMT: // no pointer field
break;
case QUERY_NODE_ALTER_TABLE_STMT: {
case QUERY_NODE_ALTER_TABLE_STMT:
case QUERY_NODE_ALTER_SUPER_TABLE_STMT: {
SAlterTableStmt* pStmt = (SAlterTableStmt*)pNode;
nodesDestroyNode((SNode*)pStmt->pOptions);
nodesDestroyNode((SNode*)pStmt->pVal);
......@@ -686,14 +705,15 @@ void nodesDestroyNode(SNode* pNode) {
nodesDestroyNode(pStmt->pTbName);
break;
}
case QUERY_NODE_SHOW_DNODE_VARIABLES_STMT: // no pointer field
case QUERY_NODE_SHOW_DNODE_VARIABLES_STMT:
nodesDestroyNode(((SShowDnodeVariablesStmt*)pNode)->pDnodeId);
break;
case QUERY_NODE_SHOW_CREATE_DATABASE_STMT:
taosMemoryFreeClear(((SShowCreateDatabaseStmt*)pNode)->pCfg);
break;
case QUERY_NODE_SHOW_CREATE_TABLE_STMT:
case QUERY_NODE_SHOW_CREATE_STABLE_STMT:
taosMemoryFreeClear(((SShowCreateTableStmt*)pNode)->pCfg);
destroyTableCfg((STableCfg*)(((SShowCreateTableStmt*)pNode)->pCfg));
break;
case QUERY_NODE_SHOW_TABLE_DISTRIBUTED_STMT: // no pointer field
case QUERY_NODE_KILL_CONNECTION_STMT: // no pointer field
......@@ -725,7 +745,8 @@ void nodesDestroyNode(SNode* pNode) {
}
taosArrayDestroy(pQuery->pDbList);
taosArrayDestroy(pQuery->pTableList);
taosArrayDestroyEx(pQuery->pPlaceholderValues, nodesDestroyNodePointer);
taosArrayDestroy(pQuery->pPlaceholderValues);
nodesDestroyNode(pQuery->pPrepareRoot);
break;
}
case QUERY_NODE_LOGIC_PLAN_SCAN: {
......@@ -737,7 +758,7 @@ void nodesDestroyNode(SNode* pNode) {
nodesDestroyList(pLogicNode->pDynamicScanFuncs);
nodesDestroyNode(pLogicNode->pTagCond);
nodesDestroyNode(pLogicNode->pTagIndexCond);
taosArrayDestroy(pLogicNode->pSmaIndexes);
taosArrayDestroyEx(pLogicNode->pSmaIndexes, destroySmaIndex);
nodesDestroyList(pLogicNode->pGroupTags);
break;
}
......@@ -766,6 +787,9 @@ void nodesDestroyNode(SNode* pNode) {
destroyLogicNode((SLogicNode*)pLogicNode);
destroyVgDataBlockArray(pLogicNode->pDataBlocks);
// pVgDataBlocks is weak reference
nodesDestroyNode(pLogicNode->pAffectedRows);
taosMemoryFreeClear(pLogicNode->pVgroupList);
nodesDestroyList(pLogicNode->pInsertCols);
break;
}
case QUERY_NODE_LOGIC_PLAN_EXCHANGE:
......@@ -784,6 +808,7 @@ void nodesDestroyNode(SNode* pNode) {
nodesDestroyList(pLogicNode->pFuncs);
nodesDestroyNode(pLogicNode->pTspk);
nodesDestroyNode(pLogicNode->pTsEnd);
nodesDestroyNode(pLogicNode->pStateExpr);
break;
}
case QUERY_NODE_LOGIC_PLAN_FILL: {
......@@ -833,9 +858,14 @@ void nodesDestroyNode(SNode* pNode) {
case QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN:
case QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN:
case QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN:
case QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN:
destroyScanPhysiNode((SScanPhysiNode*)pNode);
break;
case QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN: {
SLastRowScanPhysiNode* pPhyNode = (SLastRowScanPhysiNode*)pNode;
destroyScanPhysiNode((SScanPhysiNode*)pNode);
nodesDestroyList(pPhyNode->pGroupTags);
break;
}
case QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN:
case QUERY_NODE_PHYSICAL_PLAN_TABLE_SEQ_SCAN:
case QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN:
......
......@@ -462,7 +462,7 @@ explain_options(A) ::= explain_options(B) VERBOSE NK_BOOL(C).
explain_options(A) ::= explain_options(B) RATIO NK_FLOAT(C). { A = setExplainRatio(pCxt, B, &C); }
/************************************************ compact *************************************************************/
cmd ::= COMPACT VNODES IN NK_LP integer_list(A) NK_RP. { pCxt->pRootNode = createCompactStmt(pCxt, A); }
cmd ::= COMPACT VNODES IN NK_LP integer_list NK_RP. { pCxt->errCode = generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_EXPRIE_STATEMENT); }
/************************************************ create/drop function ************************************************/
cmd ::= CREATE agg_func_opt(A) FUNCTION not_exists_opt(F) function_name(B)
......
......@@ -387,6 +387,19 @@ SNode* createLogicConditionNode(SAstCreateContext* pCxt, ELogicConditionType typ
return (SNode*)cond;
}
static uint8_t getMinusDataType(uint8_t orgType) {
switch (orgType) {
case TSDB_DATA_TYPE_UTINYINT:
case TSDB_DATA_TYPE_USMALLINT:
case TSDB_DATA_TYPE_UINT:
case TSDB_DATA_TYPE_UBIGINT:
return TSDB_DATA_TYPE_BIGINT;
default:
break;
}
return orgType;
}
SNode* createOperatorNode(SAstCreateContext* pCxt, EOperatorType type, SNode* pLeft, SNode* pRight) {
CHECK_PARSER_STATUS(pCxt);
if (OP_TYPE_MINUS == type && QUERY_NODE_VALUE == nodeType(pLeft)) {
......@@ -402,7 +415,7 @@ SNode* createOperatorNode(SAstCreateContext* pCxt, EOperatorType type, SNode* pL
}
taosMemoryFree(pVal->literal);
pVal->literal = pNewLiteral;
pVal->node.resType.type = TSDB_DATA_TYPE_BIGINT;
pVal->node.resType.type = getMinusDataType(pVal->node.resType.type);
return pLeft;
}
SOperatorNode* op = (SOperatorNode*)nodesMakeNode(QUERY_NODE_OPERATOR);
......
......@@ -1257,6 +1257,7 @@ static int32_t rewriteFuncToValue(STranslateContext* pCxt, char* pLiteral, SNode
}
}
if (DEAL_RES_ERROR != translateValue(pCxt, pVal)) {
nodesDestroyNode(*pNode);
*pNode = (SNode*)pVal;
} else {
nodesDestroyNode((SNode*)pVal);
......@@ -4009,30 +4010,7 @@ static SSchema* getTagSchema(STableMeta* pTableMeta, const char* pTagName) {
return NULL;
}
static int32_t checkAlterSuperTable(STranslateContext* pCxt, SAlterTableStmt* pStmt) {
if (TSDB_ALTER_TABLE_UPDATE_TAG_VAL == pStmt->alterType || TSDB_ALTER_TABLE_UPDATE_COLUMN_NAME == pStmt->alterType) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_ALTER_TABLE,
"Set tag value only available for child table");
}
if (pStmt->alterType == TSDB_ALTER_TABLE_UPDATE_OPTIONS && -1 != pStmt->pOptions->ttl) {
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_ALTER_TABLE);
}
if (pStmt->dataType.type == TSDB_DATA_TYPE_JSON && pStmt->alterType == TSDB_ALTER_TABLE_ADD_TAG) {
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_ONLY_ONE_JSON_TAG);
}
if (pStmt->dataType.type == TSDB_DATA_TYPE_JSON && pStmt->alterType == TSDB_ALTER_TABLE_ADD_COLUMN) {
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_COL_JSON);
}
STableMeta* pTableMeta = NULL;
int32_t code = getTableMeta(pCxt, pStmt->dbName, pStmt->tableName, &pTableMeta);
if (TSDB_CODE_SUCCESS != code) {
return code;
}
static int32_t checkAlterSuperTableImpl(STranslateContext* pCxt, SAlterTableStmt* pStmt, STableMeta* pTableMeta) {
SSchema* pTagsSchema = getTableTagSchema(pTableMeta);
if (getNumOfTags(pTableMeta) == 1 && pTagsSchema->type == TSDB_DATA_TYPE_JSON &&
(pStmt->alterType == TSDB_ALTER_TABLE_ADD_TAG || pStmt->alterType == TSDB_ALTER_TABLE_DROP_TAG ||
......@@ -4057,6 +4035,33 @@ static int32_t checkAlterSuperTable(STranslateContext* pCxt, SAlterTableStmt* pS
return TSDB_CODE_SUCCESS;
}
static int32_t checkAlterSuperTable(STranslateContext* pCxt, SAlterTableStmt* pStmt) {
if (TSDB_ALTER_TABLE_UPDATE_TAG_VAL == pStmt->alterType || TSDB_ALTER_TABLE_UPDATE_COLUMN_NAME == pStmt->alterType) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_ALTER_TABLE,
"Set tag value only available for child table");
}
if (pStmt->alterType == TSDB_ALTER_TABLE_UPDATE_OPTIONS && -1 != pStmt->pOptions->ttl) {
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_ALTER_TABLE);
}
if (pStmt->dataType.type == TSDB_DATA_TYPE_JSON && pStmt->alterType == TSDB_ALTER_TABLE_ADD_TAG) {
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_ONLY_ONE_JSON_TAG);
}
if (pStmt->dataType.type == TSDB_DATA_TYPE_JSON && pStmt->alterType == TSDB_ALTER_TABLE_ADD_COLUMN) {
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_COL_JSON);
}
STableMeta* pTableMeta = NULL;
int32_t code = getTableMeta(pCxt, pStmt->dbName, pStmt->tableName, &pTableMeta);
if (TSDB_CODE_SUCCESS == code) {
code = checkAlterSuperTableImpl(pCxt, pStmt, pTableMeta);
}
taosMemoryFree(pTableMeta);
return code;
}
static int32_t translateAlterSuperTable(STranslateContext* pCxt, SAlterTableStmt* pStmt) {
SMAlterStbReq alterReq = {0};
int32_t code = checkAlterSuperTable(pCxt, pStmt);
......@@ -6438,6 +6443,7 @@ static int32_t toMsgType(ENodeType type) {
static int32_t setRefreshMate(STranslateContext* pCxt, SQuery* pQuery) {
if (NULL != pCxt->pDbs) {
taosArrayDestroy(pQuery->pDbList);
pQuery->pDbList = taosArrayInit(taosHashGetSize(pCxt->pDbs), TSDB_DB_FNAME_LEN);
if (NULL == pQuery->pDbList) {
return TSDB_CODE_OUT_OF_MEMORY;
......@@ -6450,6 +6456,7 @@ static int32_t setRefreshMate(STranslateContext* pCxt, SQuery* pQuery) {
}
if (NULL != pCxt->pTables) {
taosArrayDestroy(pQuery->pTableList);
pQuery->pTableList = taosArrayInit(taosHashGetSize(pCxt->pTables), sizeof(SName));
if (NULL == pQuery->pTableList) {
return TSDB_CODE_OUT_OF_MEMORY;
......@@ -6521,6 +6528,7 @@ static int32_t setQuery(STranslateContext* pCxt, SQuery* pQuery) {
pQuery->stableQuery = pCxt->stableQuery;
if (pQuery->haveResultSet) {
taosMemoryFreeClear(pQuery->pResSchema);
if (TSDB_CODE_SUCCESS != extractResultSchema(pQuery->pRoot, &pQuery->numOfResCols, &pQuery->pResSchema)) {
return TSDB_CODE_OUT_OF_MEMORY;
}
......
......@@ -865,12 +865,15 @@ STableCfg* tableCfgDup(STableCfg* pCfg) {
STableCfg* pNew = taosMemoryMalloc(sizeof(*pNew));
memcpy(pNew, pCfg, sizeof(*pNew));
if (pNew->pComment) {
if (NULL != pNew->pComment) {
pNew->pComment = strdup(pNew->pComment);
}
if (pNew->pFuncs) {
if (NULL != pNew->pFuncs) {
pNew->pFuncs = taosArrayDup(pNew->pFuncs);
}
if (NULL != pNew->pTags) {
pNew->pTags = strdup(pNew->pTags);
}
int32_t schemaSize = (pCfg->numOfColumns + pCfg->numOfTags) * sizeof(SSchema);
......
......@@ -239,6 +239,7 @@ int32_t qStmtBindParams(SQuery* pQuery, TAOS_MULTI_BIND* pParams, int32_t colIdx
}
if (TSDB_CODE_SUCCESS == code && (colIdx < 0 || colIdx + 1 == pQuery->placeholderNum)) {
nodesDestroyNode(pQuery->pRoot);
pQuery->pRoot = nodesCloneNode(pQuery->pPrepareRoot);
if (NULL == pQuery->pRoot) {
code = TSDB_CODE_OUT_OF_MEMORY;
......
......@@ -4117,7 +4117,8 @@ static YYACTIONTYPE yy_reduce(
yymsp[-2].minor.yy616 = yylhsminor.yy616;
break;
case 254: /* cmd ::= COMPACT VNODES IN NK_LP integer_list NK_RP */
{ pCxt->pRootNode = createCompactStmt(pCxt, yymsp[-1].minor.yy356); }
{ pCxt->errCode = generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_EXPRIE_STATEMENT); }
yy_destructor(yypParser,273,&yymsp[-1].minor);
break;
case 255: /* cmd ::= CREATE agg_func_opt FUNCTION not_exists_opt function_name AS NK_STRING OUTPUTTYPE type_name bufsize_opt */
{ pCxt->pRootNode = createCreateFunctionStmt(pCxt, yymsp[-6].minor.yy151, yymsp[-8].minor.yy151, &yymsp[-5].minor.yy361, &yymsp[-3].minor.yy0, yymsp[-1].minor.yy600, yymsp[0].minor.yy734); }
......
......@@ -93,6 +93,17 @@ class MockCatalogServiceImpl {
MockCatalogServiceImpl() : id_(1) {}
~MockCatalogServiceImpl() {
for (auto& cfg : dbCfg_) {
taosArrayDestroy(cfg.second.pRetensions);
}
for (auto& indexes : index_) {
for (auto& index : indexes.second) {
taosMemoryFree(index.expr);
}
}
}
int32_t catalogGetHandle() const { return 0; }
int32_t catalogGetTableMeta(const SName* pTableName, STableMeta** pTableMeta) const {
......@@ -676,6 +687,7 @@ void MockCatalogService::destoryCatalogReq(SCatalogReq* pReq) {
taosArrayDestroy(pReq->pIndex);
taosArrayDestroy(pReq->pUser);
taosArrayDestroy(pReq->pTableIndex);
taosArrayDestroy(pReq->pTableCfg);
delete pReq;
}
......@@ -684,6 +696,11 @@ void MockCatalogService::destoryMetaRes(void* p) {
taosMemoryFree(pRes->pRes);
}
void MockCatalogService::destoryMetaArrayRes(void* p) {
SMetaRes* pRes = (SMetaRes*)p;
taosArrayDestroy((SArray*)pRes->pRes);
}
void MockCatalogService::destoryMetaData(SMetaData* pData) {
taosArrayDestroyEx(pData->pDbVgroup, destoryMetaRes);
taosArrayDestroyEx(pData->pDbCfg, destoryMetaRes);
......@@ -695,5 +712,8 @@ void MockCatalogService::destoryMetaData(SMetaData* pData) {
taosArrayDestroyEx(pData->pIndex, destoryMetaRes);
taosArrayDestroyEx(pData->pUser, destoryMetaRes);
taosArrayDestroyEx(pData->pQnodeList, destoryMetaRes);
taosArrayDestroyEx(pData->pTableCfg, destoryMetaRes);
taosArrayDestroyEx(pData->pDnodeList, destoryMetaArrayRes);
taosMemoryFree(pData->pSvrVer);
delete pData;
}
......@@ -52,6 +52,7 @@ class MockCatalogService {
public:
static void destoryCatalogReq(SCatalogReq* pReq);
static void destoryMetaRes(void* p);
static void destoryMetaArrayRes(void* p);
static void destoryMetaData(SMetaData* pData);
MockCatalogService();
......
......@@ -21,7 +21,11 @@ namespace ParserTest {
class ParserInitialCTest : public ParserDdlTest {};
// todo compact
TEST_F(ParserInitialCTest, compact) {
useDb("root", "test");
run("COMPACT VNODES IN (1, 2)", TSDB_CODE_PAR_EXPRIE_STATEMENT, PARSER_STAGE_PARSE);
}
TEST_F(ParserInitialCTest, createAccount) {
useDb("root", "test");
......@@ -32,6 +36,19 @@ TEST_F(ParserInitialCTest, createAccount) {
TEST_F(ParserInitialCTest, createBnode) {
useDb("root", "test");
SMCreateQnodeReq expect = {0};
auto setCreateQnodeReq = [&](int32_t dnodeId) { expect.dnodeId = dnodeId; };
setCheckDdlFunc([&](const SQuery* pQuery, ParserStage stage) {
ASSERT_EQ(nodeType(pQuery->pRoot), QUERY_NODE_CREATE_BNODE_STMT);
SMCreateQnodeReq req = {0};
ASSERT_TRUE(TSDB_CODE_SUCCESS ==
tDeserializeSCreateDropMQSBNodeReq(pQuery->pCmdMsg->pMsg, pQuery->pCmdMsg->msgLen, &req));
ASSERT_EQ(req.dnodeId, expect.dnodeId);
});
setCreateQnodeReq(1);
run("CREATE BNODE ON DNODE 1");
}
......
......@@ -123,6 +123,14 @@ class ParserTestBaseImpl {
delete pMetaCache;
}
static void _destroyQuery(SQuery** pQuery) {
if (nullptr == pQuery) {
return;
}
qDestroyQuery(*pQuery);
taosMemoryFree(pQuery);
}
bool checkResultCode(const string& pFunc, int32_t resultCode) {
return !(stmtEnv_.checkFunc_.empty())
? ((stmtEnv_.checkFunc_ == pFunc) ? stmtEnv_.expect_ == resultCode : TSDB_CODE_SUCCESS == resultCode)
......@@ -278,9 +286,9 @@ class ParserTestBaseImpl {
SParseContext cxt = {0};
setParseContext(sql, &cxt);
SQuery* pQuery = nullptr;
doParse(&cxt, &pQuery);
unique_ptr<SQuery, void (*)(SQuery*)> query(pQuery, qDestroyQuery);
unique_ptr<SQuery*, void (*)(SQuery**)> query((SQuery**)taosMemoryCalloc(1, sizeof(SQuery*)), _destroyQuery);
doParse(&cxt, query.get());
SQuery* pQuery = *(query.get());
doAuthenticate(&cxt, pQuery, nullptr);
......@@ -306,9 +314,9 @@ class ParserTestBaseImpl {
SParseContext cxt = {0};
setParseContext(sql, &cxt);
SQuery* pQuery = nullptr;
doParseSql(&cxt, &pQuery);
unique_ptr<SQuery, void (*)(SQuery*)> query(pQuery, qDestroyQuery);
unique_ptr<SQuery*, void (*)(SQuery**)> query((SQuery**)taosMemoryCalloc(1, sizeof(SQuery*)), _destroyQuery);
doParseSql(&cxt, query.get());
SQuery* pQuery = *(query.get());
if (g_dump) {
dump();
......@@ -328,9 +336,9 @@ class ParserTestBaseImpl {
SParseContext cxt = {0};
setParseContext(sql, &cxt, true);
SQuery* pQuery = nullptr;
doParse(&cxt, &pQuery);
unique_ptr<SQuery, void (*)(SQuery*)> query(pQuery, qDestroyQuery);
unique_ptr<SQuery*, void (*)(SQuery**)> query((SQuery**)taosMemoryCalloc(1, sizeof(SQuery*)), _destroyQuery);
doParse(&cxt, query.get());
SQuery* pQuery = *(query.get());
unique_ptr<SParseMetaCache, void (*)(SParseMetaCache*)> metaCache(new SParseMetaCache(), _destoryParseMetaCache);
doCollectMetaKey(&cxt, pQuery, metaCache.get());
......@@ -386,9 +394,9 @@ class ParserTestBaseImpl {
unique_ptr<SCatalogReq, void (*)(SCatalogReq*)> catalogReq(new SCatalogReq(),
MockCatalogService::destoryCatalogReq);
SQuery* pQuery = nullptr;
doParseSqlSyntax(&cxt, &pQuery, catalogReq.get());
unique_ptr<SQuery, void (*)(SQuery*)> query(pQuery, qDestroyQuery);
unique_ptr<SQuery*, void (*)(SQuery**)> query((SQuery**)taosMemoryCalloc(1, sizeof(SQuery*)), _destroyQuery);
doParseSqlSyntax(&cxt, query.get(), catalogReq.get());
SQuery* pQuery = *(query.get());
string err;
thread t1([&]() {
......
......@@ -1068,7 +1068,11 @@ static int32_t createExchangePhysiNode(SPhysiPlanContext* pCxt, SExchangeLogicNo
}
static int32_t createWindowPhysiNodeFinalize(SPhysiPlanContext* pCxt, SNodeList* pChildren, SWinodwPhysiNode* pWindow,
SWindowLogicNode* pWindowLogicNode, SPhysiNode** pPhyNode) {
SWindowLogicNode* pWindowLogicNode) {
pWindow->triggerType = pWindowLogicNode->triggerType;
pWindow->watermark = pWindowLogicNode->watermark;
pWindow->igExpired = pWindowLogicNode->igExpired;
SNodeList* pPrecalcExprs = NULL;
SNodeList* pFuncs = NULL;
int32_t code = rewritePrecalcExprs(pCxt, pWindowLogicNode->pFuncs, &pPrecalcExprs, &pFuncs);
......@@ -1100,16 +1104,6 @@ static int32_t createWindowPhysiNodeFinalize(SPhysiPlanContext* pCxt, SNodeList*
code = setConditionsSlotId(pCxt, (const SLogicNode*)pWindowLogicNode, (SPhysiNode*)pWindow);
}
pWindow->triggerType = pWindowLogicNode->triggerType;
pWindow->watermark = pWindowLogicNode->watermark;
pWindow->igExpired = pWindowLogicNode->igExpired;
if (TSDB_CODE_SUCCESS == code) {
*pPhyNode = (SPhysiNode*)pWindow;
} else {
nodesDestroyNode((SNode*)pWindow);
}
nodesDestroyList(pPrecalcExprs);
nodesDestroyList(pFuncs);
......@@ -1156,7 +1150,14 @@ static int32_t createIntervalPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChil
pInterval->intervalUnit = pWindowLogicNode->intervalUnit;
pInterval->slidingUnit = pWindowLogicNode->slidingUnit;
return createWindowPhysiNodeFinalize(pCxt, pChildren, &pInterval->window, pWindowLogicNode, pPhyNode);
int32_t code = createWindowPhysiNodeFinalize(pCxt, pChildren, &pInterval->window, pWindowLogicNode);
if (TSDB_CODE_SUCCESS == code) {
*pPhyNode = (SPhysiNode*)pInterval;
} else {
nodesDestroyNode((SNode*)pInterval);
}
return code;
}
static int32_t createSessionWindowPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren,
......@@ -1169,7 +1170,14 @@ static int32_t createSessionWindowPhysiNode(SPhysiPlanContext* pCxt, SNodeList*
pSession->gap = pWindowLogicNode->sessionGap;
return createWindowPhysiNodeFinalize(pCxt, pChildren, &pSession->window, pWindowLogicNode, pPhyNode);
int32_t code = createWindowPhysiNodeFinalize(pCxt, pChildren, &pSession->window, pWindowLogicNode);
if (TSDB_CODE_SUCCESS == code) {
*pPhyNode = (SPhysiNode*)pSession;
} else {
nodesDestroyNode((SNode*)pSession);
}
return code;
}
static int32_t createStateWindowPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren,
......@@ -1201,12 +1209,20 @@ static int32_t createStateWindowPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pC
}
}
if (TSDB_CODE_SUCCESS != code) {
if (TSDB_CODE_SUCCESS == code) {
code = createWindowPhysiNodeFinalize(pCxt, pChildren, &pState->window, pWindowLogicNode);
}
if (TSDB_CODE_SUCCESS == code) {
*pPhyNode = (SPhysiNode*)pState;
} else {
nodesDestroyNode((SNode*)pState);
return code;
}
return createWindowPhysiNodeFinalize(pCxt, pChildren, &pState->window, pWindowLogicNode, pPhyNode);
nodesDestroyList(pPrecalcExprs);
nodesDestroyNode(pStateKey);
return code;
}
static int32_t createWindowPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SWindowLogicNode* pWindowLogicNode,
......
......@@ -867,10 +867,11 @@ static int32_t stbSplSplitSortNode(SSplitContext* pCxt, SStableSplitInfo* pInfo)
if (TSDB_CODE_SUCCESS == code) {
code = stbSplCreateMergeNode(pCxt, pInfo->pSubplan, pInfo->pSplitNode, pMergeKeys, pPartSort, groupSort);
}
if (TSDB_CODE_SUCCESS == code && groupSort) {
stbSplSetScanPartSort(pPartSort);
}
if (TSDB_CODE_SUCCESS == code) {
nodesDestroyNode((SNode*)pInfo->pSplitNode);
if (groupSort) {
stbSplSetScanPartSort(pPartSort);
}
code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren,
(SNode*)splCreateScanSubplan(pCxt, pPartSort, SPLIT_FLAG_STABLE_SPLIT));
}
......
......@@ -24,6 +24,16 @@ class PlanStmtTest : public PlannerTestBase {
return (TAOS_MULTI_BIND*)taosMemoryCalloc(nParams, sizeof(TAOS_MULTI_BIND));
}
void destoryBindParams(TAOS_MULTI_BIND* pParams, int32_t nParams) {
for (int32_t i = 0; i < nParams; ++i) {
TAOS_MULTI_BIND* pParam = pParams + i;
taosMemoryFree(pParam->buffer);
taosMemoryFree(pParam->length);
taosMemoryFree(pParam->is_null);
}
taosMemoryFree(pParams);
}
TAOS_MULTI_BIND* buildIntegerParam(TAOS_MULTI_BIND* pBindParams, int32_t index, int64_t val, int32_t type) {
TAOS_MULTI_BIND* pBindParam = initParam(pBindParams, index, type, 0);
......@@ -127,8 +137,10 @@ TEST_F(PlanStmtTest, basic) {
useDb("root", "test");
prepare("SELECT * FROM t1 WHERE c1 = ?");
bindParams(buildIntegerParam(createBindParams(1), 0, 10, TSDB_DATA_TYPE_INT), 0);
TAOS_MULTI_BIND* pBindParams = buildIntegerParam(createBindParams(1), 0, 10, TSDB_DATA_TYPE_INT);
bindParams(pBindParams, 0);
exec();
destoryBindParams(pBindParams, 1);
{
prepare("SELECT * FROM t1 WHERE c1 = ? AND c2 = ?");
......@@ -137,7 +149,7 @@ TEST_F(PlanStmtTest, basic) {
buildStringParam(pBindParams, 1, "abc", TSDB_DATA_TYPE_VARCHAR, strlen("abc"));
bindParams(pBindParams, -1);
exec();
taosMemoryFreeClear(pBindParams);
destoryBindParams(pBindParams, 2);
}
{
......@@ -147,7 +159,7 @@ TEST_F(PlanStmtTest, basic) {
buildIntegerParam(pBindParams, 1, 20, TSDB_DATA_TYPE_INT);
bindParams(pBindParams, -1);
exec();
taosMemoryFreeClear(pBindParams);
destoryBindParams(pBindParams, 2);
}
}
......@@ -155,12 +167,16 @@ TEST_F(PlanStmtTest, multiExec) {
useDb("root", "test");
prepare("SELECT * FROM t1 WHERE c1 = ?");
bindParams(buildIntegerParam(createBindParams(1), 0, 10, TSDB_DATA_TYPE_INT), 0);
TAOS_MULTI_BIND* pBindParams = buildIntegerParam(createBindParams(1), 0, 10, TSDB_DATA_TYPE_INT);
bindParams(pBindParams, 0);
exec();
bindParams(buildIntegerParam(createBindParams(1), 0, 20, TSDB_DATA_TYPE_INT), 0);
destoryBindParams(pBindParams, 1);
pBindParams = buildIntegerParam(createBindParams(1), 0, 20, TSDB_DATA_TYPE_INT);
bindParams(pBindParams, 0);
exec();
bindParams(buildIntegerParam(createBindParams(1), 0, 30, TSDB_DATA_TYPE_INT), 0);
destoryBindParams(pBindParams, 1);
pBindParams = buildIntegerParam(createBindParams(1), 0, 30, TSDB_DATA_TYPE_INT);
bindParams(pBindParams, 0);
exec();
destoryBindParams(pBindParams, 1);
}
TEST_F(PlanStmtTest, allDataType) { useDb("root", "test"); }
......@@ -126,9 +126,9 @@ class PlannerTestBaseImpl {
reset();
tsQueryPolicy = queryPolicy;
try {
SQuery* pQuery = nullptr;
doParseSql(sql, &pQuery);
unique_ptr<SQuery, void (*)(SQuery*)> query(pQuery, qDestroyQuery);
unique_ptr<SQuery*, void (*)(SQuery**)> query((SQuery**)taosMemoryCalloc(1, sizeof(SQuery*)), _destroyQuery);
doParseSql(sql, query.get());
SQuery* pQuery = *(query.get());
SPlanContext cxt = {0};
setPlanContext(pQuery, &cxt);
......@@ -199,6 +199,8 @@ class PlannerTestBaseImpl {
SLogicSubplan* pLogicSubplan = nullptr;
doCreateLogicPlan(&cxt, &pLogicSubplan);
unique_ptr<SLogicSubplan, void (*)(SLogicSubplan*)> logicSubplan(pLogicSubplan,
(void (*)(SLogicSubplan*))nodesDestroyNode);
doOptimizeLogicPlan(&cxt, pLogicSubplan);
......@@ -206,9 +208,12 @@ class PlannerTestBaseImpl {
SQueryLogicPlan* pLogicPlan = nullptr;
doScaleOutLogicPlan(&cxt, pLogicSubplan, &pLogicPlan);
unique_ptr<SQueryLogicPlan, void (*)(SQueryLogicPlan*)> logicPlan(pLogicPlan,
(void (*)(SQueryLogicPlan*))nodesDestroyNode);
SQueryPlan* pPlan = nullptr;
doCreatePhysiPlan(&cxt, pLogicPlan, &pPlan);
unique_ptr<SQueryPlan, void (*)(SQueryPlan*)> plan(pPlan, (void (*)(SQueryPlan*))nodesDestroyNode);
dump(g_dumpModule);
} catch (...) {
......@@ -249,6 +254,14 @@ class PlannerTestBaseImpl {
vector<string> physiSubplans_;
};
static void _destroyQuery(SQuery** pQuery) {
if (nullptr == pQuery) {
return;
}
qDestroyQuery(*pQuery);
taosMemoryFree(pQuery);
}
void reset() {
stmtEnv_.sql_.clear();
stmtEnv_.msgBuf_.fill(0);
......@@ -400,20 +413,30 @@ class PlannerTestBaseImpl {
pCxt->queryId = 1;
pCxt->pUser = caseEnv_.user_.c_str();
if (QUERY_NODE_CREATE_TOPIC_STMT == nodeType(pQuery->pRoot)) {
pCxt->pAstRoot = ((SCreateTopicStmt*)pQuery->pRoot)->pQuery;
SCreateTopicStmt* pStmt = (SCreateTopicStmt*)pQuery->pRoot;
pCxt->pAstRoot = pStmt->pQuery;
pStmt->pQuery = nullptr;
nodesDestroyNode(pQuery->pRoot);
pQuery->pRoot = pCxt->pAstRoot;
pCxt->topicQuery = true;
} else if (QUERY_NODE_CREATE_INDEX_STMT == nodeType(pQuery->pRoot)) {
SMCreateSmaReq req = {0};
tDeserializeSMCreateSmaReq(pQuery->pCmdMsg->pMsg, pQuery->pCmdMsg->msgLen, &req);
g_mockCatalogService->createSmaIndex(&req);
nodesStringToNode(req.ast, &pCxt->pAstRoot);
tFreeSMCreateSmaReq(&req);
nodesDestroyNode(pQuery->pRoot);
pQuery->pRoot = pCxt->pAstRoot;
pCxt->streamQuery = true;
} else if (QUERY_NODE_CREATE_STREAM_STMT == nodeType(pQuery->pRoot)) {
SCreateStreamStmt* pStmt = (SCreateStreamStmt*)pQuery->pRoot;
pCxt->pAstRoot = pStmt->pQuery;
pStmt->pQuery = nullptr;
pCxt->streamQuery = true;
pCxt->triggerType = pStmt->pOptions->triggerType;
pCxt->watermark = (NULL != pStmt->pOptions->pWatermark ? ((SValueNode*)pStmt->pOptions->pWatermark)->datum.i : 0);
nodesDestroyNode(pQuery->pRoot);
pQuery->pRoot = pCxt->pAstRoot;
} else {
pCxt->pAstRoot = pQuery->pRoot;
}
......
......@@ -473,13 +473,13 @@ static bool syncNodeOnAppendEntriesBatchLogOK(SSyncNode* pSyncNode, SyncAppendEn
SyncIndex myLastIndex = syncNodeGetLastIndex(pSyncNode);
if (pMsg->prevLogIndex > myLastIndex) {
sDebug("vgId:%d sync log not ok, preindex:%" PRId64, pSyncNode->vgId, pMsg->prevLogIndex);
sDebug("vgId:%d, sync log not ok, preindex:%" PRId64, pSyncNode->vgId, pMsg->prevLogIndex);
return false;
}
SyncTerm myPreLogTerm = syncNodeGetPreTerm(pSyncNode, pMsg->prevLogIndex + 1);
if (myPreLogTerm == SYNC_TERM_INVALID) {
sDebug("vgId:%d sync log not ok2, preindex:%" PRId64, pSyncNode->vgId, pMsg->prevLogIndex);
sDebug("vgId:%d, sync log not ok2, preindex:%" PRId64, pSyncNode->vgId, pMsg->prevLogIndex);
return false;
}
......@@ -487,7 +487,7 @@ static bool syncNodeOnAppendEntriesBatchLogOK(SSyncNode* pSyncNode, SyncAppendEn
return true;
}
sDebug("vgId:%d sync log not ok3, preindex:%" PRId64, pSyncNode->vgId, pMsg->prevLogIndex);
sDebug("vgId:%d, sync log not ok3, preindex:%" PRId64, pSyncNode->vgId, pMsg->prevLogIndex);
return false;
}
......@@ -500,13 +500,13 @@ static bool syncNodeOnAppendEntriesLogOK(SSyncNode* pSyncNode, SyncAppendEntries
SyncIndex myLastIndex = syncNodeGetLastIndex(pSyncNode);
if (pMsg->prevLogIndex > myLastIndex) {
sDebug("vgId:%d sync log not ok, preindex:%" PRId64, pSyncNode->vgId, pMsg->prevLogIndex);
sDebug("vgId:%d, sync log not ok, preindex:%" PRId64, pSyncNode->vgId, pMsg->prevLogIndex);
return false;
}
SyncTerm myPreLogTerm = syncNodeGetPreTerm(pSyncNode, pMsg->prevLogIndex + 1);
if (myPreLogTerm == SYNC_TERM_INVALID) {
sDebug("vgId:%d sync log not ok2, preindex:%" PRId64, pSyncNode->vgId, pMsg->prevLogIndex);
sDebug("vgId:%d, sync log not ok2, preindex:%" PRId64, pSyncNode->vgId, pMsg->prevLogIndex);
return false;
}
......@@ -514,7 +514,7 @@ static bool syncNodeOnAppendEntriesLogOK(SSyncNode* pSyncNode, SyncAppendEntries
return true;
}
sDebug("vgId:%d sync log not ok3, preindex:%" PRId64, pSyncNode->vgId, pMsg->prevLogIndex);
sDebug("vgId:%d, sync log not ok3, preindex:%" PRId64, pSyncNode->vgId, pMsg->prevLogIndex);
return false;
}
......
......@@ -559,10 +559,11 @@ void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet) {
snprintf(pEpSet->eps[i].fqdn, sizeof(pEpSet->eps[i].fqdn), "%s", (pSyncNode->pRaftCfg->cfg.nodeInfo)[i].nodeFqdn);
pEpSet->eps[i].port = (pSyncNode->pRaftCfg->cfg.nodeInfo)[i].nodePort;
(pEpSet->numOfEps)++;
sInfo("vgId:%d sync get retry epset: index:%d %s:%d", pSyncNode->vgId, i, pEpSet->eps[i].fqdn, pEpSet->eps[i].port);
sInfo("vgId:%d, sync get retry epset: index:%d %s:%d", pSyncNode->vgId, i, pEpSet->eps[i].fqdn,
pEpSet->eps[i].port);
}
pEpSet->inUse = (pSyncNode->pRaftCfg->cfg.myIndex + 1) % pEpSet->numOfEps;
sInfo("vgId:%d sync get retry epset in-use:%d", pSyncNode->vgId, pEpSet->inUse);
sInfo("vgId:%d, sync get retry epset in-use:%d", pSyncNode->vgId, pEpSet->inUse);
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
}
......@@ -2996,7 +2997,7 @@ void syncLogRecvAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMs
"datalen:%d}, %s",
host, port, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->commitIndex, pMsg->privateTerm,
pMsg->dataLen, s);
syncNodeErrorLog(pSyncNode, logBuf);
syncNodeEventLog(pSyncNode, logBuf);
}
void syncLogSendAppendEntriesBatch(SSyncNode* pSyncNode, const SyncAppendEntriesBatch* pMsg, const char* s) {
......@@ -3022,7 +3023,7 @@ void syncLogRecvAppendEntriesBatch(SSyncNode* pSyncNode, const SyncAppendEntries
", pterm:%" PRIu64 ", commit:%" PRId64 ", datalen:%d, count:%d}, %s",
host, port, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->privateTerm, pMsg->commitIndex,
pMsg->dataLen, pMsg->dataCount, s);
syncNodeErrorLog(pSyncNode, logBuf);
syncNodeEventLog(pSyncNode, logBuf);
}
void syncLogSendAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntriesReply* pMsg, const char* s) {
......@@ -3046,5 +3047,5 @@ void syncLogRecvAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntries
"recv sync-append-entries-reply from %s:%d {term:%" PRIu64 ", pterm:%" PRIu64 ", success:%d, match:%" PRId64
"}, %s",
host, port, pMsg->term, pMsg->privateTerm, pMsg->success, pMsg->matchIndex, s);
syncNodeErrorLog(pSyncNode, logBuf);
syncNodeEventLog(pSyncNode, logBuf);
}
......@@ -108,10 +108,10 @@ int32_t raftStoreSerialize(SRaftStore *pRaftStore, char *buf, size_t len) {
cJSON *pRoot = cJSON_CreateObject();
char u64Buf[128] = {0};
snprintf(u64Buf, sizeof(u64Buf), "" PRIu64 "", pRaftStore->currentTerm);
snprintf(u64Buf, sizeof(u64Buf), "%" PRIu64 "", pRaftStore->currentTerm);
cJSON_AddStringToObject(pRoot, "current_term", u64Buf);
snprintf(u64Buf, sizeof(u64Buf), "" PRIu64 "", pRaftStore->voteFor.addr);
snprintf(u64Buf, sizeof(u64Buf), "%" PRIu64 "", pRaftStore->voteFor.addr);
cJSON_AddStringToObject(pRoot, "vote_for_addr", u64Buf);
cJSON_AddNumberToObject(pRoot, "vote_for_vgid", pRaftStore->voteFor.vgId);
......@@ -142,11 +142,11 @@ int32_t raftStoreDeserialize(SRaftStore *pRaftStore, char *buf, size_t len) {
cJSON *pCurrentTerm = cJSON_GetObjectItem(pRoot, "current_term");
ASSERT(cJSON_IsString(pCurrentTerm));
sscanf(pCurrentTerm->valuestring, "" PRIu64 "", &(pRaftStore->currentTerm));
sscanf(pCurrentTerm->valuestring, "%" PRIu64 "", &(pRaftStore->currentTerm));
cJSON *pVoteForAddr = cJSON_GetObjectItem(pRoot, "vote_for_addr");
ASSERT(cJSON_IsString(pVoteForAddr));
sscanf(pVoteForAddr->valuestring, "" PRIu64 "", &(pRaftStore->voteFor.addr));
sscanf(pVoteForAddr->valuestring, "%" PRIu64 "", &(pRaftStore->voteFor.addr));
cJSON *pVoteForVgid = cJSON_GetObjectItem(pRoot, "vote_for_vgid");
pRaftStore->voteFor.vgId = pVoteForVgid->valueint;
......@@ -188,11 +188,11 @@ cJSON *raftStore2Json(SRaftStore *pRaftStore) {
cJSON *pRoot = cJSON_CreateObject();
if (pRaftStore != NULL) {
snprintf(u64buf, sizeof(u64buf), "" PRIu64 "", pRaftStore->currentTerm);
snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pRaftStore->currentTerm);
cJSON_AddStringToObject(pRoot, "currentTerm", u64buf);
cJSON *pVoteFor = cJSON_CreateObject();
snprintf(u64buf, sizeof(u64buf), "" PRIu64 "", pRaftStore->voteFor.addr);
snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pRaftStore->voteFor.addr);
cJSON_AddStringToObject(pVoteFor, "addr", u64buf);
{
uint64_t u64 = pRaftStore->voteFor.addr;
......
#!/bin/bash
if [ $# != 1 ] ; then
echo "Uasge: $0 log-path"
echo ""
exit 1
fi
logpath=$1
echo "logpath: ${logpath}"
echo ""
echo "clean old log ..."
rm -f ${logpath}/log.*
echo ""
echo "generate log.dnode ..."
for dnode in `ls ${logpath} | grep dnode`;do
echo "generate log.${dnode}"
cat ${logpath}/${dnode}/log/taosdlog.* | grep SYN > ${logpath}/log.${dnode}
done
echo ""
echo "generate vgId ..."
cat ${logpath}/log.dnode* | grep "vgId:" | grep -v ERROR | awk '{print $5}' | sort | uniq > ${logpath}/log.vgIds.tmp
echo "all vgIds:" > ${logpath}/log.vgIds
cat ${logpath}/log.dnode* | grep "vgId:" | grep -v ERROR | awk '{print $5}' | sort | uniq >> ${logpath}/log.vgIds
for dnode in `ls ${logpath} | grep dnode | grep -v log`;do
echo "" >> ${logpath}/log.vgIds
echo "" >> ${logpath}/log.vgIds
echo "${dnode}:" >> ${logpath}/log.vgIds
cat ${logpath}/${dnode}/log/taosdlog.* | grep SYN | grep "vgId:" | grep -v ERROR | awk '{print $5}' | sort | uniq >> ${logpath}/log.vgIds
done
echo ""
echo "generate log.dnode.vgId ..."
for logdnode in `ls ${logpath}/log.dnode*`;do
for vgId in `cat ${logpath}/log.vgIds.tmp`;do
rowNum=`cat ${logdnode} | grep "${vgId}" | awk 'BEGIN{rowNum=0}{rowNum++}END{print rowNum}'`
#echo "-----${rowNum}"
if [ $rowNum -gt 0 ] ; then
echo "generate ${logdnode}.${vgId}"
cat ${logdnode} | grep "${vgId}" > ${logdnode}.${vgId}
fi
done
done
echo ""
echo "generate log.dnode.main ..."
for file in `ls ${logpath}/log.dnode* | grep -v vgId`;do
echo "generate ${file}.main"
cat ${file} | awk '{ if(index($0, "sync open") > 0 || index($0, "sync close") > 0 || index($0, "become leader") > 0) {print $0} }' > ${file}.main
done
exit 0
......@@ -11,13 +11,13 @@
# -*- coding: utf-8 -*-
from collections import defaultdict
import random
import string
import requests
import time
import socket
import json
import toml
from .boundary import DataBoundary
import taos
from util.log import *
......@@ -25,6 +25,79 @@ from util.sql import *
from util.cases import *
from util.dnodes import *
from util.common import *
from util.constant import *
from dataclasses import dataclass,field
from typing import List
@dataclass
class DataSet:
ts_data : List[int] = field(default_factory=list)
int_data : List[int] = field(default_factory=list)
bint_data : List[int] = field(default_factory=list)
sint_data : List[int] = field(default_factory=list)
tint_data : List[int] = field(default_factory=list)
uint_data : List[int] = field(default_factory=list)
ubint_data : List[int] = field(default_factory=list)
usint_data : List[int] = field(default_factory=list)
utint_data : List[int] = field(default_factory=list)
float_data : List[float] = field(default_factory=list)
double_data : List[float] = field(default_factory=list)
bool_data : List[int] = field(default_factory=list)
vchar_data : List[str] = field(default_factory=list)
nchar_data : List[str] = field(default_factory=list)
def get_order_set(self,
rows,
int_step :int = 1,
bint_step :int = 1,
sint_step :int = 1,
tint_step :int = 1,
uint_step :int = 1,
ubint_step :int = 1,
usint_step :int = 1,
utint_step :int = 1,
float_step :float = 1,
double_step :float = 1,
bool_start :int = 1,
vchar_prefix:str = "vachar_",
vchar_step :int = 1,
nchar_prefix:str = "nchar_测试_",
nchar_step :int = 1,
ts_step :int = 1
):
for i in range(rows):
self.int_data.append( int(i * int_step % INT_MAX ))
self.bint_data.append( int(i * bint_step % BIGINT_MAX ))
self.sint_data.append( int(i * sint_step % SMALLINT_MAX ))
self.tint_data.append( int(i * tint_step % TINYINT_MAX ))
self.uint_data.append( int(i * uint_step % INT_UN_MAX ))
self.ubint_data.append( int(i * ubint_step % BIGINT_UN_MAX ))
self.usint_data.append( int(i * usint_step % SMALLINT_UN_MAX ))
self.utint_data.append( int(i * utint_step % TINYINT_UN_MAX ))
self.float_data.append( float(i * float_step % FLOAT_MAX ))
self.double_data.append( float(i * double_step % DOUBLE_MAX ))
self.bool_data.append( bool((i + bool_start) % 2 ))
self.vchar_data.append( f"{vchar_prefix}_{i * vchar_step}" )
self.nchar_data.append( f"{nchar_prefix}_{i * nchar_step}")
self.ts_data.append( int(datetime.datetime.timestamp(datetime.datetime.now()) * 1000 - i * ts_step))
def get_disorder_set(self,
rows,
int_low :int = INT_MIN,
int_up :int = INT_MAX,
bint_low :int = BIGINT_MIN,
bint_up :int = BIGINT_MAX,
sint_low :int = SMALLINT_MIN,
sint_up :int = SMALLINT_MAX,
tint_low :int = TINYINT_MIN,
tint_up :int = TINYINT_MAX,
ubint_low :int = BIGINT_UN_MIN,
ubint_up :int = BIGINT_UN_MAX,
):
pass
class TDCom:
def __init__(self):
......@@ -372,6 +445,7 @@ class TDCom:
def getClientCfgPath(self):
buildPath = self.getBuildPath()
if (buildPath == ""):
tdLog.exit("taosd not found!")
else:
......@@ -650,7 +724,7 @@ class TDCom:
else:
column_value_str += f'{column_value}, '
idx += 1
column_value_str = column_value_str.rstrip()[:-1]
column_value_str = column_value_str.rstrip()[:-1]
insert_sql = f'insert into {dbname}.{tbname} values ({column_value_str});'
tsql.execute(insert_sql)
def getOneRow(self, location, containElm):
......@@ -662,12 +736,12 @@ class TDCom:
return res_list
else:
tdLog.exit(f"getOneRow out of range: row_index={location} row_count={self.query_row}")
def killProcessor(self, processorName):
def killProcessor(self, processorName):
if (platform.system().lower() == 'windows'):
os.system("TASKKILL /F /IM %s.exe"%processorName)
else:
os.system('pkill %s'%processorName)
os.system('pkill %s'%processorName)
def is_json(msg):
......@@ -680,4 +754,29 @@ def is_json(msg):
else:
return False
def get_path(tool="taosd"):
selfPath = os.path.dirname(os.path.realpath(__file__))
if ("community" in selfPath):
projPath = selfPath[:selfPath.find("community")]
else:
projPath = selfPath[:selfPath.find("tests")]
paths = []
for root, dirs, files in os.walk(projPath):
if ((tool) in files or ("%s.exe"%tool) in files):
rootRealPath = os.path.dirname(os.path.realpath(root))
if ("packaging" not in rootRealPath):
paths.append(os.path.join(root, tool))
break
if (len(paths) == 0):
return ""
return paths[0]
def dict2toml(in_dict: dict, file:str):
if not isinstance(in_dict, dict):
return ""
with open(file, 'w') as f:
toml.dump(in_dict, f)
tdCom = TDCom()
......@@ -96,9 +96,9 @@ class TDSimClient:
for key, value in self.cfgDict.items():
self.cfg(key, value)
try:
if bool(updatecfgDict) and updatecfgDict[0] and updatecfgDict[0][0]:
if bool(updatecfgDict) and updatecfgDict[0] and updatecfgDict[0][0]:
clientCfg = dict (updatecfgDict[0][0].get('clientCfg'))
for key, value in clientCfg.items():
self.cfg(key, value)
......@@ -244,7 +244,6 @@ class TDDnode:
# print(updatecfgDict)
isFirstDir = 1
if bool(updatecfgDict) and updatecfgDict[0] and updatecfgDict[0][0]:
print(updatecfgDict[0][0])
for key, value in updatecfgDict[0][0].items():
if key == "clientCfg" and self.remoteIP == "" and not platform.system().lower() == 'windows':
continue
......@@ -324,7 +323,6 @@ class TDDnode:
if os.system(cmd) != 0:
tdLog.exit(cmd)
self.running = 1
print("dnode:%d is running with %s " % (self.index, cmd))
tdLog.debug("dnode:%d is running with %s " % (self.index, cmd))
if self.valgrind == 0:
time.sleep(0.1)
......@@ -358,7 +356,7 @@ class TDDnode:
# break
# elif bkey2 in line:
# popen.kill()
# break
# break
# if time.time() > timeout:
# print(time.time(),timeout)
# tdLog.exit('wait too long for taosd start')
......@@ -407,7 +405,6 @@ class TDDnode:
if os.system(cmd) != 0:
tdLog.exit(cmd)
self.running = 1
print("dnode:%d is running with %s " % (self.index, cmd))
tdLog.debug("dnode:%d is running with %s " % (self.index, cmd))
if self.valgrind == 0:
time.sleep(0.1)
......@@ -664,7 +661,6 @@ class TDDnodes:
def stoptaosd(self, index):
self.check(index)
self.dnodes[index - 1].stoptaosd()
def start(self, index):
self.check(index)
......
......@@ -235,9 +235,17 @@ class TDSql:
tdLog.info("sql:%s, row:%d col:%d data:%s == expect:%s" %
(self.sql, row, col, self.queryResult[row][col], data))
return
elif isinstance(data, float) and abs(self.queryResult[row][col] - data) <= 0.000001:
tdLog.info("sql:%s, row:%d col:%d data:%f == expect:%f" %
(self.sql, row, col, self.queryResult[row][col], data))
elif isinstance(data, float):
if abs(data) >= 1 and abs((self.queryResult[row][col] - data) / data) <= 0.000001:
tdLog.info("sql:%s, row:%d col:%d data:%f == expect:%f" %
(self.sql, row, col, self.queryResult[row][col], data))
elif abs(data) < 1 and abs(self.queryResult[row][col] - data) <= 0.000001:
tdLog.info("sql:%s, row:%d col:%d data:%f == expect:%f" %
(self.sql, row, col, self.queryResult[row][col], data))
else:
caller = inspect.getframeinfo(inspect.stack()[1][0])
args = (caller.filename, caller.lineno, self.sql, row, col, self.queryResult[row][col], data)
tdLog.exit("%s(%d) failed: sql:%s row:%d col:%d data:%s != expect:%s" % args)
return
else:
caller = inspect.getframeinfo(inspect.stack()[1][0])
......@@ -323,13 +331,32 @@ class TDSql:
args = (caller.filename, caller.lineno, self.sql, col_name_list, expect_col_name_list)
tdLog.exit("%s(%d) failed: sql:%s, col_name_list:%s != expect_col_name_list:%s" % args)
def __check_equal(self, elm, expect_elm):
if not type(elm) in(list, tuple) and elm == expect_elm:
return True
if type(elm) in(list, tuple) and type(expect_elm) in(list, tuple):
if len(elm) != len(expect_elm):
return False
if len(elm) == 0:
return True
for i in range(len(elm)):
flag = self.__check_equal(elm[i], expect_elm[i])
if not flag:
return False
return True
return False
def checkEqual(self, elm, expect_elm):
if elm == expect_elm:
tdLog.info("sql:%s, elm:%s == expect_elm:%s" % (self.sql, elm, expect_elm))
else:
caller = inspect.getframeinfo(inspect.stack()[1][0])
args = (caller.filename, caller.lineno, self.sql, elm, expect_elm)
tdLog.exit("%s(%d) failed: sql:%s, elm:%s != expect_elm:%s" % args)
return
if self.__check_equal(elm, expect_elm):
tdLog.info("sql:%s, elm:%s == expect_elm:%s" % (self.sql, elm, expect_elm))
return
caller = inspect.getframeinfo(inspect.stack()[1][0])
args = (caller.filename, caller.lineno, self.sql, elm, expect_elm)
tdLog.exit("%s(%d) failed: sql:%s, elm:%s != expect_elm:%s" % args)
def checkNotEqual(self, elm, expect_elm):
if elm != expect_elm:
......
import socket
from fabric2 import Connection
from util.log import *
from util.common import *
class TAdapter:
def __init__(self):
self.running = 0
self.deployed = 0
self.remoteIP = ""
self.taosadapter_cfg_dict = {
"debug" : True,
"taosConfigDir" : "",
"port" : 6041,
"logLevel" : "debug",
"cors" : {
"allowAllOrigins" : True,
},
"pool" : {
"maxConnect" : 4000,
"maxIdle" : 4000,
"idleTimeout" : "1h"
},
"ssl" : {
"enable" : False,
"certFile" : "",
"keyFile" : "",
},
"log" : {
"path" : "",
"rotationCount" : 30,
"rotationTime" : "24h",
"rotationSize" : "1GB",
"enableRecordHttpSql" : True,
"sqlRotationCount" : 2,
"sqlRotationTime" : "24h",
"sqlRotationSize" : "1GB",
},
"monitor" : {
"collectDuration" : "3s",
"incgroup" : False,
"pauseQueryMemoryThreshold" : 70,
"pauseAllMemoryThreshold" : 80,
"identity" : "",
"writeToTD" : True,
"user" : "root",
"password" : "taosdata",
"writeInterval" : "30s"
},
"opentsdb" : {
"enable" : False
},
"influxdb" : {
"enable" : False
},
"statsd" : {
"enable" : False
},
"collectd" : {
"enable" : False
},
"opentsdb_telnet" : {
"enable" : False
},
"node_exporter" : {
"enable" : False
},
"prometheus" : {
"enable" : False
},
}
# TODO: add taosadapter env:
# 1. init cfg.toml.dict :OK
# 2. dump dict to toml : OK
# 3. update cfg.toml.dict :OK
# 4. check adapter exists : OK
# 5. deploy adapter cfg : OK
# 6. adapter start : OK
# 7. adapter stop
def init(self, path, remoteIP=""):
self.path = path
self.remoteIP = remoteIP
binPath = get_path() + "/../../../"
binPath = os.path.realpath(binPath)
if path == "":
self.path = os.path.abspath(binPath + "../../")
else:
self.path = os.path.realpath(path)
if self.remoteIP:
try:
self.config = eval(remoteIP)
self.remote_conn = Connection(host=self.config["host"], port=self.config["port"], user=self.config["user"], connect_kwargs={'password':self.config["password"]})
except Exception as e:
tdLog.notice(e)
def update_cfg(self, update_dict :dict):
if not isinstance(update_dict, dict):
return
if "log" in update_dict and "path" in update_dict["log"]:
del update_dict["log"]["path"]
for key, value in update_dict.items():
if key in ["cors", "pool", "ssl", "log", "monitor", "opentsdb", "influxdb", "statsd", "collectd", "opentsdb_telnet", "node_exporter", "prometheus"]:
if isinstance(value, dict):
for k, v in value.items():
self.taosadapter_cfg_dict[key][k] = v
else:
self.taosadapter_cfg_dict[key] = value
def check_adapter(self):
if getPath(tool="taosadapter"):
return False
else:
return True
def remote_exec(self, updateCfgDict, execCmd):
remoteCfgDict = copy.deepcopy(updateCfgDict)
if "log" in remoteCfgDict and "path" in remoteCfgDict["log"]:
del remoteCfgDict["log"]["path"]
remoteCfgDictStr = base64.b64encode(toml.dumps(remoteCfgDict).encode()).decode()
execCmdStr = base64.b64encode(execCmd.encode()).decode()
with self.remote_conn.cd((self.config["path"]+sys.path[0].replace(self.path, '')).replace('\\','/')):
self.remote_conn.run(f"python3 ./test.py -D {remoteCfgDictStr} -e {execCmdStr}" )
def cfg(self, option, value):
cmd = f"echo {option} = {value} >> {self.cfg_path}"
if os.system(cmd) != 0:
tdLog.exit(cmd)
def deploy(self, *update_cfg_dict):
self.log_dir = f"{self.path}/sim/dnode1/log"
self.cfg_dir = f"{self.path}/sim/dnode1/cfg"
self.cfg_path = f"{self.cfg_dir}/taosadapter.toml"
cmd = f"touch {self.cfg_path}"
if os.system(cmd) != 0:
tdLog.exit(cmd)
self.taosadapter_cfg_dict["log"]["path"] = self.log_dir
if bool(update_cfg_dict):
self.update_cfg(update_dict=update_cfg_dict)
if (self.remoteIP == ""):
dict2toml(self.taosadapter_cfg_dict, self.cfg_path)
else:
self.remote_exec(self.taosadapter_cfg_dict, "tAdapter.deploy(update_cfg_dict)")
self.deployed = 1
tdLog.debug(f"taosadapter is deployed and configured by {self.cfg_path}")
def start(self):
bin_path = get_path(tool="taosadapter")
if (bin_path == ""):
tdLog.exit("taosadapter not found!")
else:
tdLog.info(f"taosadapter found: {bin_path}")
if platform.system().lower() == 'windows':
cmd = f"mintty -h never {bin_path} -c {self.cfg_dir}"
else:
cmd = f"nohup {bin_path} -c {self.cfg_path} > /dev/null 2>&1 & "
if self.remoteIP:
self.remote_exec(self.taosadapter_cfg_dict, f"tAdapter.deployed=1\ntAdapter.log_dir={self.log_dir}\ntAdapter.cfg_dir={self.cfg_dir}\ntAdapter.start()")
self.running = 1
else:
os.system(f"rm -rf {self.log_dir}/taosadapter*")
if os.system(cmd) != 0:
tdLog.exit(cmd)
self.running = 1
tdLog.debug(f"taosadapter is running with {cmd} " )
time.sleep(0.1)
taosadapter_port = self.taosadapter_cfg_dict["port"]
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.settimeout(3)
try:
res = s.connect_ex((self.remoteIP, taosadapter_port))
s.shutdown(2)
if res == 0:
tdLog.info(f"the taosadapter has been started, using port:{taosadapter_port}")
else:
tdLog.info(f"the taosadapter do not started!!!")
except socket.error as e:
tdLog.notice("socket connect error!")
finally:
if s:
s.close()
# tdLog.debug("the taosadapter has been started.")
time.sleep(1)
def start_taosadapter(self):
"""
use this method, must deploy taosadapter
"""
bin_path = get_path(tool="taosadapter")
if (bin_path == ""):
tdLog.exit("taosadapter not found!")
else:
tdLog.info(f"taosadapter found: {bin_path}")
if self.deployed == 0:
tdLog.exit("taosadapter is not deployed")
if platform.system().lower() == 'windows':
cmd = f"mintty -h never {bin_path} -c {self.cfg_dir}"
else:
cmd = f"nohup {bin_path} -c {self.cfg_path} > /dev/null 2>&1 & "
if self.remoteIP:
self.remote_exec(self.taosadapter_cfg_dict, f"tAdapter.deployed=1\ntAdapter.log_dir={self.log_dir}\ntAdapter.cfg_dir={self.cfg_dir}\ntAdapter.start()")
self.running = 1
else:
if os.system(cmd) != 0:
tdLog.exit(cmd)
self.running = 1
tdLog.debug(f"taosadapter is running with {cmd} " )
time.sleep(0.1)
def stop(self, force_kill=False):
signal = "-SIGKILL" if force_kill else "-SIGTERM"
if self.remoteIP:
self.remote_exec(self.taosadapter_cfg_dict, "tAdapter.running=1\ntAdapter.stop()")
tdLog.info("stop taosadapter")
return
toBeKilled = "taosadapter"
if self.running != 0:
psCmd = f"ps -ef|grep -w {toBeKilled}| grep -v grep | awk '{{print $2}}'"
processID = subprocess.check_output(
psCmd, shell=True).decode("utf-8")
while(processID):
killCmd = f"kill {signal} {processID} > /dev/null 2>&1"
os.system(killCmd)
time.sleep(1)
processID = subprocess.check_output(
psCmd, shell=True).decode("utf-8")
if not platform.system().lower() == 'windows':
for port in range(6030, 6041):
fuserCmd = f"fuser -k -n tcp {port} > /dev/null"
os.system(fuserCmd)
self.running = 0
tdLog.debug(f"taosadapter is stopped by kill {signal}")
tAdapter = TAdapter()
\ No newline at end of file
......@@ -14,9 +14,9 @@ system tsim/parser/gendata.sh
sql create table stbx (ts TIMESTAMP, collect_area NCHAR(12), device_id BINARY(16), imsi BINARY(16), imei BINARY(16), mdn BINARY(10), net_type BINARY(4), mno NCHAR(4), province NCHAR(10), city NCHAR(16), alarm BINARY(2)) tags(a int, b binary(12));
sql create table tbx (ts TIMESTAMP, collect_area NCHAR(12), device_id BINARY(16), imsi BINARY(16), imei BINARY(16), mdn BINARY(10), net_type BINARY(4), mno NCHAR(4), province NCHAR(10), city NCHAR(16), alarm BINARY(2))
print ====== create tables success, starting import data
print ====== create tables success, starting insert data
sql import into tbx file '~/data.sql'
sql insert into tbx file '~/data.sql'
sql import into tbx file '~/data.sql'
sql select count(*) from tbx
......
......@@ -9,31 +9,41 @@ from util.dnodes import *
PRIMARY_COL = "ts"
INT_COL = "c_int"
BINT_COL = "c_bint"
SINT_COL = "c_sint"
TINT_COL = "c_tint"
FLOAT_COL = "c_float"
DOUBLE_COL = "c_double"
BOOL_COL = "c_bool"
TINT_UN_COL = "c_tint_un"
SINT_UN_COL = "c_sint_un"
BINT_UN_COL = "c_bint_un"
INT_UN_COL = "c_int_un"
BINARY_COL = "c_binary"
NCHAR_COL = "c_nchar"
TS_COL = "c_ts"
NUM_COL = [ INT_COL, BINT_COL, SINT_COL, TINT_COL, FLOAT_COL, DOUBLE_COL, ]
CHAR_COL = [ BINARY_COL, NCHAR_COL, ]
BOOLEAN_COL = [ BOOL_COL, ]
TS_TYPE_COL = [ TS_COL, ]
INT_COL = "c_int"
BINT_COL = "c_bint"
SINT_COL = "c_sint"
TINT_COL = "c_tint"
FLOAT_COL = "c_float"
DOUBLE_COL = "c_double"
BOOL_COL = "c_bool"
TINT_UN_COL = "c_utint"
SINT_UN_COL = "c_usint"
BINT_UN_COL = "c_ubint"
INT_UN_COL = "c_uint"
BINARY_COL = "c_binary"
NCHAR_COL = "c_nchar"
TS_COL = "c_ts"
NUM_COL = [INT_COL, BINT_COL, SINT_COL, TINT_COL, FLOAT_COL, DOUBLE_COL, TINT_UN_COL, SINT_UN_COL, BINT_UN_COL, INT_UN_COL]
CHAR_COL = [BINARY_COL, NCHAR_COL, ]
BOOLEAN_COL = [BOOL_COL, ]
TS_TYPE_COL = [TS_COL, ]
INT_TAG = "t_int"
ALL_COL = [PRIMARY_COL, INT_COL, BINT_COL, SINT_COL, TINT_COL, FLOAT_COL, DOUBLE_COL, BINARY_COL, NCHAR_COL, BOOL_COL, TS_COL]
TAG_COL = [INT_TAG]
## insert data args:
TIME_STEP = 10000
NOW = int(datetime.datetime.timestamp(datetime.datetime.now()) * 1000)
# init db/table
DBNAME = "db"
STBNAME = "stb1"
CTBNAME = "ct1"
NTBNAME = "nt1"
@dataclass
class DataSet:
ts_data : List[int] = field(default_factory=list)
......@@ -152,29 +162,31 @@ class TDTestCase:
self.test_create_databases()
self.test_create_stb()
def __create_tb(self):
def __create_tb(self, stb=STBNAME, ctb_num=20, ntbnum=1, rsma=False):
tdLog.printNoPrefix("==========step: create table")
create_stb_sql = f'''create table stb1(
ts timestamp, {INT_COL} int, {BINT_COL} bigint, {SINT_COL} smallint, {TINT_COL} tinyint,
{FLOAT_COL} float, {DOUBLE_COL} double, {BOOL_COL} bool,
{BINARY_COL} binary(16), {NCHAR_COL} nchar(32), {TS_COL} timestamp,
{TINT_UN_COL} tinyint unsigned, {SINT_UN_COL} smallint unsigned,
{INT_UN_COL} int unsigned, {BINT_UN_COL} bigint unsigned
) tags (t1 int)
'''
create_ntb_sql = f'''create table t1(
create_stb_sql = f'''create table {stb}(
ts timestamp, {INT_COL} int, {BINT_COL} bigint, {SINT_COL} smallint, {TINT_COL} tinyint,
{FLOAT_COL} float, {DOUBLE_COL} double, {BOOL_COL} bool,
{BINARY_COL} binary(16), {NCHAR_COL} nchar(32), {TS_COL} timestamp,
{TINT_UN_COL} tinyint unsigned, {SINT_UN_COL} smallint unsigned,
{INT_UN_COL} int unsigned, {BINT_UN_COL} bigint unsigned
)
) tags ({INT_TAG} int)
'''
for i in range(ntbnum):
create_ntb_sql = f'''create table nt{i+1}(
ts timestamp, {INT_COL} int, {BINT_COL} bigint, {SINT_COL} smallint, {TINT_COL} tinyint,
{FLOAT_COL} float, {DOUBLE_COL} double, {BOOL_COL} bool,
{BINARY_COL} binary(16), {NCHAR_COL} nchar(32), {TS_COL} timestamp,
{TINT_UN_COL} tinyint unsigned, {SINT_UN_COL} smallint unsigned,
{INT_UN_COL} int unsigned, {BINT_UN_COL} bigint unsigned
)
'''
tdSql.execute(create_stb_sql)
tdSql.execute(create_ntb_sql)
for i in range(4):
tdSql.execute(f'create table ct{i+1} using stb1 tags ( {i+1} )')
for i in range(ctb_num):
tdSql.execute(f'create table ct{i+1} using {stb} tags ( {i+1} )')
def __data_set(self, rows):
data_set = DataSet()
......@@ -220,7 +232,7 @@ class TDTestCase:
tdSql.execute( f"insert into ct1 values ( {NOW - i * TIME_STEP}, {row_data} )" )
tdSql.execute( f"insert into ct2 values ( {NOW - i * int(TIME_STEP * 0.6)}, {neg_row_data} )" )
tdSql.execute( f"insert into ct4 values ( {NOW - i * int(TIME_STEP * 0.8) }, {row_data} )" )
tdSql.execute( f"insert into t1 values ( {NOW - i * int(TIME_STEP * 1.2)}, {row_data} )" )
tdSql.execute( f"insert into {NTBNAME} values ( {NOW - i * int(TIME_STEP * 1.2)}, {row_data} )" )
tdSql.execute( f"insert into ct2 values ( {NOW + int(TIME_STEP * 0.6)}, {null_data} )" )
tdSql.execute( f"insert into ct2 values ( {NOW - (self.rows + 1) * int(TIME_STEP * 0.6)}, {null_data} )" )
......@@ -230,9 +242,9 @@ class TDTestCase:
tdSql.execute( f"insert into ct4 values ( {NOW - (self.rows + 1) * int(TIME_STEP * 0.8)}, {null_data} )" )
tdSql.execute( f"insert into ct4 values ( {NOW - self.rows * int(TIME_STEP * 0.39)}, {null_data} )" )
tdSql.execute( f"insert into t1 values ( {NOW + int(TIME_STEP * 1.2)}, {null_data} )" )
tdSql.execute( f"insert into t1 values ( {NOW - (self.rows + 1) * int(TIME_STEP * 1.2)}, {null_data} )" )
tdSql.execute( f"insert into t1 values ( {NOW - self.rows * int(TIME_STEP * 0.59)}, {null_data} )" )
tdSql.execute( f"insert into {NTBNAME} values ( {NOW + int(TIME_STEP * 1.2)}, {null_data} )" )
tdSql.execute( f"insert into {NTBNAME} values ( {NOW - (self.rows + 1) * int(TIME_STEP * 1.2)}, {null_data} )" )
tdSql.execute( f"insert into {NTBNAME} values ( {NOW - self.rows * int(TIME_STEP * 0.59)}, {null_data} )" )
def run(self):
......
......@@ -325,10 +325,17 @@ class TDTestCase:
def __sma_create_check(self, sma:SMAschema):
if self.updatecfgDict["querySmaOptimize"] == 0:
return False
# TODO: if database is a rollup-db, can not create sma index
# tdSql.query("select database()")
# if sma.rollup_db :
# return False
tdSql.query("select database()")
dbname = tdSql.getData(0,0)
tdSql.query("show databases")
for row in tdSql.queryResult:
if row[0] == dbname:
if row[-1] is None:
continue
if ":" in row[-1]:
sma.rollup_db = True
if sma.rollup_db :
return False
tdSql.query("show stables")
if not sma.tbname:
return False
......@@ -379,12 +386,15 @@ class TDTestCase:
tdSql.query(self.__create_sma_index(sma))
self.sma_count += 1
self.sma_created_index.append(sma.index_name)
tdSql.query("show streams")
tdSql.query(self.__show_sma_index(sma))
tdSql.checkRows(self.sma_count)
tdSql.checkData(0, 2, sma.tbname)
else:
tdSql.error(self.__create_sma_index(sma))
def __drop_sma_index(self, sma:SMAschema):
sql = f"{sma.drop} {sma.drop_flag} {sma.index_name}"
return sql
......@@ -402,12 +412,12 @@ class TDTestCase:
def sma_drop_check(self, sma:SMAschema):
if self.__sma_drop_check(sma):
tdSql.query(self.__drop_sma_index(sma))
print(self.__drop_sma_index(sma))
self.sma_count -= 1
self.sma_created_index = list(filter(lambda x: x != sma.index_name, self.sma_created_index))
tdSql.query("show streams")
tdSql.checkRows(self.sma_count)
else:
tdSql.error(self.__drop_sma_index(sma))
......@@ -614,20 +624,20 @@ class TDTestCase:
self.__insert_data()
self.all_test()
#tdLog.printNoPrefix("==========step2:create table in rollup database")
#tdSql.execute("create database db3 retentions 1s:4m,2s:8m,3s:12m")
#tdSql.execute("use db3")
# self.__create_tb()
#tdSql.execute(f"create stable stb1 ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) rollup(first) watermark 5s max_delay 1m sma({INT_COL}) ")
#self.all_test()
# self.__insert_data()
tdLog.printNoPrefix("==========step2:create table in rollup database")
tdSql.execute("create database db3 retentions 1s:4m,2s:8m,3s:12m")
tdSql.execute("use db3")
tdSql.execute(f"create stable stb1 ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) rollup(first) watermark 5s max_delay 1m sma({INT_COL}) ")
self.all_test()
tdSql.execute("drop database if exists db1 ")
tdSql.execute("drop database if exists db2 ")
tdDnodes.stop(1)
tdDnodes.start(1)
# tdDnodes.stop(1)
# tdDnodes.start(1)
tdSql.execute("flush database db ")
tdLog.printNoPrefix("==========step4:after wal, all check again ")
self.all_test()
......
此差异已折叠。
......@@ -20,12 +20,13 @@ from util.sqlset import TDSetSql
class TDTestCase:
def init(self, conn, logSql):
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor(),logSql)
tdSql.init(conn.cursor(),False)
self.rowNum = 10
self.ts = 1537146000000
self.setsql = TDSetSql()
self.ntbname = 'ntb'
self.stbname = 'stb'
self.dbname = "db"
self.ntbname = f"{self.dbname}.ntb"
self.stbname = f'{self.dbname}.stb'
self.binary_length = 20 # the length of binary for column_dict
self.nchar_length = 20 # the length of nchar for column_dict
self.column_dict = {
......
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
......@@ -26,7 +26,7 @@ class TDTestCase:
tdSql.init(conn.cursor())
self.dbname = 'db_test'
self.setsql = TDSetSql()
self.ntbname = 'ntb'
self.ntbname = f'{self.dbname}.ntb'
self.rowNum = 10
self.tbnum = 20
self.ts = 1537146000000
......@@ -96,7 +96,7 @@ class TDTestCase:
self.bottom_check_data(self.ntbname,'normal_table')
tdSql.execute(f'drop database {self.dbname}')
def bottom_check_stb(self):
stbname = tdCom.getLongName(5, "letters")
stbname = f'{self.dbname}.{tdCom.getLongName(5, "letters")}'
tag_dict = {
't0':'int'
}
......@@ -109,7 +109,7 @@ class TDTestCase:
for i in range(self.tbnum):
tdSql.execute(f"create table {stbname}_{i} using {stbname} tags({tag_values[0]})")
self.insert_data(self.column_dict,f'{stbname}_{i}',self.rowNum)
tdSql.query('show tables')
tdSql.query(f'show {self.dbname}.tables')
vgroup_list = []
for i in range(len(tdSql.queryResult)):
vgroup_list.append(tdSql.queryResult[i][6])
......
此差异已折叠。
此差异已折叠。
......@@ -5,7 +5,6 @@ import json
from dataclasses import dataclass, field
from typing import List, Any, Tuple
from certifi import where
from util.log import tdLog
from util.sql import tdSql
from util.cases import tdCases
......
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
Subproject commit fa2d82918353a3b56e40838572120c1a4ece644c
Subproject commit 267a96fb09fc2ba14acfa47f7d3678def64c29c5
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册