提交 e87baa8d 编写于 作者: S Shengliang Guan

Merge remote-tracking branch 'origin/3.0' into fix/mnode

......@@ -41,6 +41,7 @@ extern "C" {
#define TSDB_INS_TABLE_VGROUPS "vgroups"
#define TSDB_INS_TABLE_VNODES "vnodes"
#define TSDB_INS_TABLE_CONFIGS "configs"
#define TSDB_INS_TABLE_DNODE_VARIABLES "dnode_variables"
#define TSDB_PERFORMANCE_SCHEMA_DB "performance_schema"
#define TSDB_PERFS_TABLE_SMAS "smas"
......
......@@ -1317,7 +1317,7 @@ int32_t tSerializeSSetStandbyReq(void* buf, int32_t bufLen, SSetStandbyReq* pReq
int32_t tDeserializeSSetStandbyReq(void* buf, int32_t bufLen, SSetStandbyReq* pReq);
typedef struct {
char queryStrId[TSDB_QUERY_ID_LEN];
char queryStrId[TSDB_QUERY_ID_LEN];
} SKillQueryReq;
int32_t tSerializeSKillQueryReq(void* buf, int32_t bufLen, SKillQueryReq* pReq);
......@@ -2282,6 +2282,9 @@ typedef struct {
int8_t igNotExists;
} SMDropStreamReq;
int32_t tSerializeSMDropStreamReq(void* buf, int32_t bufLen, const SMDropStreamReq* pReq);
int32_t tDeserializeSMDropStreamReq(void* buf, int32_t bufLen, SMDropStreamReq* pReq);
typedef struct {
int8_t reserved;
} SMDropStreamRsp;
......
......@@ -69,6 +69,7 @@ typedef struct SCatalogReq {
SArray* pUser; // element is SUserAuthInfo
SArray* pTableIndex; // element is SNAME
bool qNodeRequired; // valid qnode
bool dNodeRequired; // valid dnode
bool forceUpdate;
} SCatalogReq;
......@@ -88,6 +89,7 @@ typedef struct SMetaData {
SArray* pIndex; // pRes = SIndexInfo*
SArray* pUser; // pRes = bool*
SArray* pQnodeList; // pRes = SArray<SQueryNodeLoad>*
SArray* pDnodeList; // pRes = SArray<SEpSet>*
} SMetaData;
typedef struct SCatalogCfg {
......@@ -268,6 +270,8 @@ int32_t catalogAsyncGetAllMeta(SCatalog* pCtg, SRequestConnInfo* pConn, uint64_t
int32_t catalogGetQnodeList(SCatalog* pCatalog, SRequestConnInfo* pConn, SArray* pQnodeList);
int32_t catalogGetDnodeList(SCatalog* pCatalog, SRequestConnInfo* pConn, SArray** pDnodeList);
int32_t catalogGetExpiredSTables(SCatalog* pCatalog, SSTableVersion **stables, uint32_t *num);
int32_t catalogGetExpiredDBs(SCatalog* pCatalog, SDbVgVersion** dbs, uint32_t* num);
......
......@@ -67,7 +67,7 @@ typedef struct SResultRowEntryInfo {
bool initialized:1; // output buffer has been initialized
bool complete:1; // query has completed
uint8_t isNullRes:6; // the result is null
uint8_t numOfRes; // num of output result in current buffer
uint16_t numOfRes; // num of output result in current buffer
} SResultRowEntryInfo;
// determine the real data need to calculated the result
......
......@@ -119,14 +119,14 @@ typedef struct SCreateTableStmt {
} SCreateTableStmt;
typedef struct SCreateSubTableClause {
ENodeType type;
char dbName[TSDB_DB_NAME_LEN];
char tableName[TSDB_TABLE_NAME_LEN];
char useDbName[TSDB_DB_NAME_LEN];
char useTableName[TSDB_TABLE_NAME_LEN];
bool ignoreExists;
SNodeList* pSpecificTags;
SNodeList* pValsOfTags;
ENodeType type;
char dbName[TSDB_DB_NAME_LEN];
char tableName[TSDB_TABLE_NAME_LEN];
char useDbName[TSDB_DB_NAME_LEN];
char useTableName[TSDB_TABLE_NAME_LEN];
bool ignoreExists;
SNodeList* pSpecificTags;
SNodeList* pValsOfTags;
STableOptions* pOptions;
} SCreateSubTableClause;
......@@ -230,6 +230,11 @@ typedef struct SShowTableDistributedStmt {
char tableName[TSDB_TABLE_NAME_LEN];
} SShowTableDistributedStmt;
typedef struct SShowDnodeVariablesStmt {
ENodeType type;
SNode* pDnodeId;
} SShowDnodeVariablesStmt;
typedef enum EIndexType { INDEX_TYPE_SMA = 1, INDEX_TYPE_FULLTEXT } EIndexType;
typedef struct SIndexOptions {
......
......@@ -180,7 +180,9 @@ typedef enum ENodeType {
QUERY_NODE_SHOW_VNODES_STMT,
QUERY_NODE_SHOW_APPS_STMT,
QUERY_NODE_SHOW_SCORES_STMT,
QUERY_NODE_SHOW_VARIABLE_STMT,
QUERY_NODE_SHOW_VARIABLES_STMT,
QUERY_NODE_SHOW_LOCAL_VARIABLES_STMT,
QUERY_NODE_SHOW_DNODE_VARIABLES_STMT,
QUERY_NODE_SHOW_CREATE_DATABASE_STMT,
QUERY_NODE_SHOW_CREATE_TABLE_STMT,
QUERY_NODE_SHOW_CREATE_STABLE_STMT,
......
......@@ -887,6 +887,7 @@ void taos_fetch_rows_a(TAOS_RES *res, __taos_async_fn_t fp, void *param) {
SRequestObj *pRequest = res;
pRequest->body.fetchFp = fp;
pRequest->body.param = param;
SReqResultInfo *pResultInfo = &pRequest->body.resInfo;
if (taos_num_fields(pRequest) == 0) {
......
......@@ -43,6 +43,26 @@ void showDB(TAOS* pConn) {
}
}
void printResult(TAOS_RES* pRes) {
TAOS_ROW pRow = NULL;
TAOS_FIELD* pFields = taos_fetch_fields(pRes);
int32_t numOfFields = taos_num_fields(pRes);
int32_t n = 0;
char str[512] = {0};
while ((pRow = taos_fetch_row(pRes)) != NULL) {
int32_t* length = taos_fetch_lengths(pRes);
for(int32_t i = 0; i < numOfFields; ++i) {
printf("(%d):%d " , i, length[i]);
}
printf("\n");
int32_t code = taos_print_row(str, pRow, pFields, numOfFields);
printf("%s\n", str);
memset(str, 0, sizeof(str));
}
}
void fetchCallback(void* param, void* res, int32_t numOfRow) {
#if 0
printf("numOfRow = %d \n", numOfRow);
......@@ -729,48 +749,31 @@ TEST(testCase, projection_query_tables) {
// taos_close(pConn);
//}
//TEST(testCase, agg_query_tables) {
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
// ASSERT_NE(pConn, nullptr);
//
// TAOS_RES* pRes = taos_query(pConn, "use abc1");
// if (taos_errno(pRes) != 0) {
// printf("failed to use db, reason:%s\n", taos_errstr(pRes));
// taos_free_result(pRes);
// ASSERT_TRUE(false);
// }
// taos_free_result(pRes);
//
// pRes = taos_query(pConn, "show stables");
// if (taos_errno(pRes) != 0) {
// printf("failed to select from table, reason:%s\n", taos_errstr(pRes));
// taos_free_result(pRes);
// ASSERT_TRUE(false);
// }
//
// TAOS_ROW pRow = NULL;
// TAOS_FIELD* pFields = taos_fetch_fields(pRes);
// int32_t numOfFields = taos_num_fields(pRes);
//
// int32_t n = 0;
// char str[512] = {0};
// while ((pRow = taos_fetch_row(pRes)) != NULL) {
// int32_t* length = taos_fetch_lengths(pRes);
// for(int32_t i = 0; i < numOfFields; ++i) {
// printf("(%d):%d " , i, length[i]);
// }
// printf("\n");
//
// int32_t code = taos_print_row(str, pRow, pFields, numOfFields);
// printf("%s\n", str);
// memset(str, 0, sizeof(str));
// }
//
// taos_free_result(pRes);
// taos_close(pConn);
//}
#endif
TEST(testCase, agg_query_tables) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
ASSERT_NE(pConn, nullptr);
TAOS_RES* pRes = taos_query(pConn, "use abc1");
if (taos_errno(pRes) != 0) {
printf("failed to use db, reason:%s\n", taos_errstr(pRes));
taos_free_result(pRes);
ASSERT_TRUE(false);
}
taos_free_result(pRes);
pRes = taos_query(pConn, "show table distributed st1");
if (taos_errno(pRes) != 0) {
printf("failed to select from table, reason:%s\n", taos_errstr(pRes));
taos_free_result(pRes);
ASSERT_TRUE(false);
}
printResult(pRes);
taos_free_result(pRes);
taos_close(pConn);
}
#endif
/*
--- copy the following script in the shell to setup the environment ---
......@@ -786,7 +789,7 @@ TEST(testCase, async_api_test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
ASSERT_NE(pConn, nullptr);
taos_query(pConn, "use table_alltype_hyperloglog");
taos_query(pConn, "use abc1");
#if 0
TAOS_RES* pRes = taos_query(pConn, "insert into tu(ts) values('2022-02-27 12:12:61')");
if (taos_errno(pRes) != 0) {
......@@ -812,7 +815,7 @@ TEST(testCase, async_api_test) {
}
#endif
taos_query_a(pConn, "select HYPERLOGLOG(q_ts) from stable_1_2 where ts between 1630000001000 and 1630100001000 interval(19d) Fill(NONE);", queryCallback, pConn);
taos_query_a(pConn, "select count(*) from tu", queryCallback, pConn);
getchar();
taos_close(pConn);
}
......
......@@ -4267,6 +4267,35 @@ int32_t tDeserializeSCMCreateStreamReq(void *buf, int32_t bufLen, SCMCreateStrea
return 0;
}
int32_t tSerializeSMDropStreamReq(void *buf, int32_t bufLen, const SMDropStreamReq *pReq) {
SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen);
if (tStartEncode(&encoder) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->name) < 0) return -1;
if (tEncodeI8(&encoder, pReq->igNotExists) < 0) return -1;
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
tEncoderClear(&encoder);
return tlen;
}
int32_t tDeserializeSMDropStreamReq(void *buf, int32_t bufLen, SMDropStreamReq *pReq) {
SDecoder decoder = {0};
tDecoderInit(&decoder, buf, bufLen);
if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeCStrTo(&decoder, pReq->name) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->igNotExists) < 0) return -1;
tEndDecode(&decoder);
tDecoderClear(&decoder);
return 0;
}
void tFreeSCMCreateStreamReq(SCMCreateStreamReq *pReq) {
taosMemoryFreeClear(pReq->sql);
taosMemoryFreeClear(pReq->ast);
......
......@@ -1121,6 +1121,10 @@ _return:
CTG_API_LEAVE(TSDB_CODE_SUCCESS);
}
int32_t catalogGetDnodeList(SCatalog* pCatalog, SRequestConnInfo* pConn, SArray** pDnodeList) {
return TSDB_CODE_CTG_INVALID_INPUT;
}
int32_t catalogGetExpiredSTables(SCatalog* pCtg, SSTableVersion **stables, uint32_t *num) {
CTG_API_ENTER();
......
......@@ -131,6 +131,8 @@ static int32_t execShowCreateSTable(SShowCreateTableStmt* pStmt) { return TSDB_C
static int32_t execAlterLocal(SAlterLocalStmt* pStmt) { return TSDB_CODE_FAILED; }
static int32_t execShowLocalVariables() { return TSDB_CODE_FAILED; }
int32_t qExecCommand(SNode* pStmt, SRetrieveTableRsp** pRsp) {
switch (nodeType(pStmt)) {
case QUERY_NODE_DESCRIBE_STMT:
......@@ -145,6 +147,8 @@ int32_t qExecCommand(SNode* pStmt, SRetrieveTableRsp** pRsp) {
return execShowCreateSTable((SShowCreateTableStmt*)pStmt);
case QUERY_NODE_ALTER_LOCAL_STMT:
return execAlterLocal((SAlterLocalStmt*)pStmt);
case QUERY_NODE_SHOW_LOCAL_VARIABLES_STMT:
return execShowLocalVariables();
default:
break;
}
......
......@@ -123,4 +123,6 @@ SColumn extractColumnFromColumnNode(SColumnNode* pColNode);
int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysiNode* pTableScanNode);
void cleanupQueryTableDataCond(SQueryTableDataCond* pCond);
int32_t convertFillType(int32_t mode);
#endif // TDENGINE_QUERYUTIL_H
......@@ -566,13 +566,14 @@ typedef struct SStreamSessionAggOperatorInfo {
} SStreamSessionAggOperatorInfo;
typedef struct STimeSliceOperatorInfo {
SOptrBasicInfo binfo;
SSDataBlock* pRes;
STimeWindow win;
SInterval interval;
int64_t current;
SArray* pPrevRow; // SArray<SGroupValue>
SArray* pCols; // SArray<SColumn>
int32_t fillType; // fill type
SColumn tsCol; // primary timestamp column
SExprSupp scalarSup; // scalar calculation
struct SFillColInfo* pFillColInfo; // fill column info
} STimeSliceOperatorInfo;
......@@ -670,7 +671,7 @@ int32_t appendDownstream(SOperatorInfo* p, SOperatorInfo** pDownstream, int32_t
void initBasicInfo(SOptrBasicInfo* pInfo, SSDataBlock* pBlock);
void cleanupBasicInfo(SOptrBasicInfo* pInfo);
void initExprSupp(SExprSupp* pSup, SExprInfo* pExprInfo, int32_t numOfExpr);
int32_t initExprSupp(SExprSupp* pSup, SExprInfo* pExprInfo, int32_t numOfExpr);
void cleanupExprSup(SExprSupp* pSup);
int32_t initAggInfo(SExprSupp *pSup, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols, size_t keyBufSize,
const char* pkey);
......@@ -757,8 +758,8 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInf
SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SPartitionPhysiNode* pPartNode, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResultBlock, const SNodeListNode* pValNode, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pNode, /*SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResultBlock, const SNodeListNode* pValNode, */SExecTaskInfo* pTaskInfo);
SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SJoinPhysiNode* pJoinNode,
SExecTaskInfo* pTaskInfo);
......
......@@ -694,4 +694,32 @@ int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysi
void cleanupQueryTableDataCond(SQueryTableDataCond* pCond) {
taosMemoryFree(pCond->twindows);
taosMemoryFree(pCond->colList);
}
\ No newline at end of file
}
int32_t convertFillType(int32_t mode) {
int32_t type = TSDB_FILL_NONE;
switch (mode) {
case FILL_MODE_PREV:
type = TSDB_FILL_PREV;
break;
case FILL_MODE_NONE:
type = TSDB_FILL_NONE;
break;
case FILL_MODE_NULL:
type = TSDB_FILL_NULL;
break;
case FILL_MODE_NEXT:
type = TSDB_FILL_NEXT;
break;
case FILL_MODE_VALUE:
type = TSDB_FILL_SET_VALUE;
break;
case FILL_MODE_LINEAR:
type = TSDB_FILL_LINEAR;
break;
default:
type = TSDB_FILL_NONE;
}
return type;
}
......@@ -2754,7 +2754,11 @@ SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t
goto _error;
}
pOperator->exprSupp.pCtx = createSqlFunctionCtx(pExprInfo, num, &pOperator->exprSupp.rowEntryInfoOffset);
int32_t code = initExprSupp(&pOperator->exprSupp, pExprInfo, num);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
initResultRowInfo(&pInfo->binfo.resultRowInfo);
if (pOperator->exprSupp.pCtx == NULL || pInfo->binfo.pRes == NULL) {
......@@ -2762,7 +2766,7 @@ SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t
}
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
int32_t code = doInitAggInfoSup(&pInfo->aggSup, pOperator->exprSupp.pCtx, num, keyBufSize, pTaskInfo->id.str);
code = doInitAggInfoSup(&pInfo->aggSup, pOperator->exprSupp.pCtx, num, keyBufSize, pTaskInfo->id.str);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
......@@ -2783,12 +2787,9 @@ SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t
pOperator->name = "SortedMerge";
// pOperator->operatorType = OP_SortedMerge;
pOperator->blocking = true;
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
pOperator->exprSupp.numOfExprs = num;
pOperator->exprSupp.pExprInfo = pExprInfo;
pOperator->blocking = true;
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
pOperator->pTaskInfo = pTaskInfo;
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doSortedMerge, NULL, NULL, destroySortedMergeOperatorInfo,
......@@ -3408,7 +3409,11 @@ void cleanupAggSup(SAggSupporter* pAggSup) {
int32_t initAggInfo(SExprSupp* pSup, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols, size_t keyBufSize,
const char* pkey) {
initExprSupp(pSup, pExprInfo, numOfCols);
int32_t code = initExprSupp(pSup, pExprInfo, numOfCols);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
doInitAggInfoSup(pAggSup, pSup->pCtx, numOfCols, keyBufSize, pkey);
for (int32_t i = 0; i < numOfCols; ++i) {
pSup->pCtx[i].pBuf = pAggSup->pResultBuf;
......@@ -3431,12 +3436,17 @@ void initBasicInfo(SOptrBasicInfo* pInfo, SSDataBlock* pBlock) {
initResultRowInfo(&pInfo->resultRowInfo);
}
void initExprSupp(SExprSupp* pSup, SExprInfo* pExprInfo, int32_t numOfExpr) {
int32_t initExprSupp(SExprSupp* pSup, SExprInfo* pExprInfo, int32_t numOfExpr) {
pSup->pExprInfo = pExprInfo;
pSup->numOfExprs = numOfExpr;
if (pSup->pExprInfo != NULL) {
pSup->pCtx = createSqlFunctionCtx(pExprInfo, numOfExpr, &pSup->rowEntryInfoOffset);
if (pSup->pCtx == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
}
return TSDB_CODE_SUCCESS;
}
SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
......@@ -3458,7 +3468,10 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo*
}
initBasicInfo(&pInfo->binfo, pResultBlock);
initExprSupp(&pInfo->scalarExprSup, pScalarExprInfo, numOfScalarExpr);
code = initExprSupp(&pInfo->scalarExprSup, pScalarExprInfo, numOfScalarExpr);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
pInfo->groupId = INT32_MIN;
pOperator->name = "TableAggregate";
......@@ -3723,13 +3736,15 @@ SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhy
SExprInfo* pExprInfo = createExprInfo(pPhyNode->pFuncs, NULL, &numOfExpr);
if (pPhyNode->pExprs != NULL) {
SExprSupp* pSup1 = &pInfo->scalarSup;
pSup1->pExprInfo = createExprInfo(pPhyNode->pExprs, NULL, &pSup1->numOfExprs);
pSup1->pCtx = createSqlFunctionCtx(pSup1->pExprInfo, pSup1->numOfExprs, &pSup1->rowEntryInfoOffset);
int32_t num = 0;
SExprInfo* pSExpr = createExprInfo(pPhyNode->pExprs, NULL, &num);
int32_t code = initExprSupp(&pInfo->scalarSup, pSExpr, num);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
}
SSDataBlock* pResBlock = createResDataBlock(pPhyNode->node.pOutputDataBlockDesc);
;
int32_t numOfRows = 4096;
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
......@@ -3746,15 +3761,14 @@ SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhy
setFunctionResultOutput(pOperator, &pInfo->binfo, &pInfo->aggSup, MAIN_SCAN, numOfExpr);
pInfo->binfo.pRes = pResBlock;
pInfo->pPseudoColInfo = setRowTsColumnOutputInfo(pSup->pCtx, numOfExpr);
pInfo->binfo.pRes = pResBlock;
pInfo->pPseudoColInfo = setRowTsColumnOutputInfo(pSup->pCtx, numOfExpr);
pOperator->name = "IndefinitOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_PROJECT;
pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
pOperator->exprSupp.pExprInfo = pExprInfo;
pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
pOperator->exprSupp.numOfExprs = numOfExpr;
pOperator->pTaskInfo = pTaskInfo;
......@@ -3795,34 +3809,6 @@ static int32_t initFillInfo(SFillOperatorInfo* pInfo, SExprInfo* pExpr, int32_t
}
}
static int32_t convertFillType(int32_t mode) {
int32_t type = TSDB_FILL_NONE;
switch (mode) {
case FILL_MODE_PREV:
type = TSDB_FILL_PREV;
break;
case FILL_MODE_NONE:
type = TSDB_FILL_NONE;
break;
case FILL_MODE_NULL:
type = TSDB_FILL_NULL;
break;
case FILL_MODE_NEXT:
type = TSDB_FILL_NEXT;
break;
case FILL_MODE_VALUE:
type = TSDB_FILL_SET_VALUE;
break;
case FILL_MODE_LINEAR:
type = TSDB_FILL_LINEAR;
break;
default:
type = TSDB_FILL_NONE;
}
return type;
}
SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* pPhyFillNode, bool multigroupResult,
SExecTaskInfo* pTaskInfo) {
SFillOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SFillOperatorInfo));
......@@ -3852,10 +3838,10 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode*
pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED;
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_FILL;
pOperator->exprSupp.pExprInfo = pExprInfo;
pOperator->exprSupp.numOfExprs = num;
pOperator->info = pInfo;
pOperator->pTaskInfo = pTaskInfo;
pOperator->exprSupp.pExprInfo = pExprInfo;
pOperator->exprSupp.numOfExprs = num;
pOperator->info = pInfo;
pOperator->pTaskInfo = pTaskInfo;
pOperator->fpSet =
createOperatorFpSet(operatorDummyOpenFn, doFill, NULL, NULL, destroySFillOperatorInfo, NULL, NULL, NULL);
......@@ -4282,6 +4268,8 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
pOptr = createFillOperatorInfo(ops[0], (SFillPhysiNode*)pPhyNode, false, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_INDEF_ROWS_FUNC == type) {
pOptr = createIndefinitOutputOperatorInfo(ops[0], pPhyNode, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC == type) {
pOptr = createTimeSliceOperatorInfo(ops[0], pPhyNode, pTaskInfo);
} else {
ASSERT(0);
}
......
......@@ -387,11 +387,12 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx
pInfo->pGroupCols = pGroupColList;
pInfo->pCondition = pCondition;
pInfo->scalarSup.pExprInfo = pScalarExprInfo;
pInfo->scalarSup.numOfExprs = numOfScalarExpr;
pInfo->scalarSup.pCtx = createSqlFunctionCtx(pScalarExprInfo, numOfScalarExpr, &pInfo->scalarSup.rowEntryInfoOffset);
int32_t code = initExprSupp(&pInfo->scalarSup, pScalarExprInfo, numOfScalarExpr);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
int32_t code = initGroupOptrInfo(&pInfo->pGroupColVals, &pInfo->groupKeyLen, &pInfo->keyBuf, pGroupColList);
code = initGroupOptrInfo(&pInfo->pGroupColVals, &pInfo->groupKeyLen, &pInfo->keyBuf, pGroupColList);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
......@@ -718,10 +719,12 @@ SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SPartition
pInfo->pGroupCols = extractPartitionColInfo(pPartNode->pPartitionKeys);
if (pPartNode->pExprs != NULL) {
pInfo->scalarSup.numOfExprs = 0;
pInfo->scalarSup.pExprInfo = createExprInfo(pPartNode->pExprs, NULL, &pInfo->scalarSup.numOfExprs);
pInfo->scalarSup.pCtx = createSqlFunctionCtx(
pInfo->scalarSup.pExprInfo, pInfo->scalarSup.numOfExprs, &pInfo->scalarSup.rowEntryInfoOffset);
int32_t num = 0;
SExprInfo* pExprInfo1 = createExprInfo(pPartNode->pExprs, NULL, &num);
int32_t code = initExprSupp(&pInfo->scalarSup, pExprInfo1, num);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
}
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
......
......@@ -686,7 +686,10 @@ SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SReadHandle* re
int32_t numOfCols = 0;
SExprInfo* pExprInfo = createExprInfo(pBlockScanNode->pScanPseudoCols, NULL, &numOfCols);
initExprSupp(&pOperator->exprSupp, pExprInfo, numOfCols);
int32_t code = initExprSupp(&pOperator->exprSupp, pExprInfo, numOfCols);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
pOperator->name = "DataBlockDistScanOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN;
......@@ -1872,7 +1875,10 @@ SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysi
SArray* colList = extractColMatchInfo(pPhyNode->pScanPseudoCols, pDescNode, &num, COL_MATCH_FROM_COL_ID);
initExprSupp(&pOperator->exprSupp, pExprInfo, numOfExprs);
int32_t code = initExprSupp(&pOperator->exprSupp, pExprInfo, numOfExprs);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
pInfo->pTableList = pTableListInfo;
pInfo->pColMatchInfo = colList;
......
......@@ -1705,7 +1705,7 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) {
return (rows == 0) ? NULL : pBInfo->pRes;
}
static void doKeepPrevRows(STimeSliceOperatorInfo* pSliceInfo, const SSDataBlock* pBlock) {
static void doKeepPrevRows(STimeSliceOperatorInfo* pSliceInfo, const SSDataBlock* pBlock, int32_t rowIndex) {
int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
for(int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
......@@ -1715,21 +1715,140 @@ static void doKeepPrevRows(STimeSliceOperatorInfo* pSliceInfo, const SSDataBlock
SGroupKeys* pkey = taosArrayGet(pSliceInfo->pPrevRow, i);
pkey->isNull = false;
char* val = colDataGetData(pColInfoData, i);
char* val = colDataGetData(pColInfoData, rowIndex);
memcpy(pkey->pData, val, pkey->bytes);
}
}
}
static void genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp* pExprSup, SSDataBlock* pBlock,
int32_t rowIndex, SSDataBlock* pResBlock) {
int32_t rows = pResBlock->info.rows;
// todo set the correct primary timestamp column
// output the result
for (int32_t j = 0; j < pExprSup->numOfExprs; ++j) {
SExprInfo* pExprInfo = &pExprSup->pExprInfo[j];
int32_t dstSlot = pExprInfo->base.resSchema.slotId;
int32_t srcSlot = pExprInfo->base.pParam[0].pCol->slotId;
SColumnInfoData* pSrc = taosArrayGet(pBlock->pDataBlock, srcSlot);
SColumnInfoData* pDst = taosArrayGet(pResBlock->pDataBlock, dstSlot);
switch (pSliceInfo->fillType) {
case TSDB_FILL_NULL:
colDataAppendNULL(pDst, rows);
break;
case TSDB_FILL_SET_VALUE: {
SVariant* pVar = &pSliceInfo->pFillColInfo[j].fillVal;
if (pDst->info.type == TSDB_DATA_TYPE_FLOAT) {
float v = 0;
GET_TYPED_DATA(v, float, pVar->nType, &pVar->i);
colDataAppend(pDst, rows, (char*)&v, false);
} else if (pDst->info.type == TSDB_DATA_TYPE_DOUBLE) {
double v = 0;
GET_TYPED_DATA(v, double, pVar->nType, &pVar->i);
colDataAppend(pDst, rows, (char*)&v, false);
} else if (IS_SIGNED_NUMERIC_TYPE(pDst->info.type)) {
int64_t v = 0;
GET_TYPED_DATA(v, int64_t, pVar->nType, &pVar->i);
colDataAppend(pDst, rows, (char*)&v, false);
}
} break;
case TSDB_FILL_LINEAR:
#if 0
if (pCtx->start.key == INT64_MIN || pCtx->start.key > pCtx->startTs
|| pCtx->end.key == INT64_MIN || pCtx->end.key < pCtx->startTs) {
// goto interp_exit;
}
double v1 = -1, v2 = -1;
GET_TYPED_DATA(v1, double, pCtx->inputType, &pCtx->start.val);
GET_TYPED_DATA(v2, double, pCtx->inputType, &pCtx->end.val);
SPoint point1 = {.key = ts, .val = &v1};
SPoint point2 = {.key = nextTs, .val = &v2};
SPoint point = {.key = pCtx->startTs, .val = pCtx->pOutput};
int32_t srcType = pCtx->inputType;
if (isNull((char *)&pCtx->start.val, srcType) || isNull((char *)&pCtx->end.val, srcType)) {
setNull(pCtx->pOutput, srcType, pCtx->inputBytes);
} else {
bool exceedMax = false, exceedMin = false;
taosGetLinearInterpolationVal(&point, pCtx->outputType, &point1, &point2, TSDB_DATA_TYPE_DOUBLE, &exceedMax, &exceedMin);
if (exceedMax || exceedMin) {
__compar_fn_t func = getComparFunc((int32_t)pCtx->inputType, 0);
if (func(&pCtx->start.val, &pCtx->end.val) <= 0) {
COPY_TYPED_DATA(pCtx->pOutput, pCtx->inputType, exceedMax ? &pCtx->start.val : &pCtx->end.val);
} else {
COPY_TYPED_DATA(pCtx->pOutput, pCtx->inputType, exceedMax ? &pCtx->end.val : &pCtx->start.val);
}
}
}
#endif
break;
case TSDB_FILL_PREV: {
SGroupKeys* pkey = taosArrayGet(pSliceInfo->pPrevRow, srcSlot);
colDataAppend(pDst, rows, pkey->pData, false);
} break;
case TSDB_FILL_NEXT: {
char* p = colDataGetData(pSrc, rowIndex);
colDataAppend(pDst, rows, p, colDataIsNull_s(pSrc, rowIndex));
} break;
case TSDB_FILL_NONE:
default:
break;
}
}
pResBlock->info.rows += 1;
}
static int32_t initPrevRowsKeeper(STimeSliceOperatorInfo* pInfo, SSDataBlock* pBlock) {
if (pInfo->pPrevRow != NULL) {
return TSDB_CODE_SUCCESS;
}
pInfo->pPrevRow = taosArrayInit(4, sizeof(SGroupKeys));
if (pInfo->pPrevRow == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
int32_t numOfCols = pBlock->info.numOfCols;
for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, i);
SGroupKeys key = {0};
key.bytes = pColInfo->info.bytes;
key.type = pColInfo->info.type;
key.isNull = false;
key.pData = taosMemoryCalloc(1, pColInfo->info.bytes);
taosArrayPush(pInfo->pPrevRow, &key);
}
return TSDB_CODE_SUCCESS;
}
static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) {
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
}
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
STimeSliceOperatorInfo* pSliceInfo = pOperator->info;
SSDataBlock* pResBlock = pSliceInfo->binfo.pRes;
SSDataBlock* pResBlock = pSliceInfo->pRes;
SExprSupp* pSup = &pOperator->exprSupp;
blockDataEnsureCapacity(pResBlock, pOperator->resultInfo.capacity);
// if (pOperator->status == OP_RES_TO_RETURN) {
// // doBuildResultDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pIntervalInfo->pRes);
// if (pResBlock->info.rows == 0 || !hasDataInGroupInfo(&pSliceInfo->groupResInfo)) {
......@@ -1750,10 +1869,15 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) {
break;
}
int32_t code = initPrevRowsKeeper(pSliceInfo, pBlock);
if (code != TSDB_CODE_SUCCESS) {
longjmp(pTaskInfo->env, code);
}
// the pDataBlock are always the same one, no need to call this again
setInputDataBlock(pOperator, pSup->pCtx, pBlock, order, MAIN_SCAN, true);
SColumnInfoData* pTsCol = taosArrayGet(pBlock->pDataBlock, 0);
SColumnInfoData* pTsCol = taosArrayGet(pBlock->pDataBlock, pSliceInfo->tsCol.slotId);
for(int32_t i = 0; i < pBlock->info.rows; ++i) {
int64_t ts = *(int64_t*) colDataGetData(pTsCol, i);
......@@ -1764,114 +1888,60 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) {
int32_t srcSlot = pExprInfo->base.pParam[0].pCol->slotId;
SColumnInfoData* pSrc = taosArrayGet(pBlock->pDataBlock, srcSlot);
SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, dstSlot);
SColumnInfoData* pDst = taosArrayGet(pResBlock->pDataBlock, dstSlot);
char* v = colDataGetData(pSrc, i);
colDataAppend(pDst, numOfRows, v, false);
}
numOfRows += 1;
pResBlock->info.rows += 1;
doKeepPrevRows(pSliceInfo, pBlock, i);
pSliceInfo->current += taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision);
pSliceInfo->current = taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision);
if (pSliceInfo->current > pSliceInfo->win.ekey) {
doSetOperatorCompleted(pOperator);
break;
}
if (pResBlock->info.rows >= pResBlock->info.capacity) {
break;
}
} else if (ts < pSliceInfo->current) {
if (i != pBlock->info.window.ekey) {
if (i < pBlock->info.rows - 1) {
int64_t nextTs = *(int64_t*) colDataGetData(pTsCol, i + 1);
if (nextTs > pSliceInfo->current) {
// output the result
for (int32_t j = 0; j < pOperator->exprSupp.numOfExprs; ++j) {
SExprInfo* pExprInfo = &pOperator->exprSupp.pExprInfo[j];
int32_t dstSlot = pExprInfo->base.resSchema.slotId;
int32_t srcSlot = pExprInfo->base.pParam[0].pCol->slotId;
SColumnInfoData* pSrc = taosArrayGet(pBlock->pDataBlock, srcSlot);
SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, dstSlot);
switch (pSliceInfo->fillType) {
case TSDB_FILL_NULL:
colDataAppendNULL(pDst, numOfRows);
break;
case TSDB_FILL_SET_VALUE: {
SVariant* pVar = &pSliceInfo->pFillColInfo[i].fillVal;
if (pDst->info.type == TSDB_DATA_TYPE_FLOAT) {
float v = 0;
GET_TYPED_DATA(v, float, pVar->nType, &pVar->i);
colDataAppend(pDst, numOfRows, (char*)&v, false);
} else if (pDst->info.type == TSDB_DATA_TYPE_DOUBLE) {
double v = 0;
GET_TYPED_DATA(v, double, pVar->nType, &pVar->i);
colDataAppend(pDst, numOfRows, (char*)&v, false);
} else if (IS_SIGNED_NUMERIC_TYPE(pDst->info.type)) {
int64_t v = 0;
GET_TYPED_DATA(v, int64_t, pVar->nType, &pVar->i);
colDataAppend(pDst, numOfRows, (char*)&v, false);
}
}
break;
case TSDB_FILL_LINEAR:
#if 0
if (pCtx->start.key == INT64_MIN || pCtx->start.key > pCtx->startTs
|| pCtx->end.key == INT64_MIN || pCtx->end.key < pCtx->startTs) {
// goto interp_exit;
}
double v1 = -1, v2 = -1;
GET_TYPED_DATA(v1, double, pCtx->inputType, &pCtx->start.val);
GET_TYPED_DATA(v2, double, pCtx->inputType, &pCtx->end.val);
SPoint point1 = {.key = ts, .val = &v1};
SPoint point2 = {.key = nextTs, .val = &v2};
SPoint point = {.key = pCtx->startTs, .val = pCtx->pOutput};
int32_t srcType = pCtx->inputType;
if (isNull((char *)&pCtx->start.val, srcType) || isNull((char *)&pCtx->end.val, srcType)) {
setNull(pCtx->pOutput, srcType, pCtx->inputBytes);
} else {
bool exceedMax = false, exceedMin = false;
taosGetLinearInterpolationVal(&point, pCtx->outputType, &point1, &point2, TSDB_DATA_TYPE_DOUBLE, &exceedMax, &exceedMin);
if (exceedMax || exceedMin) {
__compar_fn_t func = getComparFunc((int32_t)pCtx->inputType, 0);
if (func(&pCtx->start.val, &pCtx->end.val) <= 0) {
COPY_TYPED_DATA(pCtx->pOutput, pCtx->inputType, exceedMax ? &pCtx->start.val : &pCtx->end.val);
} else {
COPY_TYPED_DATA(pCtx->pOutput, pCtx->inputType, exceedMax ? &pCtx->end.val : &pCtx->start.val);
}
}
}
#endif
break;
case TSDB_FILL_PREV: {
SGroupKeys* pkey = taosArrayGet(pSliceInfo->pPrevRow, srcSlot);
colDataAppend(pDst, numOfRows, pkey->pData, false);
} break;
case TSDB_FILL_NEXT: {
} break;
case TSDB_FILL_NONE:
default:
break;
}
pSliceInfo->current +=
while (pSliceInfo->current < nextTs && pSliceInfo->current <= pSliceInfo->win.ekey) {
genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pBlock, i, pResBlock);
pSliceInfo->current =
taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision);
if (pSliceInfo->current > pSliceInfo->win.ekey) {
doSetOperatorCompleted(pOperator);
if (pResBlock->info.rows >= pResBlock->info.capacity) {
break;
}
}
if (pSliceInfo->current > pSliceInfo->win.ekey) {
doSetOperatorCompleted(pOperator);
break;
}
} else {
// ignore current row, and do nothing
}
} else { // it is the last row of current block
doKeepPrevRows(pSliceInfo, pBlock);
doKeepPrevRows(pSliceInfo, pBlock, i);
}
} else { // ts > pSliceInfo->current
while (pSliceInfo->current < ts && pSliceInfo->current <= pSliceInfo->win.ekey) {
genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pBlock, i, pResBlock);
pSliceInfo->current =
taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision);
if (pResBlock->info.rows >= pResBlock->info.capacity) {
break;
}
}
if (pSliceInfo->current > pSliceInfo->win.ekey) {
doSetOperatorCompleted(pOperator);
break;
}
}
}
......@@ -1886,59 +1956,46 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) {
return pResBlock->info.rows == 0 ? NULL : pResBlock;
}
static int32_t initTimesliceInfo(STimeSliceOperatorInfo* pInfo, SqlFunctionCtx* pCtx, int32_t numOfCols) {
pInfo->pPrevRow = taosArrayInit(4, sizeof(SGroupKeys));
pInfo->pCols = taosArrayInit(4, sizeof(SColumn));
if (pInfo->pPrevRow == NULL || pInfo->pCols == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
for (int32_t i = 0; i < numOfCols; ++i) {
SExprInfo* pExpr = pCtx[i].pExpr;
SFunctParam* pParam = &pExpr->base.pParam[0];
SColumn c = *pParam->pCol;
taosArrayPush(pInfo->pCols, &c);
SGroupKeys key = {0};
key.bytes = c.bytes;
key.type = c.type;
key.isNull = false;
key.pData = taosMemoryCalloc(1, c.bytes);
taosArrayPush(pInfo->pPrevRow, &key);
}
return TSDB_CODE_SUCCESS;
}
SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResultBlock, const SNodeListNode* pValNode, SExecTaskInfo* pTaskInfo) {
SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode *pPhyNode, SExecTaskInfo* pTaskInfo) {
STimeSliceOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(STimeSliceOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pOperator == NULL || pInfo == NULL) {
goto _error;
}
SInterpFuncPhysiNode* pInterpPhyNode = (SInterpFuncPhysiNode*)pPhyNode;
SExprSupp* pSup = &pOperator->exprSupp;
int32_t code = initTimesliceInfo(pInfo, pSup->pCtx, numOfCols);
int32_t numOfExprs = 0;
SExprInfo* pExprInfo = createExprInfo(pInterpPhyNode->pFuncs, NULL, &numOfExprs);
int32_t code = initExprSupp(pSup, pExprInfo, numOfExprs);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
initResultRowInfo(&pInfo->binfo.resultRowInfo);
pInfo->pFillColInfo = createFillColInfo(pExprInfo, numOfCols, pValNode);
if (pInterpPhyNode->pExprs != NULL) {
int32_t num = 0;
SExprInfo* pScalarExprInfo = createExprInfo(pInterpPhyNode->pExprs, NULL, &num);
code = initExprSupp(&pInfo->scalarSup, pScalarExprInfo, num);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
}
pInfo->tsCol = extractColumnFromColumnNode((SColumnNode*)pInterpPhyNode->pTimeSeries);
pInfo->fillType = convertFillType(pInterpPhyNode->fillMode);
initResultSizeInfo(pOperator, 4096);
pInfo->binfo.pRes = pResultBlock;
pInfo->pFillColInfo = createFillColInfo(pExprInfo, numOfExprs, (SNodeListNode*)pInterpPhyNode->pFillValues);
pInfo->pRes = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
pInfo->win = pInterpPhyNode->timeRange;
pInfo->interval.interval = pInterpPhyNode->interval;
pInfo->current = pInfo->win.skey;
pOperator->name = "TimeSliceOperator";
// pOperator->operatorType = OP_AllTimeWindow;
pOperator->blocking = true;
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC;
pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED;
pOperator->exprSupp.pExprInfo = pExprInfo;
pOperator->exprSupp.numOfExprs = numOfCols;
pOperator->info = pInfo;
pOperator->pTaskInfo = pTaskInfo;
......@@ -2360,9 +2417,6 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
}
}
// semi interval operator does not catch result
if (!IS_FINAL_OP(pInfo)) {
pInfo->twAggSup.calTrigger = STREAM_TRIGGER_AT_ONCE;
}
pInfo->pUpdateRes = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
pInfo->pUpdateRes->info.type = STREAM_REPROCESS;
blockDataEnsureCapacity(pInfo->pUpdateRes, 128);
......@@ -2376,6 +2430,10 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
pOperator->name = "StreamSemiIntervalOperator";
}
if (!IS_FINAL_OP(pInfo)) {
pInfo->twAggSup.calTrigger = STREAM_TRIGGER_AT_ONCE;
}
pOperator->operatorType = pPhyNode->type;
pOperator->blocking = true;
pOperator->status = OP_NOT_OPENED;
......@@ -2432,7 +2490,11 @@ void destroyStreamSessionAggOperatorInfo(void* param, int32_t numOfOutput) {
}
int32_t initBasicInfoEx(SOptrBasicInfo* pBasicInfo, SExprSupp* pSup, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock) {
initExprSupp(pSup, pExprInfo, numOfCols);
int32_t code = initExprSupp(pSup, pExprInfo, numOfCols);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
initBasicInfo(pBasicInfo, pResultBlock);
for (int32_t i = 0; i < numOfCols; ++i) {
......
......@@ -374,11 +374,6 @@ int32_t msortComparFn(const void *pLeft, const void *pRight, void *param) {
SSDataBlock* pLeftBlock = pLeftSource->src.pBlock;
SSDataBlock* pRightBlock = pRightSource->src.pBlock;
// first sort by block groupId
if (pLeftBlock->info.groupId != pRightBlock->info.groupId) {
return pLeftBlock->info.groupId < pRightBlock->info.groupId ? -1 : 1;
}
for(int32_t i = 0; i < pInfo->size; ++i) {
SBlockOrderInfo* pOrder = TARRAY_GET_ELEM(pInfo, i);
SColumnInfoData* pLeftColInfoData = TARRAY_GET_ELEM(pLeftBlock->pDataBlock, pOrder->slotId);
......
......@@ -207,9 +207,12 @@ SNode* nodesMakeNode(ENodeType type) {
case QUERY_NODE_SHOW_VNODES_STMT:
case QUERY_NODE_SHOW_APPS_STMT:
case QUERY_NODE_SHOW_SCORES_STMT:
case QUERY_NODE_SHOW_VARIABLE_STMT:
case QUERY_NODE_SHOW_VARIABLES_STMT:
case QUERY_NODE_SHOW_LOCAL_VARIABLES_STMT:
case QUERY_NODE_SHOW_TRANSACTIONS_STMT:
return makeNode(type, sizeof(SShowStmt));
case QUERY_NODE_SHOW_DNODE_VARIABLES_STMT:
return makeNode(type, sizeof(SShowDnodeVariablesStmt));
case QUERY_NODE_SHOW_CREATE_DATABASE_STMT:
return makeNode(type, sizeof(SShowCreateDatabaseStmt));
case QUERY_NODE_SHOW_CREATE_TABLE_STMT:
......@@ -637,13 +640,16 @@ void nodesDestroyNode(SNode* pNode) {
case QUERY_NODE_SHOW_VNODES_STMT:
case QUERY_NODE_SHOW_APPS_STMT:
case QUERY_NODE_SHOW_SCORES_STMT:
case QUERY_NODE_SHOW_VARIABLE_STMT:
case QUERY_NODE_SHOW_VARIABLES_STMT:
case QUERY_NODE_SHOW_LOCAL_VARIABLES_STMT:
case QUERY_NODE_SHOW_TRANSACTIONS_STMT: {
SShowStmt* pStmt = (SShowStmt*)pNode;
nodesDestroyNode(pStmt->pDbName);
nodesDestroyNode(pStmt->pTbName);
break;
}
case QUERY_NODE_SHOW_DNODE_VARIABLES_STMT: // no pointer field
break;
case QUERY_NODE_SHOW_CREATE_DATABASE_STMT:
taosMemoryFreeClear(((SShowCreateDatabaseStmt*)pNode)->pCfg);
break;
......
......@@ -161,6 +161,7 @@ SNode* createShowStmtWithCond(SAstCreateContext* pCxt, ENodeType type, SNode* pD
SNode* createShowCreateDatabaseStmt(SAstCreateContext* pCxt, SToken* pDbName);
SNode* createShowCreateTableStmt(SAstCreateContext* pCxt, ENodeType type, SNode* pRealTable);
SNode* createShowTableDistributedStmt(SAstCreateContext* pCxt, SNode* pRealTable);
SNode* createShowDnodeVariablesStmt(SAstCreateContext* pCxt, SNode* pDnodeId);
SNode* createCreateUserStmt(SAstCreateContext* pCxt, SToken* pUserName, const SToken* pPassword);
SNode* createAlterUserStmt(SAstCreateContext* pCxt, SToken* pUserName, int8_t alterType, const SToken* pVal);
SNode* createDropUserStmt(SAstCreateContext* pCxt, SToken* pUserName);
......
......@@ -47,6 +47,8 @@ typedef struct SParseMetaCache {
SHashObj* pUserAuth; // key is SUserAuthInfo serialized string, element is bool indicating whether or not to pass
SHashObj* pUdf; // key is funcName, element is SFuncInfo*
SHashObj* pTableIndex; // key is tbFName, element is SArray<STableIndexInfo>*
SArray* pDnodes; // element is SEpSet
bool dnodeRequired;
} SParseMetaCache;
int32_t generateSyntaxErrMsg(SMsgBuf* pBuf, int32_t errCode, ...);
......@@ -77,6 +79,7 @@ int32_t reserveUserAuthInCache(int32_t acctId, const char* pUser, const char* pD
int32_t reserveUserAuthInCacheExt(const char* pUser, const SName* pName, AUTH_TYPE type, SParseMetaCache* pMetaCache);
int32_t reserveUdfInCache(const char* pFunc, SParseMetaCache* pMetaCache);
int32_t reserveTableIndexInCache(int32_t acctId, const char* pDb, const char* pTable, SParseMetaCache* pMetaCache);
int32_t reserveDnodeRequiredInCache(SParseMetaCache* pMetaCache);
int32_t getTableMetaFromCache(SParseMetaCache* pMetaCache, const SName* pName, STableMeta** pMeta);
int32_t getDbVgInfoFromCache(SParseMetaCache* pMetaCache, const char* pDbFName, SArray** pVgInfo);
int32_t getTableVgroupFromCache(SParseMetaCache* pMetaCache, const SName* pName, SVgroupInfo* pVgroup);
......@@ -87,6 +90,7 @@ int32_t getUserAuthFromCache(SParseMetaCache* pMetaCache, const char* pUser, con
bool* pPass);
int32_t getUdfInfoFromCache(SParseMetaCache* pMetaCache, const char* pFunc, SFuncInfo* pInfo);
int32_t getTableIndexFromCache(SParseMetaCache* pMetaCache, const SName* pName, SArray** pIndexes);
int32_t getDnodeListFromCache(SParseMetaCache* pMetaCache, SArray** pDnodes);
void destoryParseMetaCache(SParseMetaCache* pMetaCache);
#ifdef __cplusplus
......
......@@ -373,7 +373,9 @@ cmd ::= SHOW CREATE STABLE full_table_name(A).
cmd ::= SHOW QUERIES. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_QUERIES_STMT); }
cmd ::= SHOW SCORES. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_SCORES_STMT); }
cmd ::= SHOW TOPICS. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_TOPICS_STMT); }
cmd ::= SHOW VARIABLES. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_VARIABLE_STMT); }
cmd ::= SHOW VARIABLES. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_VARIABLES_STMT); }
cmd ::= SHOW LOCAL VARIABLES. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_LOCAL_VARIABLES_STMT); }
cmd ::= SHOW DNODE NK_INTEGER(A) VARIABLES. { pCxt->pRootNode = createShowDnodeVariablesStmt(pCxt, createValueNode(pCxt, TSDB_DATA_TYPE_BIGINT, &A)); }
cmd ::= SHOW BNODES. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_BNODES_STMT); }
cmd ::= SHOW SNODES. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_SNODES_STMT); }
cmd ::= SHOW CLUSTER. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_CLUSTER_STMT); }
......
......@@ -1197,6 +1197,14 @@ SNode* createShowTableDistributedStmt(SAstCreateContext* pCxt, SNode* pRealTable
return (SNode*)pStmt;
}
SNode* createShowDnodeVariablesStmt(SAstCreateContext* pCxt, SNode* pDnodeId) {
CHECK_PARSER_STATUS(pCxt);
SShowDnodeVariablesStmt* pStmt = (SShowDnodeVariablesStmt*)nodesMakeNode(QUERY_NODE_SHOW_DNODE_VARIABLES_STMT);
CHECK_OUT_OF_MEM(pStmt);
pStmt->pDnodeId = pDnodeId;
return (SNode*)pStmt;
}
SNode* createCreateUserStmt(SAstCreateContext* pCxt, SToken* pUserName, const SToken* pPassword) {
CHECK_PARSER_STATUS(pCxt);
char password[TSDB_USET_PASSWORD_LEN] = {0};
......
......@@ -397,6 +397,15 @@ static int32_t collectMetaKeyFromShowVariables(SCollectMetaKeyCxt* pCxt, SShowSt
pCxt->pMetaCache);
}
static int32_t collectMetaKeyFromShowDnodeVariables(SCollectMetaKeyCxt* pCxt, SShowDnodeVariablesStmt* pStmt) {
int32_t code = reserveTableMetaInCache(pCxt->pParseCxt->acctId, TSDB_INFORMATION_SCHEMA_DB,
TSDB_INS_TABLE_DNODE_VARIABLES, pCxt->pMetaCache);
if (TSDB_CODE_SUCCESS == code) {
code = reserveDnodeRequiredInCache(pCxt->pMetaCache);
}
return code;
}
static int32_t collectMetaKeyFromShowCreateDatabase(SCollectMetaKeyCxt* pCxt, SShowCreateDatabaseStmt* pStmt) {
return reserveDbCfgInCache(pCxt->pParseCxt->acctId, pStmt->dbName, pCxt->pMetaCache);
}
......@@ -502,8 +511,10 @@ static int32_t collectMetaKeyFromQuery(SCollectMetaKeyCxt* pCxt, SNode* pStmt) {
return collectMetaKeyFromShowConnections(pCxt, (SShowStmt*)pStmt);
case QUERY_NODE_SHOW_QUERIES_STMT:
return collectMetaKeyFromShowQueries(pCxt, (SShowStmt*)pStmt);
case QUERY_NODE_SHOW_VARIABLE_STMT:
case QUERY_NODE_SHOW_VARIABLES_STMT:
return collectMetaKeyFromShowVariables(pCxt, (SShowStmt*)pStmt);
case QUERY_NODE_SHOW_DNODE_VARIABLES_STMT:
return collectMetaKeyFromShowDnodeVariables(pCxt, (SShowDnodeVariablesStmt*)pStmt);
case QUERY_NODE_SHOW_CREATE_DATABASE_STMT:
return collectMetaKeyFromShowCreateDatabase(pCxt, (SShowCreateDatabaseStmt*)pStmt);
case QUERY_NODE_SHOW_CREATE_TABLE_STMT:
......
......@@ -305,6 +305,24 @@ static int32_t getTableIndex(STranslateContext* pCxt, const SName* pName, SArray
return code;
}
static int32_t getDnodeList(STranslateContext* pCxt, SArray** pDnodes) {
SParseContext* pParCxt = pCxt->pParseCxt;
int32_t code = TSDB_CODE_SUCCESS;
if (pParCxt->async) {
code = getDnodeListFromCache(pCxt->pMetaCache, pDnodes);
} else {
SRequestConnInfo conn = {.pTrans = pParCxt->pTransporter,
.requestId = pParCxt->requestId,
.requestObjRefId = pParCxt->requestRid,
.mgmtEps = pParCxt->mgmtEpSet};
code = catalogGetDnodeList(pParCxt->pCatalog, &conn, pDnodes);
}
if (TSDB_CODE_SUCCESS != code) {
parserError("getDnodeList error, code:%s", tstrerror(code));
}
return code;
}
static int32_t initTranslateContext(SParseContext* pParseCxt, SParseMetaCache* pMetaCache, STranslateContext* pCxt) {
pCxt->pParseCxt = pParseCxt;
pCxt->errCode = TSDB_CODE_SUCCESS;
......@@ -1450,12 +1468,27 @@ static int32_t addMnodeToVgroupList(const SEpSet* pEpSet, SArray** pVgroupList)
return TSDB_CODE_SUCCESS;
}
static int32_t setSysTableVgroupList(STranslateContext* pCxt, SName* pName, SRealTableNode* pRealTable) {
if (0 != strcmp(pRealTable->table.tableName, TSDB_INS_TABLE_USER_TABLES) &&
0 != strcmp(pRealTable->table.tableName, TSDB_INS_TABLE_USER_TABLE_DISTRIBUTED)) {
return TSDB_CODE_SUCCESS;
static int32_t dnodeToVgroupsInfo(SArray* pDnodes, SVgroupsInfo** pVgsInfo) {
size_t ndnode = taosArrayGetSize(pDnodes);
*pVgsInfo = taosMemoryCalloc(1, sizeof(SVgroupsInfo) + sizeof(SVgroupInfo) * ndnode);
if (NULL == *pVgsInfo) {
return TSDB_CODE_OUT_OF_MEMORY;
}
(*pVgsInfo)->numOfVgroups = ndnode;
for (int32_t i = 0; i < ndnode; ++i) {
memcpy(&((*pVgsInfo)->vgroups[i].epSet), taosArrayGet(pDnodes, i), sizeof(SEpSet));
}
return TSDB_CODE_SUCCESS;
}
static bool sysTableFromVnode(const char* pTable) {
return (0 == strcmp(pTable, TSDB_INS_TABLE_USER_TABLES)) ||
(0 == strcmp(pTable, TSDB_INS_TABLE_USER_TABLE_DISTRIBUTED));
}
static bool sysTableFromDnode(const char* pTable) { return 0 == strcmp(pTable, TSDB_INS_TABLE_DNODE_VARIABLES); }
static int32_t setVnodeSysTableVgroupList(STranslateContext* pCxt, SName* pName, SRealTableNode* pRealTable) {
int32_t code = TSDB_CODE_SUCCESS;
SArray* vgroupList = NULL;
if ('\0' != pRealTable->qualDbName[0]) {
......@@ -1478,6 +1511,26 @@ static int32_t setSysTableVgroupList(STranslateContext* pCxt, SName* pName, SRea
return code;
}
static int32_t setDnodeSysTableVgroupList(STranslateContext* pCxt, SName* pName, SRealTableNode* pRealTable) {
SArray* pDnodes = NULL;
int32_t code = getDnodeList(pCxt, &pDnodes);
if (TSDB_CODE_SUCCESS == code) {
code = dnodeToVgroupsInfo(pDnodes, &pRealTable->pVgroupList);
}
taosArrayDestroy(pDnodes);
return code;
}
static int32_t setSysTableVgroupList(STranslateContext* pCxt, SName* pName, SRealTableNode* pRealTable) {
if (sysTableFromVnode(pRealTable->table.tableName)) {
return setVnodeSysTableVgroupList(pCxt, pName, pRealTable);
} else if (sysTableFromDnode(pRealTable->table.tableName)) {
return setDnodeSysTableVgroupList(pCxt, pName, pRealTable);
} else {
return TSDB_CODE_SUCCESS;
}
}
static int32_t setTableVgroupList(STranslateContext* pCxt, SName* pName, SRealTableNode* pRealTable) {
if (pCxt->pParseCxt->topicQuery) {
return TSDB_CODE_SUCCESS;
......@@ -4081,8 +4134,12 @@ static int32_t translateCreateStream(STranslateContext* pCxt, SCreateStreamStmt*
}
static int32_t translateDropStream(STranslateContext* pCxt, SDropStreamStmt* pStmt) {
// todo
return TSDB_CODE_SUCCESS;
SMDropStreamReq dropReq = {0};
SName name;
tNameSetDbName(&name, pCxt->pParseCxt->acctId, pStmt->streamName, strlen(pStmt->streamName));
tNameGetFullDbName(&name, dropReq.name);
dropReq.igNotExists = pStmt->ignoreNotExists;
return buildCmdMsg(pCxt, TDMT_MND_DROP_STREAM, (FSerializeFunc)tSerializeSMDropStreamReq, &dropReq);
}
static int32_t readFromFile(char* pName, int32_t* len, char** buf) {
......@@ -4538,7 +4595,8 @@ static const char* getSysDbName(ENodeType type) {
case QUERY_NODE_SHOW_SNODES_STMT:
case QUERY_NODE_SHOW_LICENCE_STMT:
case QUERY_NODE_SHOW_CLUSTER_STMT:
case QUERY_NODE_SHOW_VARIABLE_STMT:
case QUERY_NODE_SHOW_VARIABLES_STMT:
case QUERY_NODE_SHOW_DNODE_VARIABLES_STMT:
return TSDB_INFORMATION_SCHEMA_DB;
case QUERY_NODE_SHOW_CONNECTIONS_STMT:
case QUERY_NODE_SHOW_QUERIES_STMT:
......@@ -4595,10 +4653,12 @@ static const char* getSysTableName(ENodeType type) {
return TSDB_PERFS_TABLE_TOPICS;
case QUERY_NODE_SHOW_TRANSACTIONS_STMT:
return TSDB_PERFS_TABLE_TRANS;
case QUERY_NODE_SHOW_VARIABLE_STMT:
case QUERY_NODE_SHOW_VARIABLES_STMT:
return TSDB_INS_TABLE_CONFIGS;
case QUERY_NODE_SHOW_APPS_STMT:
return TSDB_PERFS_TABLE_APPS;
case QUERY_NODE_SHOW_DNODE_VARIABLES_STMT:
return TSDB_INS_TABLE_DNODE_VARIABLES;
default:
break;
}
......@@ -4725,6 +4785,21 @@ static int32_t rewriteShow(STranslateContext* pCxt, SQuery* pQuery) {
return code;
}
static int32_t rewriteShowDnodeVariables(STranslateContext* pCxt, SQuery* pQuery) {
SSelectStmt* pStmt = NULL;
int32_t code = createSelectStmtForShow(nodeType(pQuery->pRoot), &pStmt);
if (TSDB_CODE_SUCCESS == code) {
code = createOperatorNode(OP_TYPE_EQUAL, "dnode_id", ((SShowDnodeVariablesStmt*)pQuery->pRoot)->pDnodeId,
&pStmt->pWhere);
}
if (TSDB_CODE_SUCCESS == code) {
pQuery->showRewrite = true;
nodesDestroyNode(pQuery->pRoot);
pQuery->pRoot = (SNode*)pStmt;
}
return code;
}
static SNode* createBlockDistInfoFunc() {
SFunctionNode* pFunc = (SFunctionNode*)nodesMakeNode(QUERY_NODE_FUNCTION);
if (NULL == pFunc) {
......@@ -5720,10 +5795,13 @@ static int32_t rewriteQuery(STranslateContext* pCxt, SQuery* pQuery) {
case QUERY_NODE_SHOW_CLUSTER_STMT:
case QUERY_NODE_SHOW_TOPICS_STMT:
case QUERY_NODE_SHOW_TRANSACTIONS_STMT:
case QUERY_NODE_SHOW_VARIABLE_STMT:
case QUERY_NODE_SHOW_VARIABLES_STMT:
case QUERY_NODE_SHOW_APPS_STMT:
code = rewriteShow(pCxt, pQuery);
break;
case QUERY_NODE_SHOW_DNODE_VARIABLES_STMT:
code = rewriteShowDnodeVariables(pCxt, pQuery);
break;
case QUERY_NODE_SHOW_TABLE_DISTRIBUTED_STMT:
code = rewriteShowTableDist(pCxt, pQuery);
break;
......@@ -5814,6 +5892,7 @@ static int32_t setQuery(STranslateContext* pCxt, SQuery* pQuery) {
break;
case QUERY_NODE_RESET_QUERY_CACHE_STMT:
case QUERY_NODE_ALTER_LOCAL_STMT:
case QUERY_NODE_SHOW_LOCAL_VARIABLES_STMT:
pQuery->execMode = QUERY_EXEC_MODE_LOCAL;
break;
default:
......
......@@ -561,6 +561,7 @@ int32_t buildCatalogReq(const SParseMetaCache* pMetaCache, SCatalogReq* pCatalog
if (TSDB_CODE_SUCCESS == code) {
code = buildTableReq(pMetaCache->pTableIndex, &pCatalogReq->pTableIndex);
}
pCatalogReq->dNodeRequired = pMetaCache->dnodeRequired;
return code;
}
......@@ -656,6 +657,7 @@ int32_t putMetaDataToCache(const SCatalogReq* pCatalogReq, const SMetaData* pMet
if (TSDB_CODE_SUCCESS == code) {
code = putTableDataToCache(pCatalogReq->pTableIndex, pMetaData->pTableIndex, &pMetaCache->pTableIndex);
}
pMetaCache->pDnodes = pMetaData->pDnodeList;
return code;
}
......@@ -875,6 +877,19 @@ int32_t getTableIndexFromCache(SParseMetaCache* pMetaCache, const SName* pName,
return code;
}
int32_t reserveDnodeRequiredInCache(SParseMetaCache* pMetaCache) {
pMetaCache->dnodeRequired = true;
return TSDB_CODE_SUCCESS;
}
int32_t getDnodeListFromCache(SParseMetaCache* pMetaCache, SArray** pDnodes) {
*pDnodes = taosArrayDup(pMetaCache->pDnodes);
if (NULL == *pDnodes) {
return TSDB_CODE_OUT_OF_MEMORY;
}
return TSDB_CODE_SUCCESS;
}
void destoryParseMetaCache(SParseMetaCache* pMetaCache) {
taosHashCleanup(pMetaCache->pTableMeta);
taosHashCleanup(pMetaCache->pDbVgroup);
......
因为 它太大了无法显示 source diff 。你可以改为 查看blob
......@@ -93,6 +93,16 @@ void generateInformationSchema(MockCatalogService* mcs) {
.addColumn("db_name", TSDB_DATA_TYPE_BINARY, TSDB_DB_NAME_LEN);
builder.done();
}
{
ITableBuilder& builder = mcs->createTableBuilder("information_schema", "configs", TSDB_SYSTEM_TABLE, 1)
.addColumn("name", TSDB_DATA_TYPE_BINARY, TSDB_CONFIG_OPTION_LEN);
builder.done();
}
{
ITableBuilder& builder = mcs->createTableBuilder("information_schema", "dnode_variables", TSDB_SYSTEM_TABLE, 1)
.addColumn("dnode_id", TSDB_DATA_TYPE_INT);
builder.done();
}
}
void generatePerformanceSchema(MockCatalogService* mcs) {
......@@ -187,6 +197,12 @@ void generateFunctions(MockCatalogService* mcs) {
8);
}
void generateDnodes(MockCatalogService* mcs) {
mcs->createDnode(1, "host1", 7030);
mcs->createDnode(2, "host2", 7030);
mcs->createDnode(3, "host3", 7030);
}
} // namespace
int32_t __catalogGetHandle(const char* clusterId, struct SCatalog** catalogHandle) { return 0; }
......@@ -241,6 +257,10 @@ int32_t __catalogGetTableIndex(SCatalog* pCtg, void* pTrans, const SEpSet* pMgmt
return g_mockCatalogService->catalogGetTableIndex(pName, pRes);
}
int32_t __catalogGetDnodeList(SCatalog* pCatalog, SRequestConnInfo* pConn, SArray** pDnodeList) {
return g_mockCatalogService->catalogGetDnodeList(pDnodeList);
}
void initMetaDataEnv() {
g_mockCatalogService.reset(new MockCatalogService());
......@@ -258,6 +278,7 @@ void initMetaDataEnv() {
stub.set(catalogRefreshGetTableMeta, __catalogRefreshGetTableMeta);
stub.set(catalogRemoveTableMeta, __catalogRemoveTableMeta);
stub.set(catalogGetTableIndex, __catalogGetTableIndex);
stub.set(catalogGetDnodeList, __catalogGetDnodeList);
// {
// AddrAny any("libcatalog.so");
// std::map<std::string,void*> result;
......@@ -307,6 +328,7 @@ void generateMetaData() {
generateTestST1(g_mockCatalogService.get());
generateTestST2(g_mockCatalogService.get());
generateFunctions(g_mockCatalogService.get());
generateDnodes(g_mockCatalogService.get());
g_mockCatalogService->showTables();
}
......
......@@ -165,6 +165,14 @@ class MockCatalogServiceImpl {
return TSDB_CODE_SUCCESS;
}
int32_t catalogGetDnodeList(SArray** pDnodes) const {
*pDnodes = taosArrayInit(dnode_.size(), sizeof(SEpSet));
for (const auto& dnode : dnode_) {
taosArrayPush(*pDnodes, &dnode.second);
}
return TSDB_CODE_SUCCESS;
}
int32_t catalogGetAllMeta(const SCatalogReq* pCatalogReq, SMetaData* pMetaData) const {
int32_t code = getAllTableMeta(pCatalogReq->pTableMeta, &pMetaData->pTableMeta);
if (TSDB_CODE_SUCCESS == code) {
......@@ -188,6 +196,9 @@ class MockCatalogServiceImpl {
if (TSDB_CODE_SUCCESS == code) {
code = getAllTableIndex(pCatalogReq->pTableIndex, &pMetaData->pTableIndex);
}
if (TSDB_CODE_SUCCESS == code && pCatalogReq->dNodeRequired) {
code = catalogGetDnodeList(&pMetaData->pDnodeList);
}
return code;
}
......@@ -303,11 +314,18 @@ class MockCatalogServiceImpl {
}
}
void createDnode(int32_t dnodeId, const std::string& host, int16_t port) {
SEpSet epSet = {0};
addEpIntoEpSet(&epSet, host.c_str(), port);
dnode_.insert(std::make_pair(dnodeId, epSet));
}
private:
typedef std::map<std::string, std::shared_ptr<MockTableMeta>> TableMetaCache;
typedef std::map<std::string, TableMetaCache> DbMetaCache;
typedef std::map<std::string, std::shared_ptr<SFuncInfo>> UdfMetaCache;
typedef std::map<std::string, std::vector<STableIndexInfo>> IndexMetaCache;
typedef std::map<int32_t, SEpSet> DnodeCache;
uint64_t getNextId() { return id_++; }
......@@ -532,6 +550,7 @@ class MockCatalogServiceImpl {
DbMetaCache meta_;
UdfMetaCache udf_;
IndexMetaCache index_;
DnodeCache dnode_;
};
MockCatalogService::MockCatalogService() : impl_(new MockCatalogServiceImpl()) {}
......@@ -557,6 +576,10 @@ void MockCatalogService::createFunction(const std::string& func, int8_t funcType
void MockCatalogService::createSmaIndex(const SMCreateSmaReq* pReq) { impl_->createSmaIndex(pReq); }
void MockCatalogService::createDnode(int32_t dnodeId, const std::string& host, int16_t port) {
impl_->createDnode(dnodeId, host, port);
}
int32_t MockCatalogService::catalogGetTableMeta(const SName* pTableName, STableMeta** pTableMeta) const {
return impl_->catalogGetTableMeta(pTableName, pTableMeta);
}
......@@ -581,6 +604,8 @@ int32_t MockCatalogService::catalogGetTableIndex(const SName* pTableName, SArray
return impl_->catalogGetTableIndex(pTableName, pIndexes);
}
int32_t MockCatalogService::catalogGetDnodeList(SArray** pDnodes) const { return impl_->catalogGetDnodeList(pDnodes); }
int32_t MockCatalogService::catalogGetAllMeta(const SCatalogReq* pCatalogReq, SMetaData* pMetaData) const {
return impl_->catalogGetAllMeta(pCatalogReq, pMetaData);
}
......
......@@ -62,6 +62,7 @@ class MockCatalogService {
void showTables() const;
void createFunction(const std::string& func, int8_t funcType, int8_t outputType, int32_t outputLen, int32_t bufSize);
void createSmaIndex(const SMCreateSmaReq* pReq);
void createDnode(int32_t dnodeId, const std::string& host, int16_t port);
int32_t catalogGetTableMeta(const SName* pTableName, STableMeta** pTableMeta) const;
int32_t catalogGetTableHashVgroup(const SName* pTableName, SVgroupInfo* vgInfo) const;
......@@ -69,6 +70,7 @@ class MockCatalogService {
int32_t catalogGetDBVgInfo(const char* pDbFName, SArray** pVgList) const;
int32_t catalogGetUdfInfo(const std::string& funcName, SFuncInfo* pInfo) const;
int32_t catalogGetTableIndex(const SName* pTableName, SArray** pIndexes) const;
int32_t catalogGetDnodeList(SArray** pDnodes) const;
int32_t catalogGetAllMeta(const SCatalogReq* pCatalogReq, SMetaData* pMetaData) const;
private:
......
......@@ -159,7 +159,35 @@ TEST_F(ParserInitialDTest, dropSTable) {
run("DROP STABLE st1");
}
// todo DROP stream
TEST_F(ParserInitialDTest, dropStream) {
useDb("root", "test");
SMDropStreamReq expect = {0};
auto clearDropStreamReq = [&]() { memset(&expect, 0, sizeof(SMDropStreamReq)); };
auto setDropStreamReq = [&](const char* pStream, int8_t igNotExists = 0) {
sprintf(expect.name, "0.%s", pStream);
expect.igNotExists = igNotExists;
};
setCheckDdlFunc([&](const SQuery* pQuery, ParserStage stage) {
ASSERT_EQ(nodeType(pQuery->pRoot), QUERY_NODE_DROP_STREAM_STMT);
SMDropStreamReq req = {0};
ASSERT_TRUE(TSDB_CODE_SUCCESS == tDeserializeSMDropStreamReq(pQuery->pCmdMsg->pMsg, pQuery->pCmdMsg->msgLen, &req));
ASSERT_EQ(std::string(req.name), std::string(expect.name));
ASSERT_EQ(req.igNotExists, expect.igNotExists);
});
setDropStreamReq("s1");
run("DROP STREAM s1");
clearDropStreamReq();
setDropStreamReq("s2", 1);
run("DROP STREAM IF EXISTS s2");
clearDropStreamReq();
}
TEST_F(ParserInitialDTest, dropTable) {
useDb("root", "test");
......
......@@ -76,6 +76,12 @@ TEST_F(ParserShowToUseTest, showDnodes) {
run("SHOW dnodes");
}
TEST_F(ParserShowToUseTest, showDnodeVariables) {
useDb("root", "test");
run("SHOW DNODE 1 VARIABLES");
}
TEST_F(ParserShowToUseTest, showFunctions) {
useDb("root", "test");
......@@ -84,6 +90,12 @@ TEST_F(ParserShowToUseTest, showFunctions) {
// todo SHOW licence
TEST_F(ParserShowToUseTest, showLocalVariables) {
useDb("root", "test");
run("SHOW LOCAL VARIABLES");
}
TEST_F(ParserShowToUseTest, showIndexes) {
useDb("root", "test");
......@@ -157,7 +169,11 @@ TEST_F(ParserShowToUseTest, showUsers) {
run("SHOW users");
}
// todo SHOW variables
TEST_F(ParserShowToUseTest, showVariables) {
useDb("root", "test");
run("SHOW VARIABLES");
}
TEST_F(ParserShowToUseTest, showVgroups) {
useDb("root", "test");
......
......@@ -1056,49 +1056,69 @@ static bool partTagsOptHasCol(SNodeList* pPartKeys) {
return hasCol;
}
static bool partTagsIsOptimizableNode(SLogicNode* pNode) {
return ((QUERY_NODE_LOGIC_PLAN_PARTITION == nodeType(pNode) /*||
(QUERY_NODE_LOGIC_PLAN_AGG == nodeType(pNode) && NULL != ((SAggLogicNode*)pNode)->pGroupKeys &&
NULL != ((SAggLogicNode*)pNode)->pAggFuncs)*/) &&
1 == LIST_LENGTH(pNode->pChildren) &&
QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(nodesListGetNode(pNode->pChildren, 0)));
}
static SNodeList* partTagsGetPartKeys(SLogicNode* pNode) {
if (QUERY_NODE_LOGIC_PLAN_PARTITION == nodeType(pNode)) {
return ((SPartitionLogicNode*)pNode)->pPartitionKeys;
} else {
return ((SAggLogicNode*)pNode)->pGroupKeys;
}
}
static bool partTagsOptMayBeOptimized(SLogicNode* pNode) {
if (QUERY_NODE_LOGIC_PLAN_PARTITION != nodeType(pNode) || 1 != LIST_LENGTH(pNode->pChildren) ||
QUERY_NODE_LOGIC_PLAN_SCAN != nodeType(nodesListGetNode(pNode->pChildren, 0))) {
if (!partTagsIsOptimizableNode(pNode)) {
return false;
}
return !partTagsOptHasCol(((SPartitionLogicNode*)pNode)->pPartitionKeys);
return !partTagsOptHasCol(partTagsGetPartKeys(pNode));
}
static int32_t partTagsOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan) {
SPartitionLogicNode* pPart =
(SPartitionLogicNode*)optFindPossibleNode(pLogicSubplan->pNode, partTagsOptMayBeOptimized);
if (NULL == pPart) {
SLogicNode* pNode = optFindPossibleNode(pLogicSubplan->pNode, partTagsOptMayBeOptimized);
if (NULL == pNode) {
return TSDB_CODE_SUCCESS;
}
SScanLogicNode* pScan = (SScanLogicNode*)nodesListGetNode(pPart->node.pChildren, 0);
TSWAP(pPart->pPartitionKeys, pScan->pPartTags);
int32_t code = replaceLogicNode(pLogicSubplan, (SLogicNode*)pPart, (SLogicNode*)pScan);
if (TSDB_CODE_SUCCESS == code) {
NODES_CLEAR_LIST(pPart->node.pChildren);
nodesDestroyNode((SNode*)pPart);
int32_t code = TSDB_CODE_SUCCESS;
SScanLogicNode* pScan = (SScanLogicNode*)nodesListGetNode(pNode->pChildren, 0);
if (QUERY_NODE_LOGIC_PLAN_PARTITION == nodeType(pNode)) {
TSWAP(((SPartitionLogicNode*)pNode)->pPartitionKeys, pScan->pPartTags);
int32_t code = replaceLogicNode(pLogicSubplan, pNode, (SLogicNode*)pScan);
if (TSDB_CODE_SUCCESS == code) {
NODES_CLEAR_LIST(pNode->pChildren);
nodesDestroyNode((SNode*)pNode);
}
} else {
TSWAP(((SAggLogicNode*)pNode)->pGroupKeys, pScan->pPartTags);
}
return code;
}
static bool eliminateProjOptMayBeOptimized(SLogicNode* pNode) {
//TODO: enable this optimization after new mechanising that map projection and targets of project node
// TODO: enable this optimization after new mechanising that map projection and targets of project node
if (NULL != pNode->pParent) {
return false;
}
if (QUERY_NODE_LOGIC_PLAN_PROJECT != nodeType(pNode) || 1 != LIST_LENGTH(pNode->pChildren)) {
return false;
}
SProjectLogicNode* pProjectNode = (SProjectLogicNode*)pNode;
if (-1 != pProjectNode->limit || -1 != pProjectNode->slimit || -1 != pProjectNode->offset || -1 != pProjectNode->soffset) {
if (-1 != pProjectNode->limit || -1 != pProjectNode->slimit || -1 != pProjectNode->offset ||
-1 != pProjectNode->soffset) {
return false;
}
SHashObj* pProjColNameHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
SNode* pProjection;
SNode* pProjection;
FOREACH(pProjection, pProjectNode->pProjections) {
SExprNode* pExprNode = (SExprNode*)pProjection;
if (QUERY_NODE_COLUMN != nodeType(pExprNode)) {
......@@ -1106,7 +1126,7 @@ static bool eliminateProjOptMayBeOptimized(SLogicNode* pNode) {
return false;
}
char* projColumnName = ((SColumnNode*)pProjection)->colName;
char* projColumnName = ((SColumnNode*)pProjection)->colName;
int32_t* pExist = taosHashGet(pProjColNameHash, projColumnName, strlen(projColumnName));
if (NULL != pExist) {
taosHashCleanup(pProjColNameHash);
......@@ -1121,9 +1141,10 @@ static bool eliminateProjOptMayBeOptimized(SLogicNode* pNode) {
return true;
}
static int32_t eliminateProjOptimizeImpl(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan, SProjectLogicNode* pProjectNode) {
static int32_t eliminateProjOptimizeImpl(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan,
SProjectLogicNode* pProjectNode) {
SLogicNode* pChild = (SLogicNode*)nodesListGetNode(pProjectNode->node.pChildren, 0);
SNodeList* pNewChildTargets = nodesMakeList();
SNodeList* pNewChildTargets = nodesMakeList();
SNode* pProjection = NULL;
FOREACH(pProjection, pProjectNode->pProjections) {
......@@ -1137,7 +1158,7 @@ static int32_t eliminateProjOptimizeImpl(SOptimizeContext* pCxt, SLogicSubplan*
}
nodesDestroyList(pChild->pTargets);
pChild->pTargets = pNewChildTargets;
int32_t code = replaceLogicNode(pLogicSubplan, (SLogicNode*)pProjectNode, pChild);
if (TSDB_CODE_SUCCESS == code) {
NODES_CLEAR_LIST(pProjectNode->node.pChildren);
......@@ -1148,7 +1169,7 @@ static int32_t eliminateProjOptimizeImpl(SOptimizeContext* pCxt, SLogicSubplan*
static int32_t eliminateProjOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan) {
SProjectLogicNode* pProjectNode =
(SProjectLogicNode*)optFindPossibleNode(pLogicSubplan->pNode, eliminateProjOptMayBeOptimized);
(SProjectLogicNode*)optFindPossibleNode(pLogicSubplan->pNode, eliminateProjOptMayBeOptimized);
if (NULL == pProjectNode) {
return TSDB_CODE_SUCCESS;
......
......@@ -534,7 +534,11 @@ static int32_t createSystemTableScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan*
}
SQueryNodeLoad node = {.addr = {.nodeId = MNODE_HANDLE, .epSet = pCxt->pPlanCxt->mgmtEpSet}, .load = 0};
taosArrayPush(pCxt->pExecNodeList, &node);
pScan->mgmtEpSet = pCxt->pPlanCxt->mgmtEpSet;
if (0 == strcmp(pScanLogicNode->tableName.tname, TSDB_INS_TABLE_DNODE_VARIABLES)) {
pScan->mgmtEpSet = pScanLogicNode->pVgroupList->vgroups->epSet;
} else {
pScan->mgmtEpSet = pCxt->pPlanCxt->mgmtEpSet;
}
tNameGetFullDbName(&pScanLogicNode->tableName, pSubplan->dbFName);
return createScanPhysiNodeFinalize(pCxt, pSubplan, pScanLogicNode, (SScanPhysiNode*)pScan, pPhyNode);
......@@ -879,12 +883,15 @@ static int32_t createInterpFuncPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pCh
pInterpFunc->interval = pFuncLogicNode->interval;
pInterpFunc->fillMode = pFuncLogicNode->fillMode;
pInterpFunc->pFillValues = nodesCloneNode(pFuncLogicNode->pFillValues);
pInterpFunc->pTimeSeries = nodesCloneNode(pFuncLogicNode->pTimeSeries);
if (NULL == pInterpFunc->pTimeSeries || (NULL != pFuncLogicNode->pFillValues && NULL == pInterpFunc->pFillValues)) {
if (NULL != pFuncLogicNode->pFillValues && NULL == pInterpFunc->pFillValues) {
code = TSDB_CODE_OUT_OF_MEMORY;
}
}
if (TSDB_CODE_SUCCESS == code) {
code = setNodeSlotId(pCxt, pChildTupe->dataBlockId, -1, pFuncLogicNode->pTimeSeries, &pInterpFunc->pTimeSeries);
}
if (TSDB_CODE_SUCCESS == code) {
*pPhyNode = (SPhysiNode*)pInterpFunc;
} else {
......
......@@ -45,14 +45,21 @@ TEST_F(PlanOptimizeTest, ConditionPushDown) {
TEST_F(PlanOptimizeTest, orderByPrimaryKey) {
useDb("root", "test");
run("SELECT * FROM t1 ORDER BY ts");
run("SELECT * FROM t1 ORDER BY ts DESC");
run("SELECT c1 FROM t1 ORDER BY ts");
run("SELECT c1 FROM t1 ORDER BY ts DESC");
run("SELECT COUNT(*) FROM t1 INTERVAL(10S) ORDER BY _WSTARTTS DESC");
}
TEST_F(PlanOptimizeTest, PartitionTags) {
useDb("root", "test");
run("SELECT c1 FROM st1 PARTITION BY tag1");
run("SELECT SUM(c1) FROM st1 GROUP BY tag1");
}
TEST_F(PlanOptimizeTest, eliminateProjection) {
useDb("root", "test");
......@@ -60,5 +67,5 @@ TEST_F(PlanOptimizeTest, eliminateProjection) {
run("SELECT c1 FROM t1");
run("SELECT * FROM st1");
run("SELECT c1 FROM st1s3");
//run("select 1-abs(c1) from (select unique(c1) c1 from st1s3) order by 1 nulls first");
// run("select 1-abs(c1) from (select unique(c1) c1 from st1s3) order by 1 nulls first");
}
......@@ -74,6 +74,8 @@ TEST_F(PlanOtherTest, show) {
run("SHOW TABLE DISTRIBUTED t1");
run("SHOW TABLE DISTRIBUTED st1");
run("SHOW DNODE 1 VARIABLES");
}
TEST_F(PlanOtherTest, delete) {
......
......@@ -35,8 +35,9 @@ bool syncEnvIsStart() {
}
int32_t syncEnvStart() {
int32_t ret = 0;
taosSeedRand(taosGetTimestampSec());
int32_t ret = 0;
uint32_t seed = (uint32_t)(taosGetTimestampNs() & 0x00000000FFFFFFFF);
taosSeedRand(seed);
// gSyncEnv = doSyncEnvStart(gSyncEnv);
gSyncEnv = doSyncEnvStart();
assert(gSyncEnv != NULL);
......
......@@ -2294,7 +2294,7 @@ static int32_t syncNodeConfigChangeFinish(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyn
ths->pFsm->FpReConfigCb(ths->pFsm, pRpcMsg, cbMeta);
}
// update changing
// clear changing
ths->changing = false;
char tmpbuf[512];
......@@ -2313,6 +2313,9 @@ static int32_t syncNodeConfigChangeFinish(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyn
static int32_t syncNodeConfigChange(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* pEntry,
SyncReconfigFinish* pFinish) {
// set changing
ths->changing = true;
// old config
SSyncCfg oldSyncCfg = ths->pRaftCfg->cfg;
......
......@@ -81,6 +81,7 @@
# ./test.sh -f tsim/stream/triggerSession0.sim
./test.sh -f tsim/stream/partitionby.sim
./test.sh -f tsim/stream/schedSnode.sim
./test.sh -f tsim/stream/windowClose.sim
# ---- transaction
......
......@@ -22,7 +22,7 @@ sql create stream stream1 trigger window_close into streamt as select _wstartts
sql insert into tu1 values(now, 1);
sleep 300
sleep 500
sql select * from streamt;
if $rows != 0 then
print ======$rows
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册