diff --git a/source/client/inc/clientInt.h b/source/client/inc/clientInt.h index c7b831893ee806a94942be6b1691c9c3e4be898d..09116a6bd18999a293203955877b57f6018e635b 100644 --- a/source/client/inc/clientInt.h +++ b/source/client/inc/clientInt.h @@ -45,8 +45,7 @@ extern "C" { #define ERROR_MSG_BUF_DEFAULT_SIZE 512 #define HEARTBEAT_INTERVAL 1500 // ms - -#define SYNC_ON_TOP_OF_ASYNC 0 +#define SYNC_ON_TOP_OF_ASYNC 0 enum { RES_TYPE__QUERY = 1, diff --git a/source/client/test/clientTests.cpp b/source/client/test/clientTests.cpp index abeec373d9c32c811ce5281f52cf8eaf67a36e20..a0f33627c4669424d6ead15d06fb1e3839411645 100644 --- a/source/client/test/clientTests.cpp +++ b/source/client/test/clientTests.cpp @@ -83,6 +83,35 @@ void queryCallback1(void* param, void* res, int32_t code) { printf("exec query:\n"); taos_query_a(param, "select * from tm1", queryCallback, param); } + +void createNewTable(TAOS* pConn, int32_t index) { + char str[1024] = {0}; + sprintf(str, "create table tu%d using st2 tags(%d)", index, index); + + TAOS_RES* pRes = taos_query(pConn, str); + if (taos_errno(pRes) != 0) { + printf("failed to create table tu, reason:%s\n", taos_errstr(pRes)); + } + taos_free_result(pRes); + + for(int32_t i = 0; i < 1000; i += 20) { + char sql[1024] = {0}; + sprintf(sql, + "insert into tu%d values(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)" + "(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)" + "(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)" + "(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)", index, + i, i, i + 1, i + 1, i + 2, i + 2, i + 3, i + 3, i + 4, i + 4, i + 5, i + 5, i + 6, i + 6, i + 7, i + 7, + i + 8, i + 8, i + 9, i + 9, i + 10, i + 10, i + 11, i + 11, i + 12, i + 12, i + 13, i + 13, i + 14, i + 14, + i + 15, i + 15, i + 16, i + 16, i + 17, i + 17, i + 18, i + 18, i + 19, i + 19); + TAOS_RES* p = taos_query(pConn, sql); + if (taos_errno(p) != 0) { + printf("failed to insert data, reason:%s\n", taos_errstr(p)); + } + + taos_free_result(p); + } +} } // namespace int main(int argc, char** argv) { @@ -590,7 +619,6 @@ TEST(testCase, generated_request_id_test) { taosHashCleanup(phash); } - TEST(testCase, insert_test) { TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); ASSERT_NE(pConn, nullptr); @@ -640,50 +668,10 @@ TEST(testCase, projection_query_tables) { } taos_free_result(pRes); - pRes = taos_query(pConn, "create table tu2 using st2 tags(1)"); - if (taos_errno(pRes) != 0) { - printf("failed to create table tu, reason:%s\n", taos_errstr(pRes)); - } - taos_free_result(pRes); - - for(int32_t i = 0; i < 1000; i += 20) { - char sql[1024] = {0}; - sprintf(sql, - "insert into tu values(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)" - "(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)" - "(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)" - "(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)", - i, i, i + 1, i + 1, i + 2, i + 2, i + 3, i + 3, i + 4, i + 4, i + 5, i + 5, i + 6, i + 6, i + 7, i + 7, - i + 8, i + 8, i + 9, i + 9, i + 10, i + 10, i + 11, i + 11, i + 12, i + 12, i + 13, i + 13, i + 14, i + 14, - i + 15, i + 15, i + 16, i + 16, i + 17, i + 17, i + 18, i + 18, i + 19, i + 19); - TAOS_RES* p = taos_query(pConn, sql); - if (taos_errno(p) != 0) { - printf("failed to insert data, reason:%s\n", taos_errstr(p)); - } - - taos_free_result(p); + for(int32_t i = 0; i < 100; ++i) { + printf("create table :%d\n", i); + createNewTable(pConn, i); } - - printf("start to insert next table\n"); - -// for(int32_t i = 0; i < 1000000; i += 20) { -// char sql[1024] = {0}; -// sprintf(sql, -// "insert into tu2 values(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)" -// "(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)" -// "(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)" -// "(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)", -// i, i, i + 1, i + 1, i + 2, i + 2, i + 3, i + 3, i + 4, i + 4, i + 5, i + 5, i + 6, i + 6, i + 7, i + 7, -// i + 8, i + 8, i + 9, i + 9, i + 10, i + 10, i + 11, i + 11, i + 12, i + 12, i + 13, i + 13, i + 14, i + 14, -// i + 15, i + 15, i + 16, i + 16, i + 17, i + 17, i + 18, i + 18, i + 19, i + 19); -// TAOS_RES* p = taos_query(pConn, sql); -// if (taos_errno(p) != 0) { -// printf("failed to insert data, reason:%s\n", taos_errstr(p)); -// } -// -// taos_free_result(p); -// } - // pRes = taos_query(pConn, "select * from tu"); // if (taos_errno(pRes) != 0) { // printf("failed to select from table, reason:%s\n", taos_errstr(pRes)); @@ -705,74 +693,74 @@ TEST(testCase, projection_query_tables) { taos_close(pConn); } -TEST(testCase, projection_query_stables) { - TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); - ASSERT_NE(pConn, nullptr); - - TAOS_RES* pRes = taos_query(pConn, "use abc1"); - taos_free_result(pRes); - - pRes = taos_query(pConn, "select ts from st1"); - if (taos_errno(pRes) != 0) { - printf("failed to select from table, reason:%s\n", taos_errstr(pRes)); - taos_free_result(pRes); - ASSERT_TRUE(false); - } - - TAOS_ROW pRow = NULL; - TAOS_FIELD* pFields = taos_fetch_fields(pRes); - int32_t numOfFields = taos_num_fields(pRes); - - char str[512] = {0}; - while ((pRow = taos_fetch_row(pRes)) != NULL) { - int32_t code = taos_print_row(str, pRow, pFields, numOfFields); - printf("%s\n", str); - } - - taos_free_result(pRes); - taos_close(pConn); -} - -TEST(testCase, agg_query_tables) { - TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); - ASSERT_NE(pConn, nullptr); - - TAOS_RES* pRes = taos_query(pConn, "use abc1"); - if (taos_errno(pRes) != 0) { - printf("failed to use db, reason:%s\n", taos_errstr(pRes)); - taos_free_result(pRes); - ASSERT_TRUE(false); - } - taos_free_result(pRes); - - pRes = taos_query(pConn, "show stables"); - if (taos_errno(pRes) != 0) { - printf("failed to select from table, reason:%s\n", taos_errstr(pRes)); - taos_free_result(pRes); - ASSERT_TRUE(false); - } - - TAOS_ROW pRow = NULL; - TAOS_FIELD* pFields = taos_fetch_fields(pRes); - int32_t numOfFields = taos_num_fields(pRes); - - int32_t n = 0; - char str[512] = {0}; - while ((pRow = taos_fetch_row(pRes)) != NULL) { - int32_t* length = taos_fetch_lengths(pRes); - for(int32_t i = 0; i < numOfFields; ++i) { - printf("(%d):%d " , i, length[i]); - } - printf("\n"); - - int32_t code = taos_print_row(str, pRow, pFields, numOfFields); - printf("%s\n", str); - memset(str, 0, sizeof(str)); - } +//TEST(testCase, projection_query_stables) { +// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); +// ASSERT_NE(pConn, nullptr); +// +// TAOS_RES* pRes = taos_query(pConn, "use abc1"); +// taos_free_result(pRes); +// +// pRes = taos_query(pConn, "select ts from st1"); +// if (taos_errno(pRes) != 0) { +// printf("failed to select from table, reason:%s\n", taos_errstr(pRes)); +// taos_free_result(pRes); +// ASSERT_TRUE(false); +// } +// +// TAOS_ROW pRow = NULL; +// TAOS_FIELD* pFields = taos_fetch_fields(pRes); +// int32_t numOfFields = taos_num_fields(pRes); +// +// char str[512] = {0}; +// while ((pRow = taos_fetch_row(pRes)) != NULL) { +// int32_t code = taos_print_row(str, pRow, pFields, numOfFields); +// printf("%s\n", str); +// } +// +// taos_free_result(pRes); +// taos_close(pConn); +//} - taos_free_result(pRes); - taos_close(pConn); -} +//TEST(testCase, agg_query_tables) { +// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); +// ASSERT_NE(pConn, nullptr); +// +// TAOS_RES* pRes = taos_query(pConn, "use abc1"); +// if (taos_errno(pRes) != 0) { +// printf("failed to use db, reason:%s\n", taos_errstr(pRes)); +// taos_free_result(pRes); +// ASSERT_TRUE(false); +// } +// taos_free_result(pRes); +// +// pRes = taos_query(pConn, "show stables"); +// if (taos_errno(pRes) != 0) { +// printf("failed to select from table, reason:%s\n", taos_errstr(pRes)); +// taos_free_result(pRes); +// ASSERT_TRUE(false); +// } +// +// TAOS_ROW pRow = NULL; +// TAOS_FIELD* pFields = taos_fetch_fields(pRes); +// int32_t numOfFields = taos_num_fields(pRes); +// +// int32_t n = 0; +// char str[512] = {0}; +// while ((pRow = taos_fetch_row(pRes)) != NULL) { +// int32_t* length = taos_fetch_lengths(pRes); +// for(int32_t i = 0; i < numOfFields; ++i) { +// printf("(%d):%d " , i, length[i]); +// } +// printf("\n"); +// +// int32_t code = taos_print_row(str, pRow, pFields, numOfFields); +// printf("%s\n", str); +// memset(str, 0, sizeof(str)); +// } +// +// taos_free_result(pRes); +// taos_close(pConn); +//} #endif /* diff --git a/source/libs/catalog/src/ctgAsync.c b/source/libs/catalog/src/ctgAsync.c index a9e808e7490a652da5902c94fcdd24a2abbe5bc1..ce94eace660ba3dedb2f8f8bebb30bd2fe982351 100644 --- a/source/libs/catalog/src/ctgAsync.c +++ b/source/libs/catalog/src/ctgAsync.c @@ -44,7 +44,7 @@ int32_t ctgInitGetTbMetaTask(SCtgJob *pJob, int32_t taskIdx, SName *name) { taosArrayPush(pJob->pTasks, &task); - qDebug("QID:%" PRIx64 " task %d type %d initialized, tableName:%s", pJob->queryId, taskIdx, task.type, name->tname); + qDebug("QID:0x%" PRIx64 " task %d type %d initialized, tableName:%s", pJob->queryId, taskIdx, task.type, name->tname); return TSDB_CODE_SUCCESS; } @@ -143,7 +143,7 @@ int32_t ctgInitGetTbHashTask(SCtgJob *pJob, int32_t taskIdx, SName *name) { taosArrayPush(pJob->pTasks, &task); - qDebug("QID:%" PRIx64 " task %d type %d initialized, tableName:%s", pJob->queryId, taskIdx, task.type, name->tname); + qDebug("QID:0x%" PRIx64 " task %d type %d initialized, tableName:%s", pJob->queryId, taskIdx, task.type, name->tname); return TSDB_CODE_SUCCESS; } @@ -247,7 +247,7 @@ int32_t ctgInitJob(CTG_PARAMS, SCtgJob** job, uint64_t reqId, const SCatalogReq* *taskNum = tbMetaNum + dbVgNum + udfNum + tbHashNum + qnodeNum + dbCfgNum + indexNum + userNum + dbInfoNum; if (*taskNum <= 0) { - ctgError("Empty input for job, no need to retrieve meta, reqId:0x%" PRIx64, reqId); + ctgDebug("Empty input for job, no need to retrieve meta, reqId:0x%" PRIx64, reqId); return TSDB_CODE_SUCCESS; } diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index f37a344cd18294ae81e4bc52ba43822ef9256d70..7d597217ee939d43259ca234ca99174e99629320 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -303,13 +303,17 @@ typedef struct SScanInfo { int32_t numOfDesc; } SScanInfo; +typedef struct SSampleExecInfo { + double sampleRatio; // data block sample ratio, 1 by default + uint32_t seed; // random seed value +} SSampleExecInfo; + typedef struct STableScanInfo { void* dataReader; SReadHandle readHandle; SFileBlockLoadRecorder readRecorder; int64_t numOfRows; - int64_t elapsedTime; // int32_t prevGroupId; // previous table group id SScanInfo scanInfo; int32_t scanTimes; @@ -330,7 +334,6 @@ typedef struct STableScanInfo { SQueryTableDataCond cond; int32_t scanFlag; // table scan flag to denote if it is a repeat/reverse/main scan int32_t dataBlockLoadFlag; - double sampleRatio; // data block sample ratio, 1 by default SInterval interval; // if the upstream is an interval operator, the interval info is also kept here to get the time window to check if current data block needs to be loaded. SArray* pGroupCols; @@ -339,6 +342,7 @@ typedef struct STableScanInfo { int32_t groupKeyLen; // total group by column width SHashObj* pGroupSet; // quick locate the window object for each result + SSampleExecInfo sample; // sample execution info int32_t curTWinIdx; } STableScanInfo; @@ -722,7 +726,7 @@ void doApplyFunctions(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, STimeWin int32_t setGroupResultOutputBuf(SOptrBasicInfo* binfo, int32_t numOfCols, char* pData, int16_t type, int16_t bytes, int32_t groupId, SDiskbasedBuf* pBuf, SExecTaskInfo* pTaskInfo, SAggSupporter* pAggSup); void doDestroyBasicInfo(SOptrBasicInfo* pInfo, int32_t numOfOutput); -int32_t setSDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadInfo, int32_t numOfRows, char* pData, +int32_t setDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadInfo, int32_t numOfRows, char* pData, int32_t compLen, int32_t numOfOutput, int64_t startTs, uint64_t* total, SArray* pColList); void getAlignQueryTimeWindow(SInterval* pInterval, int32_t precision, int64_t key, STimeWindow* win); @@ -757,7 +761,11 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR char* pData, int16_t bytes, bool masterscan, uint64_t groupId, SExecTaskInfo* pTaskInfo, bool isIntervalQuery, SAggSupporter* pSup); +SOperatorInfo* createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode* pExNode, SExecTaskInfo* pTaskInfo); + SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, tsdbReaderT pDataReader, SReadHandle* pHandle, SArray* groupKyes, SExecTaskInfo* pTaskInfo); +SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysiNode* pPhyNode, STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo); +SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScanPhysiNode *pScanPhyNode, SExecTaskInfo* pTaskInfo); SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SExprInfo* pScalarExprInfo, int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo); @@ -768,9 +776,6 @@ SOperatorInfo *createSortOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pR SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t numOfDownstream, SExprInfo* pExprInfo, int32_t num, SArray* pSortInfo, SArray* pGroupInfo, SExecTaskInfo* pTaskInfo); -SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, SSDataBlock* pResBlock, const SName* pName, - SNode* pCondition, SEpSet epset, SArray* colList, - SExecTaskInfo* pTaskInfo, bool showRewrite, int32_t accountId); SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId, STimeWindowAggSupp *pTwAggSupp, SExecTaskInfo* pTaskInfo); @@ -788,9 +793,8 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx SExprInfo* pScalarExprInfo, int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo); SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SExecTaskInfo* pTaskInfo); -SOperatorInfo* createStreamScanOperatorInfo(void* pDataReader, SReadHandle* pHandle, - SArray* pTableIdList, STableScanPhysiNode* pTableScanNode, SExecTaskInfo* pTaskInfo, - STimeWindowAggSupp* pTwSup); +SOperatorInfo* createStreamScanOperatorInfo(void* pDataReader, SReadHandle* pHandle, SArray* pTableIdList, + STableScanPhysiNode* pTableScanNode, SExecTaskInfo* pTaskInfo, STimeWindowAggSupp* pTwSup); SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols, @@ -806,7 +810,6 @@ SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SExprInfo* SSDataBlock* pResultBlock, SExecTaskInfo* pTaskInfo); SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SNode* pOnCondition, SExecTaskInfo* pTaskInfo); -SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, SExprInfo* pExpr, int32_t numOfOutput, SSDataBlock* pResBlock, SArray* pColMatchInfo, STableListInfo* pTableGroupInfo, SExecTaskInfo* pTaskInfo); SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, int64_t gap, @@ -824,8 +827,6 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc void setInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order, int32_t scanFlag, bool createDummyCol); -void copyTsColoum(SSDataBlock* pRes, SqlFunctionCtx* pCtx, int32_t numOfOutput); - bool isTaskKilled(SExecTaskInfo* pTaskInfo); int32_t checkForQueryBuf(size_t numOfTables); @@ -860,17 +861,15 @@ int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SExplainExecInfo int32_t aggDecodeResultRow(SOperatorInfo* pOperator, char* result); int32_t aggEncodeResultRow(SOperatorInfo* pOperator, char** result, int32_t* length); -STimeWindow getActiveTimeWindow(SDiskbasedBuf* pBuf, SResultRowInfo* pResultRowInfo, int64_t ts, - SInterval* pInterval, int32_t precision, STimeWindow* win); -int32_t getNumOfRowsInTimeWindow(SDataBlockInfo* pDataBlockInfo, TSKEY* pPrimaryColumn, - int32_t startPos, TSKEY ekey, __block_search_fn_t searchFn, STableQueryInfo* item, - int32_t order); +STimeWindow getActiveTimeWindow(SDiskbasedBuf* pBuf, SResultRowInfo* pResultRowInfo, int64_t ts, SInterval* pInterval, + int32_t precision, STimeWindow* win); +int32_t getNumOfRowsInTimeWindow(SDataBlockInfo* pDataBlockInfo, TSKEY* pPrimaryColumn, int32_t startPos, TSKEY ekey, + __block_search_fn_t searchFn, STableQueryInfo* item, int32_t order); int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order); int32_t initSessionAggSupporter(SStreamAggSupporter* pSup, const char* pKey); int32_t initStateAggSupporter(SStreamAggSupporter* pSup, const char* pKey); SResultRow* getNewResultRow(SDiskbasedBuf* pResultBuf, int64_t tableGroupId, int32_t interBufSize); -SResultWindowInfo* getSessionTimeWindow(SArray* pWinInfos, TSKEY ts, int64_t gap, - int32_t* pIndex); +SResultWindowInfo* getSessionTimeWindow(SArray* pWinInfos, TSKEY ts, int64_t gap, int32_t* pIndex); int32_t updateSessionWindowInfo(SResultWindowInfo* pWinInfo, TSKEY* pTs, int32_t rows, int32_t start, int64_t gap, SHashObj* pStDeleted); bool functionNeedToExecute(SqlFunctionCtx* pCtx); @@ -878,6 +877,7 @@ int64_t getSmaWaterMark(int64_t interval, double filesFactor); bool isSmaStream(int8_t triggerType); int32_t compareTimeWindow(const void* p1, const void* p2, const void* param); + #ifdef __cplusplus } #endif diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index 1c45e38b632d29340472c1955d2b097377478ce0..484ff35e3a7193f8eb51c49eec38aff2eb391654 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -28,18 +28,6 @@ typedef struct SCompSupporter { int32_t order; } SCompSupporter; -int32_t getRowNumForMultioutput(STaskAttr* pQueryAttr, bool topBottomQuery, bool stable) { - if (pQueryAttr && (!stable)) { - for (int16_t i = 0; i < pQueryAttr->numOfOutput; ++i) { -// if (pQueryAttr->pExpr1[i].base. == FUNCTION_TOP || pQueryAttr->pExpr1[i].base.functionId == FUNCTION_BOTTOM) { -// return (int32_t)pQueryAttr->pExpr1[i].base.param[0].i; -// } - } - } - - return 1; -} - int32_t getOutputInterResultBufSize(STaskAttr* pQueryAttr) { int32_t size = 0; @@ -113,35 +101,6 @@ void closeResultRow(SResultRow* pResultRow) { pResultRow->closed = true; } -void clearResultRow(STaskRuntimeEnv *pRuntimeEnv, SResultRow *pResultRow) { - if (pResultRow == NULL) { - return; - } - - // the result does not put into the SDiskbasedBuf, ignore it. - if (pResultRow->pageId >= 0) { - SFilePage *page = getBufPage(pRuntimeEnv->pResultBuf, pResultRow->pageId); - - int16_t offset = 0; - for (int32_t i = 0; i < pRuntimeEnv->pQueryAttr->numOfOutput; ++i) { - struct SResultRowEntryInfo *pEntryInfo = NULL;//pResultRow->pEntryInfo[i]; - -// int16_t size = pRuntimeEnv->pQueryAttr->pExpr1[i].base.resSchema.bytes; -// char * s = getPosInResultPage(pRuntimeEnv->pQueryAttr, page, pResultRow->offset, offset); -// memset(s, 0, size); - -// offset += size; - cleanupResultRowEntry(pEntryInfo); - } - } - - pResultRow->numOfRows = 0; - pResultRow->pageId = -1; - pResultRow->offset = -1; - pResultRow->closed = false; - pResultRow->win = TSWINDOW_INITIALIZER; -} - // TODO refactor: use macro SResultRowEntryInfo* getResultCell(const SResultRow* pRow, int32_t index, const int32_t* offset) { assert(index >= 0 && offset != NULL); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 07f4251340b907193e510d7fd279e31011ce1d64..a10820bfe326138f90f31dfa8a1e6286696f8763 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -2415,6 +2415,9 @@ int32_t loadRemoteDataCallback(void* param, const SDataBuf* pMsg, int32_t code) pRsp->compLen = htonl(pRsp->compLen); pRsp->numOfCols = htonl(pRsp->numOfCols); pRsp->useconds = htobe64(pRsp->useconds); + + ASSERT(pSourceDataInfo->pRsp != NULL); + qDebug("fetch rsp received, index:%d, rows:%d", pSourceDataInfo->index, pRsp->numOfRows); } else { pSourceDataInfo->code = code; } @@ -2463,6 +2466,8 @@ static int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInf SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, sourceIndex); SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, sourceIndex); + ASSERT(pDataInfo->status == EX_SOURCE_DATA_NOT_READY); + qDebug("%s build fetch msg and send to vgId:%d, ep:%s, taskId:0x%" PRIx64 ", %d/%" PRIzu, GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->addr.epSet.eps[0].fqdn, pSource->taskId, sourceIndex, totalSources); @@ -2516,7 +2521,7 @@ void relocateColumnData(SSDataBlock* pBlock, const SArray* pColMatchInfo, SArray } } -int32_t setSDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadInfo, int32_t numOfRows, char* pData, +int32_t setDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadInfo, int32_t numOfRows, char* pData, int32_t compLen, int32_t numOfOutput, int64_t startTs, uint64_t* total, SArray* pColList) { if (pColList == NULL) { // data from other sources @@ -2640,7 +2645,6 @@ static SSDataBlock* concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SEx int32_t completed = 0; for (int32_t i = 0; i < totalSources; ++i) { SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, i); - if (pDataInfo->status == EX_SOURCE_DATA_EXHAUSTED) { completed += 1; continue; @@ -2650,6 +2654,11 @@ static SSDataBlock* concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SEx continue; } + if (pDataInfo->code != TSDB_CODE_SUCCESS) { + code = pDataInfo->code; + goto _error; + } + SRetrieveTableRsp* pRsp = pDataInfo->pRsp; SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, i); @@ -2667,7 +2676,7 @@ static SSDataBlock* concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SEx } SRetrieveTableRsp* pTableRsp = pDataInfo->pRsp; - code = setSDataBlockFromFetchRsp(pExchangeInfo->pResult, pLoadInfo, pTableRsp->numOfRows, pTableRsp->data, + code = setDataBlockFromFetchRsp(pExchangeInfo->pResult, pLoadInfo, pTableRsp->numOfRows, pTableRsp->data, pTableRsp->compLen, pTableRsp->numOfCols, startTs, &pDataInfo->totalRows, NULL); if (code != 0) { taosMemoryFreeClear(pDataInfo->pRsp); @@ -2687,6 +2696,8 @@ static SSDataBlock* concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SEx pLoadInfo->totalSize); } + taosMemoryFreeClear(pDataInfo->pRsp); + if (pDataInfo->status != EX_SOURCE_DATA_EXHAUSTED) { pDataInfo->status = EX_SOURCE_DATA_NOT_READY; code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, i); @@ -2696,7 +2707,6 @@ static SSDataBlock* concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SEx } } - taosMemoryFreeClear(pDataInfo->pRsp); return pExchangeInfo->pResult; } @@ -2710,34 +2720,6 @@ _error: return NULL; } -static SSDataBlock* concurrentlyLoadRemoteData(SOperatorInfo* pOperator) { - SExchangeInfo* pExchangeInfo = pOperator->info; - SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - - if (pOperator->status == OP_RES_TO_RETURN) { - return concurrentlyLoadRemoteDataImpl(pOperator, pExchangeInfo, pTaskInfo); - } - - size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources); - int64_t startTs = taosGetTimestampUs(); - - // Asynchronously send all fetch requests to all sources. - for (int32_t i = 0; i < totalSources; ++i) { - int32_t code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, i); - if (code != TSDB_CODE_SUCCESS) { - return NULL; - } - } - - int64_t endTs = taosGetTimestampUs(); - qDebug("%s send all fetch request to %" PRIzu " sources completed, elapsed:%" PRId64, GET_TASKID(pTaskInfo), - totalSources, endTs - startTs); - - tsem_wait(&pExchangeInfo->ready); - pOperator->status = OP_RES_TO_RETURN; - return concurrentlyLoadRemoteDataImpl(pOperator, pExchangeInfo, pTaskInfo); -} - static int32_t prepareConcurrentlyLoad(SOperatorInfo* pOperator) { SExchangeInfo* pExchangeInfo = pOperator->info; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; @@ -2755,10 +2737,11 @@ static int32_t prepareConcurrentlyLoad(SOperatorInfo* pOperator) { } int64_t endTs = taosGetTimestampUs(); - qDebug("%s send all fetch request to %" PRIzu " sources completed, elapsed:%" PRId64, GET_TASKID(pTaskInfo), + qDebug("%s send all fetch requests to %" PRIzu " sources completed, elapsed:%" PRId64, GET_TASKID(pTaskInfo), totalSources, endTs - startTs); tsem_wait(&pExchangeInfo->ready); + pOperator->status = OP_RES_TO_RETURN; pOperator->cost.openCost = taosGetTimestampUs() - startTs; return TSDB_CODE_SUCCESS; @@ -2806,7 +2789,7 @@ static SSDataBlock* seqLoadRemoteData(SOperatorInfo* pOperator) { SSDataBlock* pRes = pExchangeInfo->pResult; SRetrieveTableRsp* pTableRsp = pDataInfo->pRsp; int32_t code = - setSDataBlockFromFetchRsp(pExchangeInfo->pResult, pLoadInfo, pTableRsp->numOfRows, pTableRsp->data, + setDataBlockFromFetchRsp(pExchangeInfo->pResult, pLoadInfo, pTableRsp->numOfRows, pTableRsp->data, pTableRsp->compLen, pTableRsp->numOfCols, startTs, &pDataInfo->totalRows, NULL); if (pRsp->completed == 1) { @@ -2872,7 +2855,8 @@ static SSDataBlock* doLoadRemoteData(SOperatorInfo* pOperator) { if (pExchangeInfo->seqLoadData) { return seqLoadRemoteData(pOperator); } else { - return concurrentlyLoadRemoteData(pOperator); + return concurrentlyLoadRemoteDataImpl(pOperator, pExchangeInfo, pTaskInfo); +// return concurrentlyLoadRemoteData(pOperator); } } @@ -2898,52 +2882,55 @@ static int32_t initDataSource(int32_t numOfSources, SExchangeInfo* pInfo) { return TSDB_CODE_SUCCESS; } -SOperatorInfo* createExchangeOperatorInfo(void* pTransporter, const SNodeList* pSources, SSDataBlock* pBlock, - SExecTaskInfo* pTaskInfo) { - SExchangeInfo* pInfo = taosMemoryCalloc(1, sizeof(SExchangeInfo)); - SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); - if (pInfo == NULL || pOperator == NULL) { - goto _error; - } +static int32_t initExchangeOperator(SExchangePhysiNode* pExNode, SExchangeInfo* pInfo) { + size_t numOfSources = LIST_LENGTH(pExNode->pSrcEndPoints); - size_t numOfSources = LIST_LENGTH(pSources); pInfo->pSources = taosArrayInit(numOfSources, sizeof(SDownstreamSourceNode)); pInfo->pSourceDataInfo = taosArrayInit(numOfSources, sizeof(SSourceDataInfo)); if (pInfo->pSourceDataInfo == NULL || pInfo->pSources == NULL) { - goto _error; + return TSDB_CODE_OUT_OF_MEMORY; } for (int32_t i = 0; i < numOfSources; ++i) { - SNodeListNode* pNode = nodesListGetNode((SNodeList*)pSources, i); + SNodeListNode* pNode = nodesListGetNode((SNodeList*)pExNode->pSrcEndPoints, i); taosArrayPush(pInfo->pSources, pNode); } - int32_t code = initDataSource(numOfSources, pInfo); - if (code != TSDB_CODE_SUCCESS) { + return initDataSource(numOfSources, pInfo); +} + +SOperatorInfo* createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode* pExNode, SExecTaskInfo* pTaskInfo) { + SExchangeInfo* pInfo = taosMemoryCalloc(1, sizeof(SExchangeInfo)); + SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); + if (pInfo == NULL || pOperator == NULL) { goto _error; } - pInfo->pResult = pBlock; - pInfo->seqLoadData = true; + int32_t code = initExchangeOperator(pExNode, pInfo); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } + pInfo->seqLoadData = false; + pInfo->pTransporter = pTransporter; + pInfo->pResult = createResDataBlock(pExNode->node.pOutputDataBlockDesc); tsem_init(&pInfo->ready, 0, 0); - pOperator->name = "ExchangeOperator"; + pOperator->name = "ExchangeOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_EXCHANGE; - pOperator->blocking = false; - pOperator->status = OP_NOT_OPENED; - pOperator->info = pInfo; - pOperator->numOfExprs = pBlock->info.numOfCols; - pOperator->pTaskInfo = pTaskInfo; + pOperator->blocking = false; + pOperator->status = OP_NOT_OPENED; + pOperator->info = pInfo; + pOperator->numOfExprs = pInfo->pResult->info.numOfCols; + pOperator->pTaskInfo = pTaskInfo; pOperator->fpSet = createOperatorFpSet(prepareLoadRemoteData, doLoadRemoteData, NULL, NULL, destroyExchangeOperatorInfo, NULL, NULL, NULL); - pInfo->pTransporter = pTransporter; return pOperator; _error: if (pInfo != NULL) { - destroyExchangeOperatorInfo(pInfo, numOfSources); + destroyExchangeOperatorInfo(pInfo, LIST_LENGTH(pExNode->pSrcEndPoints)); } taosMemoryFreeClear(pInfo); @@ -4397,9 +4384,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo return pOperator; } else if (QUERY_NODE_PHYSICAL_PLAN_EXCHANGE == type) { - SExchangePhysiNode* pExchange = (SExchangePhysiNode*)pPhyNode; - SSDataBlock* pResBlock = createResDataBlock(pExchange->node.pOutputDataBlockDesc); - return createExchangeOperatorInfo(pHandle->pMsgCb->clientRpc, pExchange->pSrcEndPoints, pResBlock, pTaskInfo); + return createExchangeOperatorInfo(pHandle->pMsgCb->clientRpc, (SExchangePhysiNode*)pPhyNode, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN == type) { SScanPhysiNode* pScanPhyNode = (SScanPhysiNode*)pPhyNode; // simple child table. STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode; @@ -4427,41 +4412,16 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo return pOperator; } else if (QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN == type) { SSystemTableScanPhysiNode* pSysScanPhyNode = (SSystemTableScanPhysiNode*)pPhyNode; - SScanPhysiNode* pScanNode = &pSysScanPhyNode->scan; - - SDataBlockDescNode* pDescNode = pScanNode->node.pOutputDataBlockDesc; - - SSDataBlock* pResBlock = createResDataBlock(pDescNode); - - int32_t numOfOutputCols = 0; - SArray* colList = - extractColMatchInfo(pScanNode->pScanCols, pDescNode, &numOfOutputCols, pTaskInfo, COL_MATCH_FROM_COL_ID); - SOperatorInfo* pOperator = createSysTableScanOperatorInfo( - pHandle, pResBlock, &pScanNode->tableName, pScanNode->node.pConditions, pSysScanPhyNode->mgmtEpSet, colList, - pTaskInfo, pSysScanPhyNode->showRewrite, pSysScanPhyNode->accountId); - return pOperator; + return createSysTableScanOperatorInfo(pHandle, pSysScanPhyNode, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN == type) { STagScanPhysiNode* pScanPhyNode = (STagScanPhysiNode*)pPhyNode; - SDataBlockDescNode* pDescNode = pScanPhyNode->node.pOutputDataBlockDesc; - - SSDataBlock* pResBlock = createResDataBlock(pDescNode); - int32_t code = getTableList(pHandle->meta, pScanPhyNode->tableType, pScanPhyNode->uid, pTableListInfo, pTagCond); if (code != TSDB_CODE_SUCCESS) { return NULL; } - int32_t num = 0; - SExprInfo* pExprInfo = createExprInfo(pScanPhyNode->pScanPseudoCols, NULL, &num); - - int32_t numOfOutputCols = 0; - SArray* colList = extractColMatchInfo(pScanPhyNode->pScanPseudoCols, pDescNode, &numOfOutputCols, pTaskInfo, - COL_MATCH_FROM_COL_ID); - - SOperatorInfo* pOperator = - createTagScanOperatorInfo(pHandle, pExprInfo, num, pResBlock, colList, pTableListInfo, pTaskInfo); - return pOperator; + return createTagScanOperatorInfo(pHandle, pScanPhyNode, pTableListInfo, pTaskInfo); } else { ASSERT(0); } @@ -5022,45 +4982,6 @@ _complete: return code; } -void setResultBufSize(STaskAttr* pQueryAttr, SResultInfo* pResultInfo) { - const int32_t DEFAULT_RESULT_MSG_SIZE = 1024 * (1024 + 512); - - // the minimum number of rows for projection query - const int32_t MIN_ROWS_FOR_PRJ_QUERY = 8192; - const int32_t DEFAULT_MIN_ROWS = 4096; - - const float THRESHOLD_RATIO = 0.85f; - - // if (isProjQuery(pQueryAttr)) { - // int32_t numOfRes = DEFAULT_RESULT_MSG_SIZE / pQueryAttr->resultRowSize; - // if (numOfRes < MIN_ROWS_FOR_PRJ_QUERY) { - // numOfRes = MIN_ROWS_FOR_PRJ_QUERY; - // } - // - // pResultInfo->capacity = numOfRes; - // } else { // in case of non-prj query, a smaller output buffer will be used. - // pResultInfo->capacity = DEFAULT_MIN_ROWS; - // } - - pResultInfo->threshold = (int32_t)(pResultInfo->capacity * THRESHOLD_RATIO); - pResultInfo->totalRows = 0; -} - -// TODO refactor -void freeColumnFilterInfo(SColumnFilterInfo* pFilter, int32_t numOfFilters) { - if (pFilter == NULL || numOfFilters == 0) { - return; - } - - for (int32_t i = 0; i < numOfFilters; i++) { - if (pFilter[i].filterstr && pFilter[i].pz) { - taosMemoryFree((void*)(pFilter[i].pz)); - } - } - - taosMemoryFree(pFilter); -} - static void doDestroyTableList(STableListInfo* pTableqinfoList) { taosArrayDestroy(pTableqinfoList->pTableList); taosHashCleanup(pTableqinfoList->map); diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 007ad63bddd6691fc14c43923426c9775ccfd3f8..ba0397fd3439e252c3de115459205dd534c7f78a 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -38,15 +38,26 @@ #define SET_REVERSE_SCAN_FLAG(_info) ((_info)->scanFlag = REVERSE_SCAN) #define SWITCH_ORDER(n) (((n) = ((n) == TSDB_ORDER_ASC) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC)) -typedef struct SWindowPosition { - int32_t pageId; - int32_t rowId; -} SWindowPosition; - static int32_t buildSysDbTableInfo(const SSysTableScanInfo* pInfo, int32_t capacity); static int32_t buildDbTableInfoBlock(const SSDataBlock* p, const SSysTableMeta* pSysDbTableMeta, size_t size, const char* dbName); +static void addTagPseudoColumnData(SReadHandle *pHandle, SExprInfo* pPseudoExpr, int32_t numOfPseudoExpr, SSDataBlock* pBlock); +static bool processBlockWithProbability(const SSampleExecInfo *pInfo); + +bool processBlockWithProbability(const SSampleExecInfo *pInfo) { +#if 0 + if (pInfo->sampleRatio == 1) { + return true; + } + + uint32_t val = taosRandR((uint32_t*) &pInfo->seed); + return (val % ((uint32_t)(1/pInfo->sampleRatio))) == 0; +#else + return true; +#endif +} + static void switchCtxOrder(SqlFunctionCtx* pCtx, int32_t numOfOutput) { for (int32_t i = 0; i < numOfOutput; ++i) { SWITCH_ORDER(pCtx[i].order); @@ -160,8 +171,6 @@ static bool overlapWithTimeWindow(SInterval* pInterval, SDataBlockInfo* pBlockIn return false; } -static void addTagPseudoColumnData(SReadHandle *pHandle, SExprInfo* pPseudoExpr, int32_t numOfPseudoExpr, SSDataBlock* pBlock); - static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableScanInfo, SSDataBlock* pBlock, uint32_t* status) { SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; @@ -281,12 +290,9 @@ static void prepareForDescendingScan(STableScanInfo* pTableScanInfo, SqlFunction STimeWindow* pTWindow = &pTableScanInfo->cond.twindows[i]; TSWAP(pTWindow->skey, pTWindow->ekey); } - SQueryTableDataCond *pCond = &pTableScanInfo->cond; - taosqsort(pCond->twindows, - pCond->numOfTWindows, - sizeof(STimeWindow), - pCond, - compareTimeWindow); + + SQueryTableDataCond* pCond = &pTableScanInfo->cond; + taosqsort(pCond->twindows, pCond->numOfTWindows, sizeof(STimeWindow), pCond, compareTimeWindow); } void addTagPseudoColumnData(SReadHandle *pHandle, SExprInfo* pPseudoExpr, int32_t numOfPseudoExpr, SSDataBlock* pBlock) { @@ -317,18 +323,19 @@ void addTagPseudoColumnData(SReadHandle *pHandle, SExprInfo* pPseudoExpr, int32_ } else { // these are tags STagVal tagVal = {0}; tagVal.cid = pExpr->base.pParam[0].pCol->colId; - const char *p = metaGetTableTagVal(&mr.me, pColInfoData->info.type, &tagVal); + const char* p = metaGetTableTagVal(&mr.me, pColInfoData->info.type, &tagVal); - char *data = NULL; - if(pColInfoData->info.type != TSDB_DATA_TYPE_JSON && p != NULL){ - data = tTagValToData((const STagVal *)p, false); - }else { + char* data = NULL; + if (pColInfoData->info.type != TSDB_DATA_TYPE_JSON && p != NULL) { + data = tTagValToData((const STagVal*)p, false); + } else { data = (char*)p; } for (int32_t i = 0; i < pBlock->info.rows; ++i) { colDataAppend(pColInfoData, i, data, (data == NULL)); } + if (data && (pColInfoData->info.type != TSDB_DATA_TYPE_JSON) && p != NULL && IS_VAR_DATA_TYPE(((const STagVal*)p)->type)) { taosMemoryFree(data); @@ -366,6 +373,12 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) { longjmp(pOperator->pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED); } + // process this data block based on the probabilities + bool processThisBlock = processBlockWithProbability(&pTableScanInfo->sample); + if (!processThisBlock) { + continue; + } + tsdbRetrieveDataBlockInfo(pTableScanInfo->dataReader, &pBlock->info); uint32_t status = 0; @@ -451,6 +464,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { STimeWindow* pWin = &pTableScanInfo->cond.twindows[i]; qDebug("%s\t qrange:%" PRId64 "-%" PRId64, GET_TASKID(pTaskInfo), pWin->skey, pWin->ekey); } + while (pTableScanInfo->scanTimes < total) { while (pTableScanInfo->curTWinIdx < pTableScanInfo->cond.numOfTWindows) { SSDataBlock* p = doTableScanImpl(pOperator); @@ -550,24 +564,26 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, pInfo->scanInfo = (SScanInfo){.numOfAsc = pTableScanNode->scanSeq[0], .numOfDesc = pTableScanNode->scanSeq[1]}; // pInfo->scanInfo = (SScanInfo){.numOfAsc = 0, .numOfDesc = 1}; // for debug purpose - pInfo->readHandle = *readHandle; - pInfo->interval = extractIntervalInfo(pTableScanNode); - pInfo->sampleRatio = pTableScanNode->ratio; + pInfo->readHandle = *readHandle; + pInfo->interval = extractIntervalInfo(pTableScanNode); + pInfo->sample.sampleRatio= pTableScanNode->ratio; + pInfo->sample.seed = taosGetTimestampSec(); + pInfo->dataBlockLoadFlag = pTableScanNode->dataRequired; - pInfo->pResBlock = createResDataBlock(pDescNode); - pInfo->pFilterNode = pTableScanNode->scan.node.pConditions; - pInfo->dataReader = pDataReader; - pInfo->scanFlag = MAIN_SCAN; - pInfo->pColMatchInfo = pColList; - pInfo->curTWinIdx = 0; - - pOperator->name = "TableScanOperator"; // for debug purpose + pInfo->pResBlock = createResDataBlock(pDescNode); + pInfo->pFilterNode = pTableScanNode->scan.node.pConditions; + pInfo->dataReader = pDataReader; + pInfo->scanFlag = MAIN_SCAN; + pInfo->pColMatchInfo = pColList; + pInfo->curTWinIdx = 0; + + pOperator->name = "TableScanOperator"; // for debug purpose pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN; - pOperator->blocking = false; - pOperator->status = OP_NOT_OPENED; - pOperator->info = pInfo; - pOperator->numOfExprs = numOfCols; - pOperator->pTaskInfo = pTaskInfo; + pOperator->blocking = false; + pOperator->status = OP_NOT_OPENED; + pOperator->info = pInfo; + pOperator->numOfExprs = numOfCols; + pOperator->pTaskInfo = pTaskInfo; // for table group pInfo->pGroupCols = groupKyes; @@ -586,13 +602,13 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, // for non-blocking operator, the open cost is always 0 pOperator->cost.openCost = 0; - return pOperator; -_error: - pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY; + _error: taosMemoryFreeClear(pInfo); taosMemoryFreeClear(pOperator); + + pTaskInfo->code = TSDB_CODE_QRY_OUT_OF_MEMORY; return NULL; } @@ -706,12 +722,12 @@ static void doClearBufferedBlocks(SStreamBlockScanInfo* pInfo) { taosArrayClear(pInfo->pBlockLists); } -static bool isSessionWindow(SStreamBlockScanInfo* pInfo) { - return pInfo->sessionSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION_WINDOW; +static bool isSessionWindow(SStreamBlockScanInfo* pInfo) { + return pInfo->sessionSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION_WINDOW; } -static bool isStateWindow(SStreamBlockScanInfo* pInfo) { - return pInfo->sessionSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE_WINDOW; +static bool isStateWindow(SStreamBlockScanInfo* pInfo) { + return pInfo->sessionSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE_WINDOW; } static bool prepareDataScan(SStreamBlockScanInfo* pInfo) { @@ -843,7 +859,7 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) { } return pInfo->pUpdateRes; } else { - if (isStateWindow(pInfo) && + if (isStateWindow(pInfo) && taosArrayGetSize(pInfo->sessionSup.pStreamAggSup->pScanWindow) > 0) { pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER; pInfo->updateResIndex = pInfo->pUpdateRes->info.rows; @@ -1429,7 +1445,7 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) { } SRetrieveMetaTableRsp* pTableRsp = pInfo->pRsp; - setSDataBlockFromFetchRsp(pInfo->pRes, &pInfo->loadInfo, pTableRsp->numOfRows, pTableRsp->data, + setDataBlockFromFetchRsp(pInfo->pRes, &pInfo->loadInfo, pTableRsp->numOfRows, pTableRsp->data, pTableRsp->compLen, pOperator->numOfExprs, startTs, NULL, pInfo->scanCols); // todo log the filter info @@ -1503,48 +1519,59 @@ int32_t buildDbTableInfoBlock(const SSDataBlock* p, const SSysTableMeta* pSysDbT return numOfRows; } -SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSDataBlock* pResBlock, const SName* pName, - SNode* pCondition, SEpSet epset, SArray* colList, - SExecTaskInfo* pTaskInfo, bool showRewrite, int32_t accountId) { +SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScanPhysiNode *pScanPhyNode, SExecTaskInfo* pTaskInfo) { SSysTableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SSysTableScanInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { - taosMemoryFreeClear(pInfo); - taosMemoryFreeClear(pOperator); - terrno = TSDB_CODE_QRY_OUT_OF_MEMORY; - return NULL; + goto _error; } - pInfo->accountId = accountId; - pInfo->showRewrite = showRewrite; - pInfo->pRes = pResBlock; - pInfo->pCondition = pCondition; - pInfo->scanCols = colList; + SScanPhysiNode* pScanNode = &pScanPhyNode->scan; + + SDataBlockDescNode* pDescNode = pScanNode->node.pOutputDataBlockDesc; + SSDataBlock* pResBlock = createResDataBlock(pDescNode); + + int32_t num = 0; + SArray* colList = extractColMatchInfo(pScanNode->pScanCols, pDescNode, &num, pTaskInfo, COL_MATCH_FROM_COL_ID); + + pInfo->accountId = pScanPhyNode->accountId; + pInfo->showRewrite = pScanPhyNode->showRewrite; + pInfo->pRes = pResBlock; + pInfo->pCondition = pScanNode->node.pConditions; + pInfo->scanCols = colList; initResultSizeInfo(pOperator, 4096); - tNameAssign(&pInfo->name, pName); + tNameAssign(&pInfo->name, &pScanNode->tableName); const char* name = tNameGetTableName(&pInfo->name); + if (strncasecmp(name, TSDB_INS_TABLE_USER_TABLES, TSDB_TABLE_FNAME_LEN) == 0) { pInfo->readHandle = *(SReadHandle*)readHandle; blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity); } else { tsem_init(&pInfo->ready, 0, 0); - pInfo->epSet = epset; + pInfo->epSet = pScanPhyNode->mgmtEpSet; pInfo->readHandle = *(SReadHandle*)readHandle; } - pOperator->name = "SysTableScanOperator"; + pOperator->name = "SysTableScanOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN; - pOperator->blocking = false; - pOperator->status = OP_NOT_OPENED; - pOperator->info = pInfo; - pOperator->numOfExprs = pResBlock->info.numOfCols; + pOperator->blocking = false; + pOperator->status = OP_NOT_OPENED; + pOperator->info = pInfo; + pOperator->numOfExprs = pResBlock->info.numOfCols; + pOperator->pTaskInfo = pTaskInfo; + pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doSysTableScan, NULL, NULL, destroySysScanOperator, NULL, NULL, NULL); - pOperator->pTaskInfo = pTaskInfo; return pOperator; + + _error: + taosMemoryFreeClear(pInfo); + taosMemoryFreeClear(pOperator); + terrno = TSDB_CODE_QRY_OUT_OF_MEMORY; + return NULL; } static SSDataBlock* doTagScan(SOperatorInfo* pOperator) { @@ -1699,28 +1726,34 @@ static void destroyTagScanOperatorInfo(void* param, int32_t numOfOutput) { pInfo->pRes = blockDataDestroy(pInfo->pRes); } -SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, SExprInfo* pExpr, int32_t numOfOutput, - SSDataBlock* pResBlock, SArray* pColMatchInfo, STableListInfo* pTableListInfo, - SExecTaskInfo* pTaskInfo) { +SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysiNode* pPhyNode, STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo) { STagScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STagScanInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { goto _error; } - pInfo->pTableList = pTableListInfo; - pInfo->pColMatchInfo = pColMatchInfo; - pInfo->pRes = pResBlock; - pInfo->readHandle = *pReadHandle; - pInfo->curPos = 0; - pOperator->name = "TagScanOperator"; + SDataBlockDescNode* pDescNode = pPhyNode->node.pOutputDataBlockDesc; + + int32_t numOfExprs = 0; + SExprInfo* pExprInfo = createExprInfo(pPhyNode->pScanPseudoCols, NULL, &numOfExprs); + + int32_t num = 0; + SArray* colList = extractColMatchInfo(pPhyNode->pScanPseudoCols, pDescNode, &num, pTaskInfo, COL_MATCH_FROM_COL_ID); + + pInfo->pTableList = pTableListInfo; + pInfo->pColMatchInfo = colList; + pInfo->pRes = createResDataBlock(pDescNode);; + pInfo->readHandle = *pReadHandle; + pInfo->curPos = 0; + pOperator->name = "TagScanOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN; - pOperator->blocking = false; - pOperator->status = OP_NOT_OPENED; - pOperator->info = pInfo; - pOperator->pExpr = pExpr; - pOperator->numOfExprs = numOfOutput; - pOperator->pTaskInfo = pTaskInfo; + pOperator->blocking = false; + pOperator->status = OP_NOT_OPENED; + pOperator->info = pInfo; + pOperator->pExpr = pExprInfo; + pOperator->numOfExprs = numOfExprs; + pOperator->pTaskInfo = pTaskInfo; initResultSizeInfo(pOperator, 4096); blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity); diff --git a/source/libs/executor/src/sortoperator.c b/source/libs/executor/src/sortoperator.c index 8f5fa88070fde1625385fd6e691ccccf424c2094..5e1b47cc9115bd10231ceaa854d4453e5b9223df 100644 --- a/source/libs/executor/src/sortoperator.c +++ b/source/libs/executor/src/sortoperator.c @@ -1,3 +1,18 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + #include "tdatablock.h" #include "executorimpl.h" diff --git a/source/libs/parser/inc/parInt.h b/source/libs/parser/inc/parInt.h index 8ec20cde5a07b54a5609a6097316e4ae5e538a83..3efe6700d2339b2234d33f868c0b42fa993d1b64 100644 --- a/source/libs/parser/inc/parInt.h +++ b/source/libs/parser/inc/parInt.h @@ -32,7 +32,6 @@ int32_t authenticate(SParseContext* pParseCxt, SQuery* pQuery); int32_t translate(SParseContext* pParseCxt, SQuery* pQuery); int32_t extractResultSchema(const SNode* pRoot, int32_t* numOfCols, SSchema** pSchema); int32_t calculateConstant(SParseContext* pParseCxt, SQuery* pQuery); -int32_t isNotSchemalessDb(SParseContext* pContext, char *dbName); #ifdef __cplusplus } diff --git a/source/libs/parser/src/parAstParser.c b/source/libs/parser/src/parAstParser.c index 40f421e6545ca0e26e29efc0b5e64b3ce04ea6ab..6c44ce4c3c47660705724fe67e682140977fbb3c 100644 --- a/source/libs/parser/src/parAstParser.c +++ b/source/libs/parser/src/parAstParser.c @@ -180,11 +180,11 @@ static int32_t collectMetaKeyFromSelect(SCollectMetaKeyCxt* pCxt, SSelectStmt* p } static int32_t collectMetaKeyFromCreateTable(SCollectMetaKeyCxt* pCxt, SCreateTableStmt* pStmt) { - if (NULL == pStmt->pTags) { - return reserveTableVgroupInCache(pCxt->pParseCxt->acctId, pStmt->dbName, pStmt->tableName, pCxt->pMetaCache); - } else { - return reserveDbCfgInCache(pCxt->pParseCxt->acctId, pStmt->dbName, pCxt->pMetaCache); + int32_t code = reserveDbCfgInCache(pCxt->pParseCxt->acctId, pStmt->dbName, pCxt->pMetaCache); + if (TSDB_CODE_SUCCESS == code && NULL == pStmt->pTags) { + code = reserveTableVgroupInCache(pCxt->pParseCxt->acctId, pStmt->dbName, pStmt->tableName, pCxt->pMetaCache); } + return code; } static int32_t collectMetaKeyFromCreateMultiTable(SCollectMetaKeyCxt* pCxt, SCreateMultiTableStmt* pStmt) { @@ -192,8 +192,11 @@ static int32_t collectMetaKeyFromCreateMultiTable(SCollectMetaKeyCxt* pCxt, SCre SNode* pNode = NULL; FOREACH(pNode, pStmt->pSubTables) { SCreateSubTableClause* pClause = (SCreateSubTableClause*)pNode; - code = - reserveTableMetaInCache(pCxt->pParseCxt->acctId, pClause->useDbName, pClause->useTableName, pCxt->pMetaCache); + code = reserveDbCfgInCache(pCxt->pParseCxt->acctId, pClause->dbName, pCxt->pMetaCache); + if (TSDB_CODE_SUCCESS == code) { + code = + reserveTableMetaInCache(pCxt->pParseCxt->acctId, pClause->useDbName, pClause->useTableName, pCxt->pMetaCache); + } if (TSDB_CODE_SUCCESS == code) { code = reserveTableVgroupInCache(pCxt->pParseCxt->acctId, pClause->dbName, pClause->tableName, pCxt->pMetaCache); } diff --git a/source/libs/parser/src/parInsert.c b/source/libs/parser/src/parInsert.c index 422c48039743185070ca5ea5f704627d26217253..efe0bd75b2fe66efc0d84932e877f873c4ae0319 100644 --- a/source/libs/parser/src/parInsert.c +++ b/source/libs/parser/src/parInsert.c @@ -254,7 +254,7 @@ static int32_t createSName(SName* pName, SToken* pTableName, int32_t acctId, con static int32_t checkAuth(SInsertParseContext* pCxt, char* pDbFname, bool* pPass) { SParseContext* pBasicCtx = pCxt->pComCxt; - if (NULL != pCxt->pMetaCache) { + if (pBasicCtx->async) { return getUserAuthFromCache(pCxt->pMetaCache, pBasicCtx->pUser, pDbFname, AUTH_TYPE_WRITE, pPass); } return catalogChkAuth(pBasicCtx->pCatalog, pBasicCtx->pTransporter, &pBasicCtx->mgmtEpSet, pBasicCtx->pUser, pDbFname, @@ -263,7 +263,7 @@ static int32_t checkAuth(SInsertParseContext* pCxt, char* pDbFname, bool* pPass) static int32_t getTableSchema(SInsertParseContext* pCxt, SName* pTbName, bool isStb, STableMeta** pTableMeta) { SParseContext* pBasicCtx = pCxt->pComCxt; - if (NULL != pCxt->pMetaCache) { + if (pBasicCtx->async) { return getTableMetaFromCache(pCxt->pMetaCache, pTbName, pTableMeta); } if (isStb) { @@ -275,7 +275,7 @@ static int32_t getTableSchema(SInsertParseContext* pCxt, SName* pTbName, bool is static int32_t getTableVgroup(SInsertParseContext* pCxt, SName* pTbName, SVgroupInfo* pVg) { SParseContext* pBasicCtx = pCxt->pComCxt; - if (NULL != pCxt->pMetaCache) { + if (pBasicCtx->async) { return getTableVgroupFromCache(pCxt->pMetaCache, pTbName, pVg); } return catalogGetTableHashVgroup(pBasicCtx->pCatalog, pBasicCtx->pTransporter, &pBasicCtx->mgmtEpSet, pTbName, pVg); @@ -305,6 +305,16 @@ static int32_t getSTableMeta(SInsertParseContext* pCxt, SName* name, char* dbFna return getTableMetaImpl(pCxt, name, dbFname, true); } +static int32_t getDBCfg(SInsertParseContext* pCxt, const char* pDbFName, SDbCfgInfo* pInfo) { + SParseContext* pBasicCtx = pCxt->pComCxt; + if (pBasicCtx->async) { + CHECK_CODE(getDbCfgFromCache(pCxt->pMetaCache, pDbFName, pInfo)); + } else { + CHECK_CODE(catalogGetDBCfg(pBasicCtx->pCatalog, pBasicCtx->pTransporter, &pBasicCtx->mgmtEpSet, pDbFName, pInfo)); + } + return TSDB_CODE_SUCCESS; +} + static int32_t findCol(SToken* pColname, int32_t start, int32_t end, SSchema* pSchema) { while (start < end) { if (strlen(pSchema[start].name) == pColname->n && strncmp(pColname->z, pSchema[start].name, pColname->n) == 0) { @@ -1090,10 +1100,10 @@ static int32_t parseUsingClause(SInsertParseContext* pCxt, SName* name, char* tb SName sname; createSName(&sname, &sToken, pCxt->pComCxt->acctId, pCxt->pComCxt->db, &pCxt->msg); - char stbFName[TSDB_TABLE_FNAME_LEN]; - tNameExtractFullName(&sname, stbFName); + char dbFName[TSDB_DB_FNAME_LEN]; + tNameGetFullDbName(&sname, dbFName); - CHECK_CODE(getSTableMeta(pCxt, &sname, stbFName)); + CHECK_CODE(getSTableMeta(pCxt, &sname, dbFName)); if (TSDB_SUPER_TABLE != pCxt->pTableMeta->tableType) { return buildInvalidOperationMsg(&pCxt->msg, "create table only from super table is allowed"); } @@ -1282,6 +1292,14 @@ static void destroyInsertParseContext(SInsertParseContext* pCxt) { destroyBlockArrayList(pCxt->pVgDataBlocks); } +static int32_t checkSchemalessDb(SInsertParseContext* pCxt, char* pDbName) { + SDbCfgInfo pInfo = {0}; + char fullName[TSDB_TABLE_FNAME_LEN]; + snprintf(fullName, sizeof(fullName), "%d.%s", pCxt->pComCxt->acctId, pDbName); + CHECK_CODE(getDBCfg(pCxt, fullName, &pInfo)); + return pInfo.schemaless ? TSDB_CODE_SML_INVALID_DB_CONF : TSDB_CODE_SUCCESS; +} + // tb_name // [USING stb_name [(tag1_name, ...)] TAGS (tag1_value, ...)] // [(field1_name, ...)] @@ -1335,7 +1353,7 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) { SName name; CHECK_CODE(createSName(&name, &tbnameToken, pCxt->pComCxt->acctId, pCxt->pComCxt->db, &pCxt->msg)); - CHECK_CODE(isNotSchemalessDb(pCxt->pComCxt, name.dbname)); + CHECK_CODE(checkSchemalessDb(pCxt, name.dbname)); tNameExtractFullName(&name, tbFName); CHECK_CODE(taosHashPut(pCxt->pTableNameHashObj, tbFName, strlen(tbFName), &name, sizeof(SName))); @@ -1413,23 +1431,6 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) { return buildOutput(pCxt); } -int32_t isNotSchemalessDb(SParseContext* pContext, char *dbName){ - SName name; - tNameSetDbName(&name, pContext->acctId, dbName, strlen(dbName)); - char dbFname[TSDB_DB_FNAME_LEN] = {0}; - tNameGetFullDbName(&name, dbFname); - SDbCfgInfo pInfo = {0}; - int32_t code = catalogGetDBCfg(pContext->pCatalog, pContext->pTransporter, &pContext->mgmtEpSet, dbFname, &pInfo); - if (code != TSDB_CODE_SUCCESS) { - parserError("catalogGetDBCfg error, code:%s, dbFName:%s", tstrerror(code), dbFname); - return code; - } - if (pInfo.schemaless){ - parserError("can not insert into schemaless db:%s", dbFname); - return TSDB_CODE_SML_INVALID_DB_CONF; - } - return TSDB_CODE_SUCCESS; -} // INSERT INTO // tb_name // [USING stb_name [(tag1_name, ...)] TAGS (tag1_value, ...)] @@ -1472,6 +1473,8 @@ int32_t parseInsertSql(SParseContext* pContext, SQuery** pQuery) { if (NULL == *pQuery) { return TSDB_CODE_OUT_OF_MEMORY; } + } else { + context.pMetaCache = (*pQuery)->pMetaCache; } (*pQuery)->execMode = QUERY_EXEC_MODE_SCHEDULE; (*pQuery)->haveResultSet = false; @@ -1586,12 +1589,20 @@ static int32_t skipUsingClause(SInsertParseSyntaxCxt* pCxt) { static int32_t collectTableMetaKey(SInsertParseSyntaxCxt* pCxt, SToken* pTbToken) { SName name; CHECK_CODE(createSName(&name, pTbToken, pCxt->pComCxt->acctId, pCxt->pComCxt->db, &pCxt->msg)); + CHECK_CODE(reserveDbCfgInCache(pCxt->pComCxt->acctId, name.dbname, pCxt->pMetaCache)); CHECK_CODE(reserveUserAuthInCacheExt(pCxt->pComCxt->pUser, &name, AUTH_TYPE_WRITE, pCxt->pMetaCache)); CHECK_CODE(reserveTableMetaInCacheExt(&name, pCxt->pMetaCache)); CHECK_CODE(reserveTableVgroupInCacheExt(&name, pCxt->pMetaCache)); return TSDB_CODE_SUCCESS; } +static int32_t collectAutoCreateTableMetaKey(SInsertParseSyntaxCxt* pCxt, SToken* pTbToken) { + SName name; + CHECK_CODE(createSName(&name, pTbToken, pCxt->pComCxt->acctId, pCxt->pComCxt->db, &pCxt->msg)); + CHECK_CODE(reserveTableVgroupInCacheExt(&name, pCxt->pMetaCache)); + return TSDB_CODE_SUCCESS; +} + static int32_t parseInsertBodySyntax(SInsertParseSyntaxCxt* pCxt) { bool hasData = false; // for each table @@ -1620,6 +1631,7 @@ static int32_t parseInsertBodySyntax(SInsertParseSyntaxCxt* pCxt) { // USING clause if (TK_USING == sToken.type) { + CHECK_CODE(collectAutoCreateTableMetaKey(pCxt, &tbnameToken)); NEXT_TOKEN(pCxt->pSql, sToken); CHECK_CODE(collectTableMetaKey(pCxt, &sToken)); CHECK_CODE(skipUsingClause(pCxt)); @@ -2172,8 +2184,8 @@ static int32_t smlBuildTagRow(SArray* cols, SParsedDataColInfo* tags, SSchema* p val.nData = kv->length; } else if (pTagSchema->type == TSDB_DATA_TYPE_NCHAR) { int32_t output = 0; - void *p = taosMemoryCalloc(1, kv->length * TSDB_NCHAR_SIZE); - if(p == NULL){ + void* p = taosMemoryCalloc(1, kv->length * TSDB_NCHAR_SIZE); + if (p == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; goto end; } diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 27b57bd9853279ad8f71839f725d65431be077bd..35d3a51a2dc276a707122d4176b5644f9ae18184 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -2653,8 +2653,23 @@ static int32_t checkTableSchema(STranslateContext* pCxt, SCreateTableStmt* pStmt return code; } +static int32_t checkSchemalessDb(STranslateContext* pCxt, const char* pDbName) { + if (0 != pCxt->pParseCxt->schemalessType) { + return TSDB_CODE_SUCCESS; + } + SDbCfgInfo info = {0}; + int32_t code = getDBCfg(pCxt, pDbName, &info); + if (TSDB_CODE_SUCCESS == code) { + code = info.schemaless ? TSDB_CODE_SML_INVALID_DB_CONF : TSDB_CODE_SUCCESS; + } + return code; +} + static int32_t checkCreateTable(STranslateContext* pCxt, SCreateTableStmt* pStmt) { - int32_t code = checTableFactorOption(pCxt, pStmt->pOptions->filesFactor); + int32_t code = checkSchemalessDb(pCxt, pStmt->dbName); + if (TSDB_CODE_SUCCESS == code) { + code = checTableFactorOption(pCxt, pStmt->pOptions->filesFactor); + } if (TSDB_CODE_SUCCESS == code) { code = checkTableRollupOption(pCxt, pStmt->pOptions->pRollupFuncs); } @@ -2667,11 +2682,6 @@ static int32_t checkCreateTable(STranslateContext* pCxt, SCreateTableStmt* pStmt if (TSDB_CODE_SUCCESS == code) { code = checkTableSchema(pCxt, pStmt); } - if (TSDB_CODE_SUCCESS == code) { - if (pCxt->pParseCxt->schemalessType == 0) { - code = isNotSchemalessDb(pCxt->pParseCxt, pStmt->dbName); - } - } return code; } @@ -4445,11 +4455,11 @@ static int32_t rewriteCreateMultiTable(STranslateContext* pCxt, SQuery* pQuery) int32_t code = TSDB_CODE_SUCCESS; SNode* pNode; FOREACH(pNode, pStmt->pSubTables) { - if (pCxt->pParseCxt->schemalessType == 0 && - (code = isNotSchemalessDb(pCxt->pParseCxt, ((SCreateSubTableClause*)pNode)->dbName)) != TSDB_CODE_SUCCESS) { - return code; + SCreateSubTableClause* pClause = (SCreateSubTableClause*)pNode; + code = checkSchemalessDb(pCxt, pClause->dbName); + if (TSDB_CODE_SUCCESS == code) { + code = rewriteCreateSubTable(pCxt, pClause, pVgroupHashmap); } - code = rewriteCreateSubTable(pCxt, (SCreateSubTableClause*)pNode, pVgroupHashmap); if (TSDB_CODE_SUCCESS != code) { taosHashCleanup(pVgroupHashmap); return code; @@ -4860,13 +4870,8 @@ static int32_t buildModifyVnodeArray(STranslateContext* pCxt, SAlterTableStmt* p static int32_t rewriteAlterTable(STranslateContext* pCxt, SQuery* pQuery) { SAlterTableStmt* pStmt = (SAlterTableStmt*)pQuery->pRoot; - int32_t code = TSDB_CODE_SUCCESS; - if (pCxt->pParseCxt->schemalessType == 0 && - (code = isNotSchemalessDb(pCxt->pParseCxt, pStmt->dbName)) != TSDB_CODE_SUCCESS) { - return code; - } - - STableMeta* pTableMeta = NULL; + int32_t code = checkSchemalessDb(pCxt, pStmt->dbName); + STableMeta* pTableMeta = NULL; code = getTableMeta(pCxt, pStmt->dbName, pStmt->tableName, &pTableMeta); if (TSDB_CODE_SUCCESS != code) { return code; diff --git a/source/libs/parser/src/parUtil.c b/source/libs/parser/src/parUtil.c index 474cbef0bc633bd08de7e7a6c70e51b14909ad9a..f30cc1111f15d5d238e4c9cc724c0325a30af579 100644 --- a/source/libs/parser/src/parUtil.c +++ b/source/libs/parser/src/parUtil.c @@ -681,6 +681,7 @@ int32_t getTableMetaFromCache(SParseMetaCache* pMetaCache, const SName* pName, S tNameExtractFullName(pName, fullName); STableMeta** pRes = taosHashGet(pMetaCache->pTableMeta, fullName, strlen(fullName)); if (NULL == pRes || NULL == *pRes) { + parserError("getTableMetaFromCache error: %s", fullName); return TSDB_CODE_PAR_INTERNAL_ERROR; } *pMeta = tableMetaDup(*pRes); @@ -709,6 +710,7 @@ int32_t reserveDbVgInfoInCache(int32_t acctId, const char* pDb, SParseMetaCache* int32_t getDbVgInfoFromCache(SParseMetaCache* pMetaCache, const char* pDbFName, SArray** pVgInfo) { SArray** pRes = taosHashGet(pMetaCache->pDbVgroup, pDbFName, strlen(pDbFName)); if (NULL == pRes) { + parserError("getDbVgInfoFromCache error: %s", pDbFName); return TSDB_CODE_PAR_INTERNAL_ERROR; } // *pRes is null, which is a legal value, indicating that the user DB has not been created @@ -736,6 +738,7 @@ int32_t getTableVgroupFromCache(SParseMetaCache* pMetaCache, const SName* pName, tNameExtractFullName(pName, fullName); SVgroupInfo** pRes = taosHashGet(pMetaCache->pTableVgroup, fullName, strlen(fullName)); if (NULL == pRes || NULL == *pRes) { + parserError("getTableVgroupFromCache error: %s", fullName); return TSDB_CODE_PAR_INTERNAL_ERROR; } memcpy(pVgroup, *pRes, sizeof(SVgroupInfo)); @@ -750,6 +753,7 @@ int32_t getDbVgVersionFromCache(SParseMetaCache* pMetaCache, const char* pDbFNam int32_t* pTableNum) { SDbInfo** pRes = taosHashGet(pMetaCache->pDbCfg, pDbFName, strlen(pDbFName)); if (NULL == pRes || NULL == *pRes) { + parserError("getDbVgVersionFromCache error: %s", pDbFName); return TSDB_CODE_PAR_INTERNAL_ERROR; } *pVersion = (*pRes)->vgVer; @@ -765,6 +769,7 @@ int32_t reserveDbCfgInCache(int32_t acctId, const char* pDb, SParseMetaCache* pM int32_t getDbCfgFromCache(SParseMetaCache* pMetaCache, const char* pDbFName, SDbCfgInfo* pInfo) { SDbCfgInfo** pRes = taosHashGet(pMetaCache->pDbCfg, pDbFName, strlen(pDbFName)); if (NULL == pRes || NULL == *pRes) { + parserError("getDbCfgFromCache error: %s", pDbFName); return TSDB_CODE_PAR_INTERNAL_ERROR; } memcpy(pInfo, *pRes, sizeof(SDbCfgInfo)); @@ -803,6 +808,7 @@ int32_t getUserAuthFromCache(SParseMetaCache* pMetaCache, const char* pUser, con int32_t len = userAuthToStringExt(pUser, pDbFName, type, key); bool* pRes = taosHashGet(pMetaCache->pUserAuth, key, len); if (NULL == pRes) { + parserError("getUserAuthFromCache error: %s, %s, %d", pUser, pDbFName, type); return TSDB_CODE_PAR_INTERNAL_ERROR; } *pPass = *pRes; @@ -822,6 +828,7 @@ int32_t reserveUdfInCache(const char* pFunc, SParseMetaCache* pMetaCache) { int32_t getUdfInfoFromCache(SParseMetaCache* pMetaCache, const char* pFunc, SFuncInfo* pInfo) { SFuncInfo** pRes = taosHashGet(pMetaCache->pUdf, pFunc, strlen(pFunc)); if (NULL == pRes || NULL == *pRes) { + parserError("getUdfInfoFromCache error: %s", pFunc); return TSDB_CODE_PAR_INTERNAL_ERROR; } memcpy(pInfo, *pRes, sizeof(SFuncInfo)); diff --git a/source/libs/parser/test/parInsertTest.cpp b/source/libs/parser/test/parInsertTest.cpp index 4d313fca766e8ab8f8d6ba404f7faf2fe833e9e6..eae83e222385f8e7a7af1be56bb3647b3e806358 100644 --- a/source/libs/parser/test/parInsertTest.cpp +++ b/source/libs/parser/test/parInsertTest.cpp @@ -59,6 +59,7 @@ class InsertTest : public Test { } int32_t runAsync() { + cxt_.async = true; code_ = parseInsertSyntax(&cxt_, &res_); if (code_ != TSDB_CODE_SUCCESS) { cout << "parseInsertSyntax code:" << toString(code_) << ", msg:" << errMagBuf_ << endl; diff --git a/tests/script/tsim/testsuit.sim b/tests/script/tsim/testsuit.sim index b2ca1e03456c6414459a72999180e9d57ca104c9..c73efde9d5076e21038741c95f3edf470d288f9e 100644 --- a/tests/script/tsim/testsuit.sim +++ b/tests/script/tsim/testsuit.sim @@ -3,8 +3,8 @@ #run tsim/user/privilege2.sim #run tsim/user/user_len.sim #run tsim/user/privilege1.sim -##run tsim/user/pass_len.sim -##run tsim/table/basic1.sim +#run tsim/user/pass_len.sim +#run tsim/table/basic1.sim #run tsim/trans/lossdata1.sim #run tsim/trans/create_db.sim ##run tsim/stable/alter_metrics.sim @@ -13,20 +13,20 @@ ##run tsim/stable/column_drop.sim ##run tsim/stable/column_modify.sim ##run tsim/stable/tag_rename.sim -##run tsim/stable/vnode3.sim -##run tsim/stable/metrics.sim +#run tsim/stable/vnode3.sim +#run tsim/stable/metrics.sim ##run tsim/stable/alter_insert2.sim -##run tsim/stable/show.sim -##run tsim/stable/alter_import.sim +#run tsim/stable/show.sim +#run tsim/stable/alter_import.sim ##run tsim/stable/tag_add.sim -##run tsim/stable/tag_drop.sim -##run tsim/stable/column_add.sim -##run tsim/stable/alter_count.sim -##run tsim/stable/values.sim -##run tsim/stable/dnode3.sim -##run tsim/stable/alter_insert1.sim -##run tsim/stable/refcount.sim -##run tsim/stable/disk.sim +#run tsim/stable/tag_drop.sim +#run tsim/stable/column_add.sim +#run tsim/stable/alter_count.sim +#run tsim/stable/values.sim +#run tsim/stable/dnode3.sim +#run tsim/stable/alter_insert1.sim +#run tsim/stable/refcount.sim +#run tsim/stable/disk.sim #run tsim/db/basic1.sim #run tsim/db/basic3.sim ##run tsim/db/basic7.sim @@ -40,24 +40,24 @@ #run tsim/mnode/basic3.sim #run tsim/mnode/basic2.sim #run tsim/parser/fourArithmetic-basic.sim -run tsim/parser/groupby-basic.sim -run tsim/snode/basic1.sim -run tsim/query/time_process.sim -run tsim/query/stddev.sim -run tsim/query/interval-offset.sim -run tsim/query/charScalarFunction.sim -run tsim/query/complex_select.sim -run tsim/query/explain.sim -run tsim/query/crash_sql.sim -run tsim/query/diff.sim -run tsim/query/complex_limit.sim -run tsim/query/complex_having.sim -run tsim/query/udf.sim -run tsim/query/complex_group.sim -run tsim/query/interval.sim -run tsim/query/session.sim -run tsim/query/scalarFunction.sim -run tsim/query/scalarNull.sim +#run tsim/parser/groupby-basic.sim +#run tsim/snode/basic1.sim +#run tsim/query/time_process.sim +#run tsim/query/stddev.sim +#run tsim/query/interval-offset.sim +#run tsim/query/charScalarFunction.sim +#run tsim/query/complex_select.sim +#run tsim/query/explain.sim +#run tsim/query/crash_sql.sim +#run tsim/query/diff.sim +#run tsim/query/complex_limit.sim +#run tsim/query/complex_having.sim +#run tsim/query/udf.sim +#run tsim/query/complex_group.sim +#run tsim/query/interval.sim +#run tsim/query/session.sim +#run tsim/query/scalarFunction.sim +##run tsim/query/scalarNull.sim run tsim/query/complex_where.sim run tsim/tmq/basic1.sim run tsim/tmq/basic4.sim