diff --git a/include/common/tcommon.h b/include/common/tcommon.h index 8332e9d09a777c16d841d84e4890ad2665037e2a..45f608b24e43be409377be7c0e9f6494f2ea6e63 100644 --- a/include/common/tcommon.h +++ b/include/common/tcommon.h @@ -193,20 +193,19 @@ typedef struct SColumn { uint8_t scale; } SColumn; -typedef struct SLimit { - int64_t limit; - int64_t offset; -} SLimit; - -typedef struct SOrder { - uint32_t order; - SColumn col; -} SOrder; - -typedef struct SGroupbyExpr { - SArray* columnInfo; // SArray, group by columns information - bool groupbyTag; // group by tag or column -} SGroupbyExpr; +typedef struct STableBlockDistInfo { + uint16_t rowSize; + uint16_t numOfFiles; + uint32_t numOfTables; + uint64_t totalSize; + uint64_t totalRows; + int32_t maxRows; + int32_t minRows; + int32_t firstSeekTimeUs; + uint32_t numOfRowsInMemTable; + uint32_t numOfSmallBlocks; + SArray *dataBlockInfos; +} STableBlockDistInfo; enum { FUNC_PARAM_TYPE_VALUE = 0x1, @@ -241,15 +240,6 @@ typedef struct SExprInfo { struct tExprNode* pExpr; } SExprInfo; -typedef struct SStateWindow { - SColumn col; -} SStateWindow; - -typedef struct SSessionWindow { - int64_t gap; // gap between two session window(in microseconds) - SColumn col; -} SSessionWindow; - #define QUERY_ASC_FORWARD_STEP 1 #define QUERY_DESC_FORWARD_STEP -1 diff --git a/include/libs/function/function.h b/include/libs/function/function.h index 9a518589b0dc597f566906200ced19c2d1ed7884..fe9edc323d20e636c431368a1b95a7f78990e1a4 100644 --- a/include/libs/function/function.h +++ b/include/libs/function/function.h @@ -52,7 +52,12 @@ typedef struct SFuncExecFuncs { FExecFinalize finalize; } SFuncExecFuncs; -#define MAX_INTERVAL_TIME_WINDOW 1000000 // maximum allowed time windows in final results +typedef struct SFileBlockInfo { + int32_t numBlocksOfStep; +} SFileBlockInfo; + +#define TSDB_BLOCK_DIST_STEP_ROWS 8 +#define MAX_INTERVAL_TIME_WINDOW 1000000 // maximum allowed time windows in final results #define FUNCTION_TYPE_SCALAR 1 #define FUNCTION_TYPE_AGG 2 @@ -101,10 +106,6 @@ typedef struct SFuncExecFuncs { #define FUNCTION_DERIVATIVE 32 #define FUNCTION_BLKINFO 33 -#define FUNCTION_HISTOGRAM 34 -#define FUNCTION_HLL 35 -#define FUNCTION_MODE 36 -#define FUNCTION_SAMPLE 37 #define FUNCTION_COV 38 diff --git a/include/libs/nodes/querynodes.h b/include/libs/nodes/querynodes.h index dec709535f9042e0d6347197386407250696c7fc..ca4646a5ca13e2eef0c5d7cbd800a3e78c60f184 100644 --- a/include/libs/nodes/querynodes.h +++ b/include/libs/nodes/querynodes.h @@ -277,7 +277,6 @@ typedef struct SVnodeModifOpStmt { ENodeType nodeType; ENodeType sqlNodeType; SArray* pDataBlocks; // data block for each vgroup, SArray. - int8_t schemaAttache; // denote if submit block is built with table schema or not uint8_t payloadType; // EPayloadType. 0: K-V payload for non-prepare insert, 1: rawPayload for prepare insert uint32_t insertType; // insert data from [file|sql statement| bound statement] const char* sql; // current sql statement position diff --git a/include/util/tdef.h b/include/util/tdef.h index b1bebcee468587d22f82008004d34c98f07cbb68..be88afcf664b42c1ed74528af34560ccc76effb1 100644 --- a/include/util/tdef.h +++ b/include/util/tdef.h @@ -307,7 +307,7 @@ typedef enum ELogicConditionType { #define TSDB_MAX_TOTAL_BLOCKS 10000 #define TSDB_DEFAULT_TOTAL_BLOCKS 6 -#define TSDB_MIN_DAYS_PER_FILE (1 * 1440) // unit minute +#define TSDB_MIN_DAYS_PER_FILE 60 // unit minute #define TSDB_MAX_DAYS_PER_FILE (3650 * 1440) #define TSDB_DEFAULT_DAYS_PER_FILE (10 * 1440) diff --git a/source/dnode/mgmt/main/inc/dnd.h b/source/dnode/mgmt/main/inc/dnd.h index 255a739addd71aeb8db9731d985437e144772829..f3a409f2575d40ab51cfacfa432a5c0afe153540 100644 --- a/source/dnode/mgmt/main/inc/dnd.h +++ b/source/dnode/mgmt/main/inc/dnd.h @@ -134,33 +134,42 @@ typedef struct SDnode { SMgmtWrapper wrappers[NODE_MAX]; } SDnode; +// dndFile.h +int32_t dndReadFile(SMgmtWrapper *pWrapper, bool *pDeployed); +int32_t dndWriteFile(SMgmtWrapper *pWrapper, bool deployed); + +// dndInt.h +EDndStatus dndGetStatus(SDnode *pDnode); +void dndSetStatus(SDnode *pDnode, EDndStatus stat); +void dndSetMsgHandle(SMgmtWrapper *pWrapper, tmsg_t msgType, NodeMsgFp nodeMsgFp, int8_t vgId); +SMgmtWrapper *dndAcquireWrapper(SDnode *pDnode, ENodeType nodeType); +int32_t dndMarkWrapper(SMgmtWrapper *pWrapper); +void dndReleaseWrapper(SMgmtWrapper *pWrapper); + +// dndMonitor.h +void dndSendMonitorReport(SDnode *pDnode); + +// dndMsg.h +void dndReportStartup(SDnode *pDnode, const char *pName, const char *pDesc); +int32_t dndProcessNodeMsg(SDnode *pDnode, SNodeMsg *pMsg); + +// dndStr.h +const char *dndStatStr(EDndStatus stat); const char *dndNodeLogStr(ENodeType ntype); const char *dndNodeProcStr(ENodeType ntype); const char *dndEventStr(EDndEvent ev); -EDndStatus dndGetStatus(SDnode *pDnode); -void dndSetStatus(SDnode *pDnode, EDndStatus stat); -void dndSetMsgHandle(SMgmtWrapper *pWrapper, tmsg_t msgType, NodeMsgFp nodeMsgFp, int8_t vgId); -void dndReportStartup(SDnode *pDnode, const char *pName, const char *pDesc); -void dndSendMonitorReport(SDnode *pDnode); +// dndTransport.h int32_t dndInitServer(SDnode *pDnode); void dndCleanupServer(SDnode *pDnode); int32_t dndInitClient(SDnode *pDnode); void dndCleanupClient(SDnode *pDnode); -int32_t dndProcessNodeMsg(SDnode *pDnode, SNodeMsg *pMsg); int32_t dndSendReqToMnode(SMgmtWrapper *pWrapper, SRpcMsg *pMsg); int32_t dndSendReq(SMgmtWrapper *pWrapper, const SEpSet *pEpSet, SRpcMsg *pMsg); void dndSendRsp(SMgmtWrapper *pWrapper, const SRpcMsg *pRsp); void dndRegisterBrokenLinkArg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg); SMsgCb dndCreateMsgcb(SMgmtWrapper *pWrapper); -int32_t dndReadFile(SMgmtWrapper *pWrapper, bool *pDeployed); -int32_t dndWriteFile(SMgmtWrapper *pWrapper, bool deployed); - -SMgmtWrapper *dndAcquireWrapper(SDnode *pDnode, ENodeType nodeType); -int32_t dndMarkWrapper(SMgmtWrapper *pWrapper); -void dndReleaseWrapper(SMgmtWrapper *pWrapper); - #ifdef __cplusplus } #endif diff --git a/source/dnode/mgmt/main/inc/dndInt.h b/source/dnode/mgmt/main/inc/dndInt.h index 612d35d513ec8b78bdecd91602225c3c69cbdcfa..0c4ada1df385ba5872c44f31c6f98263197bb3ef 100644 --- a/source/dnode/mgmt/main/inc/dndInt.h +++ b/source/dnode/mgmt/main/inc/dndInt.h @@ -29,26 +29,24 @@ extern "C" { #endif -// dndInt.c -int32_t dndInit(); -void dndCleanup(); -const char *dndStatStr(EDndStatus stat); -void dndGetStartup(SDnode *pDnode, SStartupReq *pStartup); -void dndProcessStartupReq(SDnode *pDnode, SRpcMsg *pMsg); - -// dndMsg.c -void dndProcessRpcMsg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg, SEpSet *pEpSet); +// dndEnv.h +int32_t dndInit(); +void dndCleanup(); -// dndExec.c +// dndExec.h int32_t dndOpenNode(SMgmtWrapper *pWrapper); void dndCloseNode(SMgmtWrapper *pWrapper); int32_t dndRun(SDnode *pDnode); -// dndObj.c +// dndInt.c SDnode *dndCreate(const SDnodeOpt *pOption); void dndClose(SDnode *pDnode); void dndHandleEvent(SDnode *pDnode, EDndEvent event); +// dndMsg.c +void dndProcessRpcMsg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg, SEpSet *pEpSet); +void dndProcessStartupReq(SDnode *pDnode, SRpcMsg *pMsg); + // dndTransport.c int32_t dndInitMsgHandle(SDnode *pDnode); void dndSendRpcRsp(SMgmtWrapper *pWrapper, const SRpcMsg *pRsp); diff --git a/source/dnode/mgmt/main/src/dndEnv.c b/source/dnode/mgmt/main/src/dndEnv.c index 879214782289449015a2c7dcc01ed4c9a1c2afbd..9f75594335021553f2dd03b511c8f925c2238286 100644 --- a/source/dnode/mgmt/main/src/dndEnv.c +++ b/source/dnode/mgmt/main/src/dndEnv.c @@ -57,40 +57,3 @@ void dndCleanup() { taosStopCacheRefreshWorker(); dInfo("dnode env is cleaned up"); } - -void dndSetMsgHandle(SMgmtWrapper *pWrapper, tmsg_t msgType, NodeMsgFp nodeMsgFp, int8_t vgId) { - pWrapper->msgFps[TMSG_INDEX(msgType)] = nodeMsgFp; - pWrapper->msgVgIds[TMSG_INDEX(msgType)] = vgId; -} - -EDndStatus dndGetStatus(SDnode *pDnode) { return pDnode->status; } - -void dndSetStatus(SDnode *pDnode, EDndStatus status) { - if (pDnode->status != status) { - dDebug("dnode status set from %s to %s", dndStatStr(pDnode->status), dndStatStr(status)); - pDnode->status = status; - } -} - -void dndReportStartup(SDnode *pDnode, const char *pName, const char *pDesc) { - SStartupReq *pStartup = &pDnode->startup; - tstrncpy(pStartup->name, pName, TSDB_STEP_NAME_LEN); - tstrncpy(pStartup->desc, pDesc, TSDB_STEP_DESC_LEN); - pStartup->finished = 0; -} - -void dndGetStartup(SDnode *pDnode, SStartupReq *pStartup) { - memcpy(pStartup, &pDnode->startup, sizeof(SStartupReq)); - pStartup->finished = (dndGetStatus(pDnode) == DND_STAT_RUNNING); -} - -void dndProcessStartupReq(SDnode *pDnode, SRpcMsg *pReq) { - dDebug("startup req is received"); - SStartupReq *pStartup = rpcMallocCont(sizeof(SStartupReq)); - dndGetStartup(pDnode, pStartup); - - dDebug("startup req is sent, step:%s desc:%s finished:%d", pStartup->name, pStartup->desc, pStartup->finished); - SRpcMsg rpcRsp = { - .handle = pReq->handle, .pCont = pStartup, .contLen = sizeof(SStartupReq), .ahandle = pReq->ahandle}; - rpcSendResponse(&rpcRsp); -} diff --git a/source/dnode/mgmt/main/src/dndInt.c b/source/dnode/mgmt/main/src/dndInt.c index 6f6e21b983d0b41f4e8765b996a6ffd4dcf9f5bf..25e4d89c5b05753ec649ad1d3082c55a2cc461d3 100644 --- a/source/dnode/mgmt/main/src/dndInt.c +++ b/source/dnode/mgmt/main/src/dndInt.c @@ -188,4 +188,18 @@ void dndReleaseWrapper(SMgmtWrapper *pWrapper) { int32_t refCount = atomic_sub_fetch_32(&pWrapper->refCount, 1); taosRUnLockLatch(&pWrapper->latch); dTrace("node:%s, is released, refCount:%d", pWrapper->name, refCount); -} \ No newline at end of file +} + +void dndSetMsgHandle(SMgmtWrapper *pWrapper, tmsg_t msgType, NodeMsgFp nodeMsgFp, int8_t vgId) { + pWrapper->msgFps[TMSG_INDEX(msgType)] = nodeMsgFp; + pWrapper->msgVgIds[TMSG_INDEX(msgType)] = vgId; +} + +EDndStatus dndGetStatus(SDnode *pDnode) { return pDnode->status; } + +void dndSetStatus(SDnode *pDnode, EDndStatus status) { + if (pDnode->status != status) { + dDebug("dnode status set from %s to %s", dndStatStr(pDnode->status), dndStatStr(status)); + pDnode->status = status; + } +} diff --git a/source/dnode/mgmt/main/src/dndMsg.c b/source/dnode/mgmt/main/src/dndMsg.c index 2ab1e401c7385c377a410608c40736a898406f39..bb0f8ccb895e0e2e0bd30c1d6ba748b86d1d885c 100644 --- a/source/dnode/mgmt/main/src/dndMsg.c +++ b/source/dnode/mgmt/main/src/dndMsg.c @@ -66,8 +66,8 @@ void dndProcessRpcMsg(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, SEpSet *pEpSet) { dTrace("msg:%p, is created, handle:%p app:%p user:%s", pMsg, pRpc->handle, pRpc->ahandle, pMsg->user); code = (*msgFp)(pWrapper, pMsg); } else if (pWrapper->procType == PROC_PARENT) { - dTrace("msg:%p, is created and put into child queue, handle:%p app:%p user:%s", pMsg, pRpc->handle, - pRpc->ahandle, pMsg->user); + dTrace("msg:%p, is created and put into child queue, handle:%p app:%p user:%s", pMsg, pRpc->handle, pRpc->ahandle, + pMsg->user); code = taosProcPutToChildQ(pWrapper->pProc, pMsg, sizeof(SNodeMsg), pRpc->pCont, pRpc->contLen, PROC_REQ); } else { dTrace("msg:%p, should not processed in child process, handle:%p app:%p user:%s", pMsg, pRpc->handle, pRpc->ahandle, @@ -171,4 +171,27 @@ int32_t dndProcessNodeMsg(SDnode *pDnode, SNodeMsg *pMsg) { terrno = TSDB_CODE_MSG_NOT_PROCESSED; return -1; } -} \ No newline at end of file +} + +void dndReportStartup(SDnode *pDnode, const char *pName, const char *pDesc) { + SStartupReq *pStartup = &pDnode->startup; + tstrncpy(pStartup->name, pName, TSDB_STEP_NAME_LEN); + tstrncpy(pStartup->desc, pDesc, TSDB_STEP_DESC_LEN); + pStartup->finished = 0; +} + +void dndGetStartup(SDnode *pDnode, SStartupReq *pStartup) { + memcpy(pStartup, &pDnode->startup, sizeof(SStartupReq)); + pStartup->finished = (dndGetStatus(pDnode) == DND_STAT_RUNNING); +} + +void dndProcessStartupReq(SDnode *pDnode, SRpcMsg *pReq) { + dDebug("startup req is received"); + SStartupReq *pStartup = rpcMallocCont(sizeof(SStartupReq)); + dndGetStartup(pDnode, pStartup); + + dDebug("startup req is sent, step:%s desc:%s finished:%d", pStartup->name, pStartup->desc, pStartup->finished); + SRpcMsg rpcRsp = { + .handle = pReq->handle, .pCont = pStartup, .contLen = sizeof(SStartupReq), .ahandle = pReq->ahandle}; + rpcSendResponse(&rpcRsp); +} diff --git a/source/dnode/vnode/inc/tsdb.h b/source/dnode/vnode/inc/tsdb.h index b287a1fe4c12a0974596047d52dd082d243d18de..3cabb6ce2e6b4dd0f02fd222869df111f1beb4d2 100644 --- a/source/dnode/vnode/inc/tsdb.h +++ b/source/dnode/vnode/inc/tsdb.h @@ -172,6 +172,8 @@ tsdbReaderT *tsdbQueryTables(STsdb *tsdb, STsdbQueryCond *pCond, STableGroupInfo tsdbReaderT tsdbQueryCacheLast(STsdb *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupList, uint64_t qId, void* pMemRef); +int32_t tsdbGetFileBlocksDistInfo(tsdbReaderT* queryHandle, STableBlockDistInfo* pTableBlockInfo); + bool isTsdbCacheLastRow(tsdbReaderT* pTsdbReadHandle); /** diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 7f2605ae2c70b3982c405e91ac7e0fc968acbc7e..4a3e764b13c327ed94f323b229f997cc87fd1ac5 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -14,7 +14,7 @@ */ #include "tsdbDef.h" -#include +#include "tdatablock.h" #include "os.h" #include "talgo.h" #include "tcompare.h" @@ -31,6 +31,7 @@ #include "tlosertree.h" #include "tsdbDef.h" #include "tmsg.h" +#include "tsdbCommit.h" #define EXTRA_BYTES 2 #define ASCENDING_TRAVERSE(o) (o == TSDB_ORDER_ASC) @@ -209,34 +210,34 @@ static SArray* getDefaultLoadColumns(STsdbReadHandle* pTsdbReadHandle, bool load return pLocalIdList; } -//int64_t tsdbGetNumOfRowsInMemTable(tsdbReaderT* pHandle) { -// STsdbReadHandle* pTsdbReadHandle = (STsdbReadHandle*) pHandle; -// -// int64_t rows = 0; -// STsdbMemTable* pMemTable = pTsdbReadHandle->pMemTable; -// if (pMemTable == NULL) { return rows; } -// -//// STableData* pMem = NULL; -//// STableData* pIMem = NULL; -// -//// SMemTable* pMemT = pMemRef->snapshot.mem; -//// SMemTable* pIMemT = pMemRef->snapshot.imem; -// -// size_t size = taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo); -// for (int32_t i = 0; i < size; ++i) { -// STableCheckInfo* pCheckInfo = taosArrayGet(pTsdbReadHandle->pTableCheckInfo, i); -// -//// if (pMemT && pCheckInfo->tableId < pMemT->maxTables) { -//// pMem = pMemT->tData[pCheckInfo->tableId]; -//// rows += (pMem && pMem->uid == pCheckInfo->tableId) ? pMem->numOfRows : 0; -//// } -//// if (pIMemT && pCheckInfo->tableId < pIMemT->maxTables) { -//// pIMem = pIMemT->tData[pCheckInfo->tableId]; -//// rows += (pIMem && pIMem->uid == pCheckInfo->tableId) ? pIMem->numOfRows : 0; -//// } -// } -// return rows; -//} +int64_t tsdbGetNumOfRowsInMemTable(tsdbReaderT* pHandle) { + STsdbReadHandle* pTsdbReadHandle = (STsdbReadHandle*) pHandle; + + int64_t rows = 0; + STsdbMemTable* pMemTable = NULL;//pTsdbReadHandle->pMemTable; + if (pMemTable == NULL) { return rows; } + +// STableData* pMem = NULL; +// STableData* pIMem = NULL; + +// SMemTable* pMemT = pMemRef->snapshot.mem; +// SMemTable* pIMemT = pMemRef->snapshot.imem; + + size_t size = taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo); + for (int32_t i = 0; i < size; ++i) { + STableCheckInfo* pCheckInfo = taosArrayGet(pTsdbReadHandle->pTableCheckInfo, i); + +// if (pMemT && pCheckInfo->tableId < pMemT->maxTables) { +// pMem = pMemT->tData[pCheckInfo->tableId]; +// rows += (pMem && pMem->uid == pCheckInfo->tableId) ? pMem->numOfRows : 0; +// } +// if (pIMemT && pCheckInfo->tableId < pIMemT->maxTables) { +// pIMem = pIMemT->tData[pCheckInfo->tableId]; +// rows += (pIMem && pIMem->uid == pCheckInfo->tableId) ? pIMem->numOfRows : 0; +// } + } + return rows; +} static SArray* createCheckInfoFromTableGroup(STsdbReadHandle* pTsdbReadHandle, STableGroupInfo* pGroupList) { size_t numOfGroup = taosArrayGetSize(pGroupList->pGroupList); @@ -2261,12 +2262,13 @@ static void moveToNextDataBlockInCurrentFile(STsdbReadHandle* pTsdbReadHandle) { cur->mixBlock = false; cur->blockCompleted = false; } -#if 0 -int32_t tsdbGetFileBlocksDistInfo(tsdbReaderT* queryHandle, STableBlockDist* pTableBlockInfo) { + +int32_t tsdbGetFileBlocksDistInfo(tsdbReaderT* queryHandle, STableBlockDistInfo* pTableBlockInfo) { STsdbReadHandle* pTsdbReadHandle = (STsdbReadHandle*) queryHandle; pTableBlockInfo->totalSize = 0; pTableBlockInfo->totalRows = 0; + STsdbFS* pFileHandle = REPO_FS(pTsdbReadHandle->pTsdb); // find the start data block in file @@ -2284,9 +2286,11 @@ int32_t tsdbGetFileBlocksDistInfo(tsdbReaderT* queryHandle, STableBlockDist* pTa int32_t code = TSDB_CODE_SUCCESS; int32_t numOfBlocks = 0; int32_t numOfTables = (int32_t)taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo); - int defaultRows = TSDB_DEFAULT_BLOCK_ROWS(pCfg->maxRowsPerFileBlock); + int defaultRows = 4096;//TSDB_DEFAULT_BLOCK_ROWS(pCfg->maxRowsPerFileBlock); STimeWindow win = TSWINDOW_INITIALIZER; + bool ascTraverse = ASCENDING_TRAVERSE(pTsdbReadHandle->order); + while (true) { numOfBlocks = 0; tsdbRLockFS(REPO_FS(pTsdbReadHandle->pTsdb)); @@ -2299,8 +2303,7 @@ int32_t tsdbGetFileBlocksDistInfo(tsdbReaderT* queryHandle, STableBlockDist* pTa tsdbGetFidKeyRange(pCfg->daysPerFile, pCfg->precision, pTsdbReadHandle->pFileGroup->fid, &win.skey, &win.ekey); // current file are not overlapped with query time window, ignore remain files - if ((ASCENDING_TRAVERSE(pTsdbReadHandle->order) && win.skey > pTsdbReadHandle->window.ekey) || - (!ASCENDING_TRAVERSE(pTsdbReadHandle->order) && win.ekey < pTsdbReadHandle->window.ekey)) { + if ((ascTraverse && win.skey > pTsdbReadHandle->window.ekey) || (!ascTraverse && win.ekey < pTsdbReadHandle->window.ekey)) { tsdbUnLockFS(REPO_FS(pTsdbReadHandle->pTsdb)); tsdbDebug("%p remain files are not qualified for qrange:%" PRId64 "-%" PRId64 ", ignore, %s", pTsdbReadHandle, pTsdbReadHandle->window.skey, pTsdbReadHandle->window.ekey, pTsdbReadHandle->idStr); @@ -2342,19 +2345,26 @@ int32_t tsdbGetFileBlocksDistInfo(tsdbReaderT* queryHandle, STableBlockDist* pTa int32_t numOfRows = pBlock[j].numOfRows; pTableBlockInfo->totalRows += numOfRows; - if (numOfRows > pTableBlockInfo->maxRows) pTableBlockInfo->maxRows = numOfRows; - if (numOfRows < pTableBlockInfo->minRows) pTableBlockInfo->minRows = numOfRows; - if (numOfRows < defaultRows) pTableBlockInfo->numOfSmallBlocks+=1; - int32_t stepIndex = (numOfRows-1)/TSDB_BLOCK_DIST_STEP_ROWS; - SFileBlockInfo *blockInfo = (SFileBlockInfo*)taosArrayGet(pTableBlockInfo->dataBlockInfos, stepIndex); - blockInfo->numBlocksOfStep++; + if (numOfRows > pTableBlockInfo->maxRows) { + pTableBlockInfo->maxRows = numOfRows; + } + + if (numOfRows < pTableBlockInfo->minRows) { + pTableBlockInfo->minRows = numOfRows; + } + + if (numOfRows < defaultRows) { + pTableBlockInfo->numOfSmallBlocks += 1; + } +// int32_t stepIndex = (numOfRows-1)/TSDB_BLOCK_DIST_STEP_ROWS; +// SFileBlockInfo *blockInfo = (SFileBlockInfo*)taosArrayGet(pTableBlockInfo->dataBlockInfos, stepIndex); +// blockInfo->numBlocksOfStep++; } } } return code; } -#endif static int32_t getDataBlocksInFiles(STsdbReadHandle* pTsdbReadHandle, bool* exists) { STsdbFS* pFileHandle = REPO_FS(pTsdbReadHandle->pTsdb); diff --git a/source/libs/command/src/command.c b/source/libs/command/src/command.c index 7ffbd4f80aa90e55412ad39a065ce05f56f5d387..4d4ac6c1e4e5cbd5294d3eca8c12a1c7bc6a96ac 100644 --- a/source/libs/command/src/command.c +++ b/source/libs/command/src/command.c @@ -16,7 +16,16 @@ #include "command.h" #include "tdatablock.h" -// #define SET_VARSTR(pData, val, pOffset) +static int32_t getSchemaBytes(const SSchema* pSchema) { + switch (pSchema->type) { + case TSDB_DATA_TYPE_BINARY: + return (pSchema->bytes - VARSTR_HEADER_SIZE); + case TSDB_DATA_TYPE_NCHAR: + return (pSchema->bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE; + default: + return pSchema->bytes; + } +} static void buildRspData(const STableMeta* pMeta, char* pData) { int32_t* pColSizes = (int32_t*)pData; @@ -50,7 +59,7 @@ static void buildRspData(const STableMeta* pMeta, char* pData) { // Length pData += BitmapLen(numOfRows); for (int32_t i = 0; i < numOfRows; ++i) { - *(int32_t*)pData = pMeta->schema[i].bytes; + *(int32_t*)pData = getSchemaBytes(pMeta->schema + i); pData += sizeof(int32_t); } pColSizes[2] = sizeof(int32_t) * numOfRows; diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 485ae0a9c0f3584f240f7e247005a79650c70cf0..5a3e25bd68bd2fd6aa5cb26ba734102d9b204e15 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -128,6 +128,11 @@ typedef struct { int64_t sumRunTimes; } SOperatorProfResult; +typedef struct SLimit { + int64_t limit; + int64_t offset; +} SLimit; + typedef struct STaskCostInfo { int64_t created; int64_t start; @@ -163,6 +168,11 @@ typedef struct SOperatorCostInfo { uint64_t execCost; } SOperatorCostInfo; +typedef struct SOrder { + uint32_t order; + SColumn col; +} SOrder; + // The basic query information extracted from the SQueryInfo tree to support the // execution of query in a data node. typedef struct STaskAttr { @@ -196,7 +206,6 @@ typedef struct STaskAttr { STimeWindow window; SInterval interval; - SSessionWindow sw; int16_t precision; int16_t numOfOutput; int16_t fillType; @@ -206,13 +215,8 @@ typedef struct STaskAttr { int32_t intermediateResultRowSize; // intermediate result row size, in case of top-k query. int32_t maxTableColumnWidth; int32_t tagLen; // tag value length of current query - SGroupbyExpr* pGroupbyExpr; SExprInfo* pExpr1; - SExprInfo* pExpr2; - int32_t numOfExpr2; - SExprInfo* pExpr3; - int32_t numOfExpr3; SColumnInfo* tableCols; SColumnInfo* tagColList; @@ -220,8 +224,6 @@ typedef struct STaskAttr { int64_t* fillVal; SSingleColumnFilterInfo* pFilterInfo; - // SFilterInfo *pFilters; - void* tsdb; STableGroupInfo tableGroupInfo; // table list SArray int32_t vgId; @@ -384,7 +386,7 @@ typedef struct SExchangeInfo { } SExchangeInfo; typedef struct STableScanInfo { - void* pTsdbReadHandle; + void* dataReader; int32_t numOfBlocks; // extract basic running information. int32_t numOfSkipped; int32_t numOfBlockStatis; @@ -644,12 +646,13 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, int64_t gap, SExecTaskInfo* pTaskInfo); SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SArray* pGroupColList, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo); +SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SExecTaskInfo* pTaskInfo); + SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols, SInterval* pInterval, SSDataBlock* pResBlock, int32_t fillType, char* fillVal, bool multigroupResult, SExecTaskInfo* pTaskInfo); SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo); SOperatorInfo* createDistinctOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput); -SOperatorInfo* createTableBlockInfoScanOperator(void* pTsdbReadHandle, STaskRuntimeEnv* pRuntimeEnv); SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbReadHandle, STaskRuntimeEnv* pRuntimeEnv); SOperatorInfo* createAllTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 3f895b3f1a99e4e93bc168dfc5d61592101d7fac..8e7e3b6495fa117007c6e6d69f817cb6f89aa00e 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -249,7 +249,6 @@ static void operatorDummyCloseFn(void* param, int32_t numOfCols) {} static int32_t doCopyToSDataBlock(SDiskbasedBuf* pBuf, SGroupResInfo* pGroupResInfo, int32_t orderType, SSDataBlock* pBlock, int32_t rowCapacity, int32_t* rowCellOffset); -static int32_t getGroupbyColumnIndex(SGroupbyExpr* pGroupbyExpr, SSDataBlock* pDataBlock); static int32_t setGroupResultOutputBuf_rv(SOptrBasicInfo* binfo, int32_t numOfCols, char* pData, int16_t type, int16_t bytes, int32_t groupId, SDiskbasedBuf* pBuf, SExecTaskInfo* pTaskInfo, SAggSupporter* pAggSup); @@ -287,44 +286,6 @@ static int compareRowData(const void* a, const void* b, const void* userData) { return (in1 != NULL && in2 != NULL) ? supporter->comFunc(in1, in2) : 0; } -static void sortGroupResByOrderList(SGroupResInfo* pGroupResInfo, STaskRuntimeEnv* pRuntimeEnv, - SSDataBlock* pDataBlock) { - SArray* columnOrderList = getOrderCheckColumns(pRuntimeEnv->pQueryAttr); - size_t size = taosArrayGetSize(columnOrderList); - taosArrayDestroy(columnOrderList); - - if (size <= 0) { - return; - } - - int32_t orderId = pRuntimeEnv->pQueryAttr->order.col.colId; - if (orderId <= 0) { - return; - } - - bool found = false; - int16_t dataOffset = 0; - - for (int32_t j = 0; j < pDataBlock->info.numOfCols; ++j) { - SColumnInfoData* pColInfoData = (SColumnInfoData*)taosArrayGet(pDataBlock->pDataBlock, j); - if (orderId == j) { - found = true; - break; - } - - dataOffset += pColInfoData->info.bytes; - } - - if (found == false) { - return; - } - - int16_t type = pRuntimeEnv->pQueryAttr->pExpr1[orderId].base.resSchema.type; - - SRowCompSupporter support = {.pRuntimeEnv = pRuntimeEnv, .dataOffset = dataOffset, .comFunc = getComparFunc(type, 0)}; - taosArraySortPWithExt(pGroupResInfo->pRows, compareRowData, &support); -} - // setup the output buffer for each operator SSDataBlock* createOutputBuf_rv1(SDataBlockDescNode* pNode) { int32_t numOfCols = LIST_LENGTH(pNode->pSlots); @@ -375,17 +336,6 @@ static bool isSelectivityWithTagsQuery(SqlFunctionCtx* pCtx, int32_t numOfOutput // return (numOfSelectivity > 0 && hasTags); } -static bool isProjQuery(STaskAttr* pQueryAttr) { - for (int32_t i = 0; i < pQueryAttr->numOfOutput; ++i) { - int32_t functId = getExprFunctionId(&pQueryAttr->pExpr1[i]); - if (functId != FUNCTION_PRJ && functId != FUNCTION_TAGPRJ) { - return false; - } - } - - return true; -} - static bool hasNull(SColumn* pColumn, SColumnDataAgg* pStatis) { if (TSDB_COL_IS_TAG(pColumn->flag) || TSDB_COL_IS_UD_COL(pColumn->flag) || pColumn->colId == PRIMARYKEY_TIMESTAMP_COL_ID) { @@ -1411,15 +1361,10 @@ void doTimeWindowInterpolation(SOperatorInfo* pOperator, SOptrBasicInfo* pInfo, } static bool setTimeWindowInterpolationStartTs(SOperatorInfo* pOperatorInfo, SqlFunctionCtx* pCtx, int32_t pos, - int32_t numOfRows, SArray* pDataBlock, const TSKEY* tsCols, - STimeWindow* win) { - STaskRuntimeEnv* pRuntimeEnv = pOperatorInfo->pRuntimeEnv; - STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; - - bool ascQuery = QUERY_IS_ASC_QUERY(pQueryAttr); - + int32_t numOfRows, SArray* pDataBlock, const TSKEY* tsCols, STimeWindow* win) { + bool ascQuery = true; TSKEY curTs = tsCols[pos]; - TSKEY lastTs = *(TSKEY*)pRuntimeEnv->prevRow[0]; + TSKEY lastTs = 0;//*(TSKEY*)pRuntimeEnv->prevRow[0]; // lastTs == INT64_MIN and pos == 0 means this is the first time window, interpolation is not needed. // start exactly from this point, no need to do interpolation @@ -1434,27 +1379,24 @@ static bool setTimeWindowInterpolationStartTs(SOperatorInfo* pOperatorInfo, SqlF return true; } - int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQueryAttr->order.order); + int32_t step = 1;//GET_FORWARD_DIRECTION_FACTOR(pQueryAttr->order.order); TSKEY prevTs = ((pos == 0 && ascQuery) || (pos == (numOfRows - 1) && !ascQuery)) ? lastTs : tsCols[pos - step]; - doTimeWindowInterpolation(pOperatorInfo, pOperatorInfo->info, pDataBlock, prevTs, pos - step, curTs, pos, key, - RESULT_ROW_START_INTERP); + doTimeWindowInterpolation(pOperatorInfo, pOperatorInfo->info, pDataBlock, prevTs, pos - step, curTs, pos, key, RESULT_ROW_START_INTERP); return true; } static bool setTimeWindowInterpolationEndTs(SOperatorInfo* pOperatorInfo, SqlFunctionCtx* pCtx, int32_t endRowIndex, SArray* pDataBlock, const TSKEY* tsCols, TSKEY blockEkey, STimeWindow* win) { - STaskRuntimeEnv* pRuntimeEnv = pOperatorInfo->pRuntimeEnv; - STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; - int32_t numOfOutput = pOperatorInfo->numOfOutput; + int32_t order = TSDB_ORDER_ASC; + int32_t numOfOutput = pOperatorInfo->numOfOutput; TSKEY actualEndKey = tsCols[endRowIndex]; - - TSKEY key = QUERY_IS_ASC_QUERY(pQueryAttr) ? win->ekey : win->skey; + TSKEY key = order ? win->ekey : win->skey; // not ended in current data block, do not invoke interpolation - if ((key > blockEkey && QUERY_IS_ASC_QUERY(pQueryAttr)) || (key < blockEkey && !QUERY_IS_ASC_QUERY(pQueryAttr))) { + if ((key > blockEkey /*&& QUERY_IS_ASC_QUERY(pQueryAttr)*/) || (key < blockEkey /*&& !QUERY_IS_ASC_QUERY(pQueryAttr)*/)) { setNotInterpoWindowKey(pCtx, numOfOutput, RESULT_ROW_END_INTERP); return false; } @@ -1465,7 +1407,7 @@ static bool setTimeWindowInterpolationEndTs(SOperatorInfo* pOperatorInfo, SqlFun return true; } - int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQueryAttr->order.order); + int32_t step = GET_FORWARD_DIRECTION_FACTOR(order); int32_t nextRowIndex = endRowIndex + step; assert(nextRowIndex >= 0); @@ -1667,10 +1609,9 @@ static void hashAllIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe STaskRuntimeEnv* pRuntimeEnv = pOperatorInfo->pRuntimeEnv; int32_t numOfOutput = pOperatorInfo->numOfOutput; - STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; - int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQueryAttr->order.order); - bool ascQuery = QUERY_IS_ASC_QUERY(pQueryAttr); + int32_t step = 1;//GET_FORWARD_DIRECTION_FACTOR(pQueryAttr->order.order); + bool ascQuery = true; TSKEY* tsCols = NULL; if (pSDataBlock->pDataBlock != NULL) { @@ -1683,7 +1624,7 @@ static void hashAllIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe int32_t startPos = ascQuery ? 0 : (pSDataBlock->info.rows - 1); TSKEY ts = getStartTsKey(&pSDataBlock->info.window, tsCols, pSDataBlock->info.rows, ascQuery); - STimeWindow win = getCurrentActiveTimeWindow(pResultRowInfo, ts, pQueryAttr); + STimeWindow win = {0};//getCurrentActiveTimeWindow(pResultRowInfo, ts, pQueryAttr); bool masterScan = IS_MAIN_SCAN(pRuntimeEnv); SResultRow* pResult = NULL; @@ -1699,7 +1640,7 @@ static void hashAllIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY); } - TSKEY ekey = reviseWindowEkey(pQueryAttr, &win); + TSKEY ekey = 0;//reviseWindowEkey(pQueryAttr, &win); // forwardStep = getNumOfRowsInTimeWindow(pRuntimeEnv, &pSDataBlock->info, tsCols, startPos, ekey, // binarySearchForKey, true); @@ -1713,31 +1654,31 @@ static void hashAllIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe // startPos = getNextQualifiedWindow(pQueryAttr, &win, &pSDataBlock->info, tsCols, binarySearchForKey, // prevEndPos); if (startPos < 0) { - if ((ascQuery && win.skey <= pQueryAttr->window.ekey) || ((!ascQuery) && win.ekey >= pQueryAttr->window.ekey)) { - int32_t code = - setResultOutputBufByKey(pRuntimeEnv, pResultRowInfo, pSDataBlock->info.uid, &win, masterScan, &pResult, - tableGroupId, pInfo->binfo.pCtx, numOfOutput, pInfo->binfo.rowCellInfoOffset); - if (code != TSDB_CODE_SUCCESS || pResult == NULL) { - longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY); - } - - startPos = pSDataBlock->info.rows - 1; +// if ((ascQuery && win.skey <= pQueryAttr->window.ekey) || ((!ascQuery) && win.ekey >= pQueryAttr->window.ekey)) { +// int32_t code = +// setResultOutputBufByKey(pRuntimeEnv, pResultRowInfo, pSDataBlock->info.uid, &win, masterScan, &pResult, +// tableGroupId, pInfo->binfo.pCtx, numOfOutput, pInfo->binfo.rowCellInfoOffset); +// if (code != TSDB_CODE_SUCCESS || pResult == NULL) { +// longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY); +// } +// +// startPos = pSDataBlock->info.rows - 1; // window start(end) key interpolation // doWindowBorderInterpolation(pOperatorInfo, pSDataBlock, pInfo->binfo.pCtx, pResult, &win, startPos, // forwardStep); doApplyFunctions(pRuntimeEnv, pInfo->binfo.pCtx, ascQuery ? &win : &preWin, startPos, // forwardStep, tsCols, pSDataBlock->info.rows, numOfOutput); - } +// } break; } setResultRowInterpo(pResult, RESULT_ROW_END_INTERP); } - if (pQueryAttr->timeWindowInterpo) { - int32_t rowIndex = ascQuery ? (pSDataBlock->info.rows - 1) : 0; +// if (pQueryAttr->timeWindowInterpo) { +// int32_t rowIndex = ascQuery ? (pSDataBlock->info.rows - 1) : 0; // saveDataBlockLastRow(pRuntimeEnv, &pSDataBlock->info, pSDataBlock->pDataBlock, rowIndex); - } +// } // updateResultRowInfoActiveIndex(pResultRowInfo, pQueryAttr, pRuntimeEnv->current->lastKey); } @@ -2023,28 +1964,6 @@ static int32_t setGroupResultOutputBuf_rv(SOptrBasicInfo* binfo, int32_t numOfCo return TSDB_CODE_SUCCESS; } -static int32_t getGroupbyColumnIndex(SGroupbyExpr* pGroupbyExpr, SSDataBlock* pDataBlock) { - size_t num = taosArrayGetSize(pGroupbyExpr->columnInfo); - for (int32_t k = 0; k < num; ++k) { - SColIndex* pColIndex = taosArrayGet(pGroupbyExpr->columnInfo, k); - if (TSDB_COL_IS_TAG(pColIndex->flag)) { - continue; - } - - int32_t colId = pColIndex->colId; - - for (int32_t i = 0; i < pDataBlock->info.numOfCols; ++i) { - SColumnInfoData* pColInfo = taosArrayGet(pDataBlock->pDataBlock, i); - if (pColInfo->info.colId == colId) { - return i; - } - } - } - - assert(0); - return -1; -} - static bool functionNeedToExecute(SqlFunctionCtx* pCtx) { struct SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); @@ -2264,64 +2183,6 @@ static void* destroySqlFunctionCtx(SqlFunctionCtx* pCtx, int32_t numOfOutput) { return NULL; } -static int32_t setupQueryRuntimeEnv(STaskRuntimeEnv* pRuntimeEnv, int32_t numOfTables, SArray* pOperator, - void* merger) { - // qDebug("QInfo:0x%"PRIx64" setup runtime env", GET_TASKID(pRuntimeEnv)); - STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; - - pRuntimeEnv->prevGroupId = INT32_MIN; - pRuntimeEnv->pQueryAttr = pQueryAttr; - - pRuntimeEnv->pResultRowHashTable = - taosHashInit(numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); - pRuntimeEnv->pResultRowListSet = - taosHashInit(numOfTables * 10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); - pRuntimeEnv->keyBuf = taosMemoryMalloc(pQueryAttr->maxTableColumnWidth + sizeof(int64_t) + POINTER_BYTES); - // pRuntimeEnv->pool = initResultRowPool(getResultRowSize(pRuntimeEnv)); - pRuntimeEnv->pResultRowArrayList = taosArrayInit(numOfTables, sizeof(SResultRowCell)); - - pRuntimeEnv->prevRow = taosMemoryMalloc(POINTER_BYTES * pQueryAttr->numOfCols + pQueryAttr->srcRowSize); - pRuntimeEnv->tagVal = taosMemoryMalloc(pQueryAttr->tagLen); - - // NOTE: pTableCheckInfo need to update the query time range and the lastKey info - pRuntimeEnv->pTableRetrieveTsMap = - taosHashInit(numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK); - - // pRuntimeEnv->scalarSup = createScalarFuncSupport(pQueryAttr->numOfOutput); - - if (pRuntimeEnv->scalarSup == NULL || pRuntimeEnv->pResultRowHashTable == NULL || pRuntimeEnv->keyBuf == NULL || - pRuntimeEnv->prevRow == NULL || pRuntimeEnv->tagVal == NULL) { - goto _clean; - } - - if (pQueryAttr->numOfCols) { - char* start = POINTER_BYTES * pQueryAttr->numOfCols + (char*)pRuntimeEnv->prevRow; - pRuntimeEnv->prevRow[0] = start; - for (int32_t i = 1; i < pQueryAttr->numOfCols; ++i) { - pRuntimeEnv->prevRow[i] = pRuntimeEnv->prevRow[i - 1] + pQueryAttr->tableCols[i - 1].bytes; - } - - if (pQueryAttr->tableCols[0].type == TSDB_DATA_TYPE_TIMESTAMP) { - *(int64_t*)pRuntimeEnv->prevRow[0] = INT64_MIN; - } - } - - // qDebug("QInfo:0x%"PRIx64" init runtime environment completed", GET_TASKID(pRuntimeEnv)); - - // group by normal column, sliding window query, interval query are handled by interval query processor - // interval (down sampling operation) - return TSDB_CODE_SUCCESS; - -_clean: - // destroyScalarFuncSupport(pRuntimeEnv->scalarSup, pRuntimeEnv->pQueryAttr->numOfOutput); - taosMemoryFreeClear(pRuntimeEnv->pResultRowHashTable); - taosMemoryFreeClear(pRuntimeEnv->keyBuf); - taosMemoryFreeClear(pRuntimeEnv->prevRow); - taosMemoryFreeClear(pRuntimeEnv->tagVal); - - return TSDB_CODE_QRY_OUT_OF_MEMORY; -} - static void doFreeQueryHandle(STaskRuntimeEnv* pRuntimeEnv) { STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; @@ -2400,17 +2261,6 @@ void setTaskKilled(SExecTaskInfo* pTaskInfo) { pTaskInfo->code = TSDB_CODE_TSC_Q // return false; //} -static bool isFirstLastRowQuery(STaskAttr* pQueryAttr) { - for (int32_t i = 0; i < pQueryAttr->numOfOutput; ++i) { - int32_t functionID = getExprFunctionId(&pQueryAttr->pExpr1[i]); - if (functionID == FUNCTION_LAST_ROW) { - return true; - } - } - - return false; -} - static bool isCachedLastQuery(STaskAttr* pQueryAttr) { for (int32_t i = 0; i < pQueryAttr->numOfOutput; ++i) { int32_t functionId = getExprFunctionId(&pQueryAttr->pExpr1[i]); @@ -2493,18 +2343,6 @@ static bool onlyOneQueryType(STaskAttr* pQueryAttr, int32_t functId, int32_t fun return true; } -static bool onlyFirstQuery(STaskAttr* pQueryAttr) { - return onlyOneQueryType(pQueryAttr, FUNCTION_FIRST, FUNCTION_FIRST_DST); -} - -static bool onlyLastQuery(STaskAttr* pQueryAttr) { - return onlyOneQueryType(pQueryAttr, FUNCTION_LAST, FUNCTION_LAST_DST); -} - -static bool notContainSessionOrStateWindow(STaskAttr* pQueryAttr) { - return !(pQueryAttr->sw.gap > 0 || pQueryAttr->stateWindow); -} - static int32_t updateBlockLoadStatus(STaskAttr* pQuery, int32_t status) { bool hasFirstLastFunc = false; bool hasOtherFunc = false; @@ -3006,7 +2844,7 @@ int32_t loadDataBlock(SExecTaskInfo* pTaskInfo, STableScanInfo* pTableScanInfo, *status = BLK_DATA_ALL_NEEDED; - SArray* pCols = tsdbRetrieveDataBlock(pTableScanInfo->pTsdbReadHandle, NULL); + SArray* pCols = tsdbRetrieveDataBlock(pTableScanInfo->dataReader, NULL); if (pCols == NULL) { return terrno; } @@ -3899,76 +3737,6 @@ int32_t setTimestampListJoinInfo(STaskRuntimeEnv* pRuntimeEnv, SVariant* pTag, S return 0; } -// TODO refactor: this funciton should be merged with setparamForStableStddevColumnData function. -void setParamForStableStddev(STaskRuntimeEnv* pRuntimeEnv, SqlFunctionCtx* pCtx, int32_t numOfOutput, - SExprInfo* pExprInfo) { -#if 0 - STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; - - int32_t numOfExprs = pQueryAttr->numOfOutput; - for(int32_t i = 0; i < numOfExprs; ++i) { - SExprInfo* pExprInfo1 = &(pExprInfo[i]); - if (pExprInfo1->base.functionId != FUNCTION_STDDEV_DST) { - continue; - } - - SExprBasicInfo* pExpr = &pExprInfo1->base; - - pCtx[i].param[0].arr = NULL; - pCtx[i].param[0].nType = TSDB_DATA_TYPE_INT; // avoid freeing the memory by setting the type to be int - - // TODO use hash to speedup this loop - int32_t numOfGroup = (int32_t)taosArrayGetSize(pRuntimeEnv->prevResult); - for (int32_t j = 0; j < numOfGroup; ++j) { - SInterResult* p = taosArrayGet(pRuntimeEnv->prevResult, j); - if (pQueryAttr->tagLen == 0 || memcmp(p->tags, pRuntimeEnv->tagVal, pQueryAttr->tagLen) == 0) { - int32_t numOfCols = (int32_t)taosArrayGetSize(p->pResult); - for (int32_t k = 0; k < numOfCols; ++k) { - SStddevInterResult* pres = taosArrayGet(p->pResult, k); - if (pres->info.colId == pExpr->colInfo.colId) { - pCtx[i].param[0].arr = pres->pResult; - break; - } - } - } - } - } -#endif -} - -void setParamForStableStddevByColData(STaskRuntimeEnv* pRuntimeEnv, SqlFunctionCtx* pCtx, int32_t numOfOutput, - SExprInfo* pExpr, char* val, int16_t bytes) { - STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; -#if 0 - int32_t numOfExprs = pQueryAttr->numOfOutput; - for(int32_t i = 0; i < numOfExprs; ++i) { - SExprBasicInfo* pExpr1 = &pExpr[i].base; - if (pExpr1->functionId != FUNCTION_STDDEV_DST) { - continue; - } - - pCtx[i].param[0].arr = NULL; - pCtx[i].param[0].nType = TSDB_DATA_TYPE_INT; // avoid freeing the memory by setting the type to be int - - // TODO use hash to speedup this loop - int32_t numOfGroup = (int32_t)taosArrayGetSize(pRuntimeEnv->prevResult); - for (int32_t j = 0; j < numOfGroup; ++j) { - SInterResult* p = taosArrayGet(pRuntimeEnv->prevResult, j); - if (bytes == 0 || memcmp(p->tags, val, bytes) == 0) { - int32_t numOfCols = (int32_t)taosArrayGetSize(p->pResult); - for (int32_t k = 0; k < numOfCols; ++k) { - SStddevInterResult* pres = taosArrayGet(p->pResult, k); - if (pres->info.colId == pExpr1->colInfo.colId) { - pCtx[i].param[0].arr = pres->pResult; - break; - } - } - } - } - } -#endif -} - /* * There are two cases to handle: * @@ -4679,13 +4447,13 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator, bool* newgroup) { *newgroup = false; - while (tsdbNextDataBlock(pTableScanInfo->pTsdbReadHandle)) { + while (tsdbNextDataBlock(pTableScanInfo->dataReader)) { if (isTaskKilled(pOperator->pTaskInfo)) { longjmp(pOperator->pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED); } pTableScanInfo->numOfBlocks += 1; - tsdbRetrieveDataBlockInfo(pTableScanInfo->pTsdbReadHandle, &pBlock->info); + tsdbRetrieveDataBlockInfo(pTableScanInfo->dataReader, &pBlock->info); // todo opt // if (pTableGroupInfo->numOfTables > 1 || (pRuntimeEnv->current == NULL && pTableGroupInfo->numOfTables == 1)) { @@ -4722,7 +4490,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator, bool* newgroup) { SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; // The read handle is not initialized yet, since no qualified tables exists - if (pTableScanInfo->pTsdbReadHandle == NULL) { + if (pTableScanInfo->dataReader == NULL) { return NULL; } @@ -4750,11 +4518,6 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator, bool* newgroup) { setTaskStatus(pTaskInfo, TASK_NOT_COMPLETED); pTableScanInfo->scanFlag = REPEAT_SCAN; - // if (pTaskInfo->pTsBuf) { - // bool ret = tsBufNextPos(pRuntimeEnv->pTsBuf); - // assert(ret); - // } - // if (pResultRowInfo->size > 0) { pResultRowInfo->curPos = 0; } @@ -4790,44 +4553,44 @@ static SSDataBlock* doBlockInfoScan(SOperatorInfo* pOperator, bool* newgroup) { STableScanInfo* pTableScanInfo = pOperator->info; *newgroup = false; -#if 0 - STableBlockDist tableBlockDist = {0}; - tableBlockDist.numOfTables = (int32_t)pOperator->pRuntimeEnv->tableqinfoGroupInfo.numOfTables; + + STableBlockDistInfo tableBlockDist = {0}; + tableBlockDist.numOfTables = 1; // TODO set the correct number of tables int32_t numRowSteps = TSDB_DEFAULT_MAX_ROW_FBLOCK / TSDB_BLOCK_DIST_STEP_ROWS; if (TSDB_DEFAULT_MAX_ROW_FBLOCK % TSDB_BLOCK_DIST_STEP_ROWS != 0) { ++numRowSteps; } + tableBlockDist.dataBlockInfos = taosArrayInit(numRowSteps, sizeof(SFileBlockInfo)); taosArraySetSize(tableBlockDist.dataBlockInfos, numRowSteps); + tableBlockDist.maxRows = INT_MIN; tableBlockDist.minRows = INT_MAX; - tsdbGetFileBlocksDistInfo(pTableScanInfo->pTsdbReadHandle, &tableBlockDist); - tableBlockDist.numOfRowsInMemTable = (int32_t) tsdbGetNumOfRowsInMemTable(pTableScanInfo->pTsdbReadHandle); + tsdbGetFileBlocksDistInfo(pTableScanInfo->dataReader, &tableBlockDist); + tableBlockDist.numOfRowsInMemTable = (int32_t) tsdbGetNumOfRowsInMemTable(pTableScanInfo->dataReader); SSDataBlock* pBlock = &pTableScanInfo->block; pBlock->info.rows = 1; pBlock->info.numOfCols = 1; - SBufferWriter bw = tbufInitWriter(NULL, false); - blockDistInfoToBinary(&tableBlockDist, &bw); +// SBufferWriter bw = tbufInitWriter(NULL, false); +// blockDistInfoToBinary(&tableBlockDist, &bw); SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, 0); - int32_t len = (int32_t) tbufTell(&bw); - pColInfo->pData = taosMemoryMalloc(len + sizeof(int32_t)); - - *(int32_t*) pColInfo->pData = len; - memcpy(pColInfo->pData + sizeof(int32_t), tbufGetData(&bw, false), len); - - tbufCloseWriter(&bw); +// int32_t len = (int32_t) tbufTell(&bw); +// pColInfo->pData = taosMemoryMalloc(len + sizeof(int32_t)); +// *(int32_t*) pColInfo->pData = len; +// memcpy(pColInfo->pData + sizeof(int32_t), tbufGetData(&bw, false), len); +// +// tbufCloseWriter(&bw); - SArray* g = GET_TABLEGROUP(pOperator->pRuntimeEnv, 0); - pOperator->pRuntimeEnv->current = taosArrayGetP(g, 0); +// SArray* g = GET_TABLEGROUP(pOperator->, 0); +// pOperator->pRuntimeEnv->current = taosArrayGetP(g, 0); pOperator->status = OP_EXEC_DONE; return pBlock; -#endif } static void doClearBufferedBlocks(SStreamBlockScanInfo* pInfo) { @@ -5472,7 +5235,7 @@ SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order, } pInfo->pFilterNode = pCondition; - pInfo->pTsdbReadHandle = pTsdbReadHandle; + pInfo->dataReader = pTsdbReadHandle; pInfo->times = repeatTime; pInfo->reverseTimes = reverseTime; pInfo->order = order; @@ -5494,7 +5257,7 @@ SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order, SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbReadHandle, STaskRuntimeEnv* pRuntimeEnv) { STableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableScanInfo)); - pInfo->pTsdbReadHandle = pTsdbReadHandle; + pInfo->dataReader = pTsdbReadHandle; pInfo->times = 1; pInfo->reverseTimes = 0; pInfo->order = pRuntimeEnv->pQueryAttr->order.order; @@ -5515,32 +5278,42 @@ SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbReadHandle, STaskRuntim return pOperator; } -SOperatorInfo* createTableBlockInfoScanOperator(void* pTsdbReadHandle, STaskRuntimeEnv* pRuntimeEnv) { - STableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableScanInfo)); +SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SExecTaskInfo* pTaskInfo) { + STableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableScanInfo)); + SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); + if (pInfo == NULL || pOperator == NULL) { + pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY; + goto _error; + } - pInfo->pTsdbReadHandle = pTsdbReadHandle; + pInfo->dataReader = dataReader; pInfo->block.pDataBlock = taosArrayInit(1, sizeof(SColumnInfoData)); - SColumnInfoData infoData = {{0}}; - infoData.info.type = TSDB_DATA_TYPE_BINARY; + SColumnInfoData infoData = {0}; + infoData.info.type = TSDB_DATA_TYPE_BINARY; infoData.info.bytes = 1024; infoData.info.colId = 0; taosArrayPush(pInfo->block.pDataBlock, &infoData); - SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); - pOperator->name = "TableBlockInfoScanOperator"; + pOperator->name = "DataBlockInfoScanOperator"; // pOperator->operatorType = OP_TableBlockInfoScan; - pOperator->blockingOptr = false; - pOperator->status = OP_NOT_OPENED; - pOperator->info = pInfo; - // pOperator->numOfOutput = pRuntimeEnv->pQueryAttr->numOfCols; - pOperator->getNextFn = doBlockInfoScan; + pOperator->blockingOptr = false; + pOperator->status = OP_NOT_OPENED; + pOperator->_openFn = operatorDummyOpenFn; + pOperator->getNextFn = doBlockInfoScan; + + pOperator->info = pInfo; + pOperator->pTaskInfo = pTaskInfo; return pOperator; + + _error: + taosMemoryFreeClear(pInfo); + taosMemoryFreeClear(pOperator); + return NULL; } -SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, SSDataBlock* pResBlock, SArray* pColList, - SArray* pTableIdList, SExecTaskInfo* pTaskInfo) { +SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, SSDataBlock* pResBlock, SArray* pColList, SArray* pTableIdList, SExecTaskInfo* pTaskInfo) { SStreamBlockScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamBlockScanInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { @@ -5579,16 +5352,17 @@ SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, SSDataBlock* pInfo->readerHandle = streamReadHandle; pInfo->pRes = pResBlock; - pOperator->name = "StreamBlockScanOperator"; + pOperator->name = "StreamBlockScanOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN; pOperator->blockingOptr = false; - pOperator->status = OP_NOT_OPENED; - pOperator->info = pInfo; - pOperator->numOfOutput = pResBlock->info.numOfCols; - pOperator->_openFn = operatorDummyOpenFn; - pOperator->getNextFn = doStreamBlockScan; - pOperator->closeFn = operatorDummyCloseFn; - pOperator->pTaskInfo = pTaskInfo; + pOperator->status = OP_NOT_OPENED; + pOperator->info = pInfo; + pOperator->numOfOutput = pResBlock->info.numOfCols; + pOperator->_openFn = operatorDummyOpenFn; + pOperator->getNextFn = doStreamBlockScan; + pOperator->closeFn = operatorDummyCloseFn; + pOperator->pTaskInfo = pTaskInfo; + return pOperator; } @@ -5922,80 +5696,6 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, SSDataB return pOperator; } -SArray* getOrderCheckColumns(STaskAttr* pQuery) { - int32_t numOfCols = (pQuery->pGroupbyExpr == NULL) ? 0 : taosArrayGetSize(pQuery->pGroupbyExpr->columnInfo); - - SArray* pOrderColumns = NULL; - if (numOfCols > 0) { - pOrderColumns = taosArrayDup(pQuery->pGroupbyExpr->columnInfo); - } else { - pOrderColumns = taosArrayInit(4, sizeof(SColIndex)); - } - - if (pQuery->interval.interval > 0) { - if (pOrderColumns == NULL) { - pOrderColumns = taosArrayInit(1, sizeof(SColIndex)); - } - - SColIndex colIndex = {.colIndex = 0, .colId = 0, .flag = TSDB_COL_NORMAL}; - taosArrayPush(pOrderColumns, &colIndex); - } - - { - numOfCols = (int32_t)taosArrayGetSize(pOrderColumns); - for (int32_t i = 0; i < numOfCols; ++i) { - SColIndex* index = taosArrayGet(pOrderColumns, i); - for (int32_t j = 0; j < pQuery->numOfOutput; ++j) { - SExprBasicInfo* pExpr = &pQuery->pExpr1[j].base; - int32_t functionId = getExprFunctionId(&pQuery->pExpr1[j]); - - if (index->colId == pExpr->pParam[0].pCol->colId && - (functionId == FUNCTION_PRJ || functionId == FUNCTION_TAG || functionId == FUNCTION_TS)) { - index->colIndex = j; - index->colId = pExpr->resSchema.colId; - } - } - } - } - - return pOrderColumns; -} - -SArray* getResultGroupCheckColumns(STaskAttr* pQuery) { - int32_t numOfCols = (pQuery->pGroupbyExpr == NULL) ? 0 : taosArrayGetSize(pQuery->pGroupbyExpr->columnInfo); - - SArray* pOrderColumns = NULL; - if (numOfCols > 0) { - pOrderColumns = taosArrayDup(pQuery->pGroupbyExpr->columnInfo); - } else { - pOrderColumns = taosArrayInit(4, sizeof(SColIndex)); - } - - for (int32_t i = 0; i < numOfCols; ++i) { - SColIndex* index = taosArrayGet(pOrderColumns, i); - - bool found = false; - for (int32_t j = 0; j < pQuery->numOfOutput; ++j) { - SExprBasicInfo* pExpr = &pQuery->pExpr1[j].base; - int32_t functionId = getExprFunctionId(&pQuery->pExpr1[j]); - - // FUNCTION_TAG_DUMMY function needs to be ignored - // if (index->colId == pExpr->pColumns->info.colId && - // ((TSDB_COL_IS_TAG(pExpr->pColumns->flag) && functionId == FUNCTION_TAG) || - // (TSDB_COL_IS_NORMAL_COL(pExpr->pColumns->flag) && functionId == FUNCTION_PRJ))) { - // index->colIndex = j; - // index->colId = pExpr->resSchema.colId; - // found = true; - // break; - // } - } - - assert(found && index->colIndex >= 0 && index->colIndex < pQuery->numOfOutput); - } - - return pOrderColumns; -} - static int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t numOfOutput, const char* pKey); static void cleanupAggSup(SAggSupporter* pAggSup); @@ -7044,9 +6744,6 @@ static SSDataBlock* doAllSTableIntervalAgg(SOperatorInfo* pOperator, bool* newgr return pIntervalInfo->binfo.pRes; } - STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; - int32_t order = pQueryAttr->order.order; - SOperatorInfo* downstream = pOperator->pDownstream[0]; while (1) { @@ -7062,14 +6759,14 @@ static SSDataBlock* doAllSTableIntervalAgg(SOperatorInfo* pOperator, bool* newgr STableQueryInfo* pTableQueryInfo = pRuntimeEnv->current; // setTagValue(pOperator, pTableQueryInfo->pTable, pIntervalInfo->pCtx, pOperator->numOfOutput); - setInputDataBlock(pOperator, pIntervalInfo->binfo.pCtx, pBlock, pQueryAttr->order.order); +// setInputDataBlock(pOperator, pIntervalInfo->binfo.pCtx, pBlock, pQueryAttr->order.order); setIntervalQueryRange(pRuntimeEnv, pBlock->info.window.skey); hashAllIntervalAgg(pOperator, &pTableQueryInfo->resInfo, pBlock, pTableQueryInfo->groupIndex); } pOperator->status = OP_RES_TO_RETURN; - pQueryAttr->order.order = order; // TODO : restore the order +// pQueryAttr->order.order = order; // TODO : restore the order doCloseAllTimeWindow(pRuntimeEnv); setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED); @@ -8153,7 +7850,6 @@ static SSDataBlock* doTagScan(SOperatorInfo* pOperator, bool* newgroup) { int32_t functionId = getExprFunctionId(&pOperator->pExpr[0]); if (functionId == FUNCTION_TID_TAG) { // return the tags & table Id - STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; assert(pQueryAttr->numOfOutput == 1); SExprInfo* pExprInfo = &pOperator->pExpr[0]; @@ -8936,8 +8632,7 @@ SArray* extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNod return pList; } -int32_t doCreateTableGroup(void* metaHandle, int32_t tableType, uint64_t tableUid, STableGroupInfo* pGroupInfo, - uint64_t queryId, uint64_t taskId) { +int32_t doCreateTableGroup(void* metaHandle, int32_t tableType, uint64_t tableUid, STableGroupInfo* pGroupInfo, uint64_t queryId, uint64_t taskId) { int32_t code = 0; if (tableType == TSDB_SUPER_TABLE) { code = tsdbQuerySTableByTagCond(metaHandle, tableUid, 0, NULL, 0, 0, NULL, pGroupInfo, NULL, 0, queryId, taskId); @@ -9041,43 +8736,6 @@ static int32_t updateOutputBufForTopBotQuery(SQueriedTableInfo* pTableInfo, SCol return TSDB_CODE_SUCCESS; } -static void doUpdateExprColumnIndex(STaskAttr* pQueryAttr) { - assert(pQueryAttr->pExpr1 != NULL && pQueryAttr != NULL); - - for (int32_t k = 0; k < pQueryAttr->numOfOutput; ++k) { - SExprBasicInfo* pSqlExprMsg = &pQueryAttr->pExpr1[k].base; - // if (pSqlExprMsg->functionId == FUNCTION_ARITHM) { - // continue; - // } - - // todo opt performance - SColIndex* pColIndex = NULL; /*&pSqlExprMsg->colInfo;*/ - if (TSDB_COL_IS_NORMAL_COL(pColIndex->flag)) { - int32_t f = 0; - for (f = 0; f < pQueryAttr->numOfCols; ++f) { - if (pColIndex->colId == pQueryAttr->tableCols[f].colId) { - pColIndex->colIndex = f; - break; - } - } - - assert(f < pQueryAttr->numOfCols); - } else if (pColIndex->colId <= TSDB_UD_COLUMN_INDEX) { - // do nothing for user-defined constant value result columns - } else { - int32_t f = 0; - for (f = 0; f < pQueryAttr->numOfTags; ++f) { - if (pColIndex->colId == pQueryAttr->tagColList[f].colId) { - pColIndex->colIndex = f; - break; - } - } - - assert(f < pQueryAttr->numOfTags || pColIndex->colId == TSDB_TBNAME_COLUMN_INDEX); - } - } -} - void setResultBufSize(STaskAttr* pQueryAttr, SResultInfo* pResultInfo) { const int32_t DEFAULT_RESULT_MSG_SIZE = 1024 * (1024 + 512); @@ -9087,16 +8745,16 @@ void setResultBufSize(STaskAttr* pQueryAttr, SResultInfo* pResultInfo) { const float THRESHOLD_RATIO = 0.85f; - if (isProjQuery(pQueryAttr)) { - int32_t numOfRes = DEFAULT_RESULT_MSG_SIZE / pQueryAttr->resultRowSize; - if (numOfRes < MIN_ROWS_FOR_PRJ_QUERY) { - numOfRes = MIN_ROWS_FOR_PRJ_QUERY; - } - - pResultInfo->capacity = numOfRes; - } else { // in case of non-prj query, a smaller output buffer will be used. - pResultInfo->capacity = DEFAULT_MIN_ROWS; - } +// if (isProjQuery(pQueryAttr)) { +// int32_t numOfRes = DEFAULT_RESULT_MSG_SIZE / pQueryAttr->resultRowSize; +// if (numOfRes < MIN_ROWS_FOR_PRJ_QUERY) { +// numOfRes = MIN_ROWS_FOR_PRJ_QUERY; +// } +// +// pResultInfo->capacity = numOfRes; +// } else { // in case of non-prj query, a smaller output buffer will be used. +// pResultInfo->capacity = DEFAULT_MIN_ROWS; +// } pResultInfo->threshold = (int32_t)(pResultInfo->capacity * THRESHOLD_RATIO); pResultInfo->totalRows = 0; diff --git a/source/libs/function/inc/builtinsimpl.h b/source/libs/function/inc/builtinsimpl.h index f7fccb29f7981bf66bf88584d404fdb3ef0a7ca0..ab7bfc7767e99cd924340c05d4b84fcfb4003b01 100644 --- a/source/libs/function/inc/builtinsimpl.h +++ b/source/libs/function/inc/builtinsimpl.h @@ -38,9 +38,14 @@ void minFunction(SqlFunctionCtx* pCtx); void maxFunction(SqlFunctionCtx *pCtx); bool getStddevFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); +bool stddevFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo); void stddevFunction(SqlFunctionCtx* pCtx); void stddevFinalize(SqlFunctionCtx* pCtx); +bool getPercentileFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); +bool percentileFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo); +void percentileFunction(SqlFunctionCtx *pCtx); + bool getFirstLastFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); void firstFunction(SqlFunctionCtx *pCtx); void lastFunction(SqlFunctionCtx *pCtx); diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index 348c7a4368492afbac17b8cda9efdaa97353d560..909eac9532e24ed77736941633d1bd7b945d34f6 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -68,9 +68,9 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .classification = FUNC_MGT_AGG_FUNC, .checkFunc = stubCheckAndGetResultType, .getEnvFunc = getStddevFuncEnv, - .initFunc = maxFunctionSetup, - .processFunc = maxFunction, - .finalizeFunc = functionFinalize + .initFunc = stddevFunctionSetup, + .processFunc = stddevFunction, + .finalizeFunc = stddevFinalize }, { .name = "percentile", @@ -434,6 +434,7 @@ int32_t stubCheckAndGetResultType(SFunctionNode* pFunc) { break; } + case FUNCTION_TYPE_STDDEV: case FUNCTION_TYPE_SIN: case FUNCTION_TYPE_COS: case FUNCTION_TYPE_TAN: diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 610e5a0bb231a3f1693f4d18869fa53972ecfcdd..0dc2989f77bb1671592e1f68e6b5a8623d043289 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -14,6 +14,7 @@ */ #include "builtinsimpl.h" +#include "tpercentile.h" #include "querynodes.h" #include "taggfunction.h" #include "tdatablock.h" @@ -453,6 +454,7 @@ bool getTopBotFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) { } typedef struct SStddevRes { + double result; int64_t count; union {double quadraticDSum; int64_t quadraticISum;}; union {double dsum; int64_t isum;}; @@ -463,38 +465,129 @@ bool getStddevFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) { return true; } +bool stddevFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo) { + if (!functionSetup(pCtx, pResultInfo)) { + return false; + } + + SStddevRes* pRes = GET_ROWCELL_INTERBUF(pResultInfo); + memset(pRes, 0, sizeof(SStddevRes)); + return true; +} + void stddevFunction(SqlFunctionCtx* pCtx) { int32_t numOfElem = 0; // Only the pre-computing information loaded and actual data does not loaded SInputColumnInfoData* pInput = &pCtx->input; - SColumnDataAgg *pAgg = pInput->pColumnDataAgg[0]; - int32_t type = pInput->pData[0]->info.type; + SColumnDataAgg* pAgg = pInput->pColumnDataAgg[0]; + int32_t type = pInput->pData[0]->info.type; SStddevRes* pStddevRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); -// } else { // computing based on the true data block - SColumnInfoData* pCol = pInput->pData[0]; + // computing based on the true data block + SColumnInfoData* pCol = pInput->pData[0]; - int32_t start = pInput->startRowIndex; - int32_t numOfRows = pInput->numOfRows; + int32_t start = pInput->startRowIndex; + int32_t numOfRows = pInput->numOfRows; - switch(type) { - case TSDB_DATA_TYPE_INT: { - int32_t* plist = (int32_t*)pCol->pData; + switch (type) { + case TSDB_DATA_TYPE_TINYINT: { + int8_t* plist = (int8_t*)pCol->pData; for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) { if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) { continue; } + numOfElem += 1; pStddevRes->count += 1; - pStddevRes->isum += plist[i]; + pStddevRes->isum += plist[i]; pStddevRes->quadraticISum += plist[i] * plist[i]; } + + break; + } + + case TSDB_DATA_TYPE_SMALLINT: { + int16_t* plist = (int16_t*)pCol->pData; + for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) { + if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) { + continue; + } + + numOfElem += 1; + pStddevRes->count += 1; + pStddevRes->isum += plist[i]; + pStddevRes->quadraticISum += plist[i] * plist[i]; + } + break; + } + + case TSDB_DATA_TYPE_INT: { + int32_t* plist = (int32_t*)pCol->pData; + for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) { + if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) { + continue; + } + + numOfElem += 1; + pStddevRes->count += 1; + pStddevRes->isum += plist[i]; + pStddevRes->quadraticISum += plist[i] * plist[i]; + } + + break; + } + + case TSDB_DATA_TYPE_BIGINT: { + int64_t* plist = (int64_t*)pCol->pData; + for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) { + if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) { + continue; + } + + numOfElem += 1; + pStddevRes->count += 1; + pStddevRes->isum += plist[i]; + pStddevRes->quadraticISum += plist[i] * plist[i]; + } + break; + } + + case TSDB_DATA_TYPE_FLOAT: { + float* plist = (float*)pCol->pData; + for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) { + if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) { + continue; + } + + numOfElem += 1; + pStddevRes->count += 1; + pStddevRes->isum += plist[i]; + pStddevRes->quadraticISum += plist[i] * plist[i]; + } + break; + } + + case TSDB_DATA_TYPE_DOUBLE: { + double* plist = (double*)pCol->pData; + for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) { + if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) { + continue; + } + + numOfElem += 1; + pStddevRes->count += 1; + pStddevRes->isum += plist[i]; + pStddevRes->quadraticISum += plist[i] * plist[i]; } break; } + default: + break; + } + // data in the check operation are all null, not output SET_VAL(GET_RES_INFO(pCtx), numOfElem, 1); } @@ -503,11 +596,122 @@ void stddevFinalize(SqlFunctionCtx* pCtx) { functionFinalize(pCtx); SStddevRes* pStddevRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); - double res = pStddevRes->quadraticISum/pStddevRes->count - (pStddevRes->isum / pStddevRes->count) * (pStddevRes->isum / pStddevRes->count); + double avg = pStddevRes->isum / ((double) pStddevRes->count); + pStddevRes->result = sqrt(pStddevRes->quadraticISum/((double)pStddevRes->count) - avg*avg); +} + +typedef struct SPercentileInfo { + tMemBucket *pMemBucket; + int32_t stage; + double minval; + double maxval; + int64_t numOfElems; +} SPercentileInfo; + +bool getPercentileFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) { + pEnv->calcMemSize = sizeof(SPercentileInfo); + return true; +} + +bool percentileFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo) { + if (!functionSetup(pCtx, pResultInfo)) { + return false; + } + + // in the first round, get the min-max value of all involved data + SPercentileInfo *pInfo = GET_ROWCELL_INTERBUF(pResultInfo); + SET_DOUBLE_VAL(&pInfo->minval, DBL_MAX); + SET_DOUBLE_VAL(&pInfo->maxval, -DBL_MAX); + pInfo->numOfElems = 0; + + return true; } +void percentileFunction(SqlFunctionCtx *pCtx) { + int32_t notNullElems = 0; +#if 0 + SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); + SPercentileInfo *pInfo = GET_ROWCELL_INTERBUF(pResInfo); + + if (pCtx->currentStage == REPEAT_SCAN && pInfo->stage == 0) { + pInfo->stage += 1; + + // all data are null, set it completed + if (pInfo->numOfElems == 0) { + pResInfo->complete = true; + return; + } else { + pInfo->pMemBucket = tMemBucketCreate(pCtx->inputBytes, pCtx->inputType, pInfo->minval, pInfo->maxval); + } + } + + // the first stage, only acquire the min/max value + if (pInfo->stage == 0) { + if (pCtx->preAggVals.isSet) { + double tmin = 0.0, tmax = 0.0; + if (IS_SIGNED_NUMERIC_TYPE(pCtx->inputType)) { + tmin = (double)GET_INT64_VAL(&pCtx->preAggVals.statis.min); + tmax = (double)GET_INT64_VAL(&pCtx->preAggVals.statis.max); + } else if (IS_FLOAT_TYPE(pCtx->inputType)) { + tmin = GET_DOUBLE_VAL(&pCtx->preAggVals.statis.min); + tmax = GET_DOUBLE_VAL(&pCtx->preAggVals.statis.max); + } else if (IS_UNSIGNED_NUMERIC_TYPE(pCtx->inputType)) { + tmin = (double)GET_UINT64_VAL(&pCtx->preAggVals.statis.min); + tmax = (double)GET_UINT64_VAL(&pCtx->preAggVals.statis.max); + } else { + assert(true); + } + + if (GET_DOUBLE_VAL(&pInfo->minval) > tmin) { + SET_DOUBLE_VAL(&pInfo->minval, tmin); + } + + if (GET_DOUBLE_VAL(&pInfo->maxval) < tmax) { + SET_DOUBLE_VAL(&pInfo->maxval, tmax); + } + + pInfo->numOfElems += (pCtx->size - pCtx->preAggVals.statis.numOfNull); + } else { + for (int32_t i = 0; i < pCtx->size; ++i) { + char *data = GET_INPUT_DATA(pCtx, i); + if (pCtx->hasNull && isNull(data, pCtx->inputType)) { + continue; + } + + double v = 0; + GET_TYPED_DATA(v, double, pCtx->inputType, data); + + if (v < GET_DOUBLE_VAL(&pInfo->minval)) { + SET_DOUBLE_VAL(&pInfo->minval, v); + } + + if (v > GET_DOUBLE_VAL(&pInfo->maxval)) { + SET_DOUBLE_VAL(&pInfo->maxval, v); + } + + pInfo->numOfElems += 1; + } + } + return; + } + // the second stage, calculate the true percentile value + for (int32_t i = 0; i < pCtx->size; ++i) { + char *data = GET_INPUT_DATA(pCtx, i); + if (pCtx->hasNull && isNull(data, pCtx->inputType)) { + continue; + } + + notNullElems += 1; + tMemBucketPut(pInfo->pMemBucket, data, 1); + } + + SET_VAL(pCtx, notNullElems, 1); + pResInfo->hasResult = DATA_SET_FLAG; +#endif + +} bool getFirstLastFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) { SColumnNode* pNode = nodesListGetNode(pFunc->pParameterList, 0); diff --git a/source/libs/function/src/taggfunction.c b/source/libs/function/src/taggfunction.c index af24906c9e2a3f6b61917842eff2dce268255125..9174420ff4d0f2eece7a5b4a4fb8c6f87a0681dc 100644 --- a/source/libs/function/src/taggfunction.c +++ b/source/libs/function/src/taggfunction.c @@ -176,26 +176,6 @@ typedef struct SResPair { double avg; } SResPair; -#define TSDB_BLOCK_DIST_STEP_ROWS 16 - -typedef struct STableBlockDist { - uint16_t rowSize; - uint16_t numOfFiles; - uint32_t numOfTables; - uint64_t totalSize; - uint64_t totalRows; - int32_t maxRows; - int32_t minRows; - int32_t firstSeekTimeUs; - uint32_t numOfRowsInMemTable; - uint32_t numOfSmallBlocks; - SArray *dataBlockInfos; -} STableBlockDist; - -typedef struct SFileBlockInfo { - int32_t numBlocksOfStep; -} SFileBlockInfo; - void cleanupResultRowEntry(struct SResultRowEntryInfo* pCell) { pCell->initialized = false; } @@ -3984,7 +3964,7 @@ static void irate_function(SqlFunctionCtx *pCtx) { } } -static void blockDistInfoFromBinary(const char* data, int32_t len, STableBlockDist* pDist) { +static void blockDistInfoFromBinary(const char* data, int32_t len, STableBlockDistInfo* pDist) { SBufferReader br = tbufInitReader(data, len, false); pDist->numOfTables = tbufReadUint32(&br); @@ -4024,7 +4004,7 @@ static void blockDistInfoFromBinary(const char* data, int32_t len, STableBlockDi static void blockInfo_func(SqlFunctionCtx* pCtx) { SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx); - STableBlockDist* pDist = (STableBlockDist*) GET_ROWCELL_INTERBUF(pResInfo); + STableBlockDistInfo* pDist = (STableBlockDistInfo*) GET_ROWCELL_INTERBUF(pResInfo); int32_t len = *(int32_t*) pCtx->pInput; blockDistInfoFromBinary((char*)pCtx->pInput + sizeof(int32_t), len, pDist); @@ -4036,8 +4016,8 @@ static void blockInfo_func(SqlFunctionCtx* pCtx) { //pResInfo->hasResult = DATA_SET_FLAG; } -static void mergeTableBlockDist(SResultRowEntryInfo* pResInfo, const STableBlockDist* pSrc) { - STableBlockDist* pDist = (STableBlockDist*) GET_ROWCELL_INTERBUF(pResInfo); +static void mergeTableBlockDist(SResultRowEntryInfo* pResInfo, const STableBlockDistInfo* pSrc) { + STableBlockDistInfo* pDist = (STableBlockDistInfo*) GET_ROWCELL_INTERBUF(pResInfo); assert(pDist != NULL && pSrc != NULL); pDist->numOfTables += pSrc->numOfTables; @@ -4071,7 +4051,7 @@ static void mergeTableBlockDist(SResultRowEntryInfo* pResInfo, const STableBlock } void block_func_merge(SqlFunctionCtx* pCtx) { - STableBlockDist info = {0}; + STableBlockDistInfo info = {0}; int32_t len = *(int32_t*) pCtx->pInput; blockDistInfoFromBinary(((char*)pCtx->pInput) + sizeof(int32_t), len, &info); SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx); @@ -4082,7 +4062,7 @@ void block_func_merge(SqlFunctionCtx* pCtx) { //pResInfo->hasResult = DATA_SET_FLAG; } -void getPercentiles(STableBlockDist *pTableBlockDist, int64_t totalBlocks, int32_t numOfPercents, +void getPercentiles(STableBlockDistInfo *pTableBlockDist, int64_t totalBlocks, int32_t numOfPercents, double* percents, int32_t* percentiles) { if (totalBlocks == 0) { for (int32_t i = 0; i < numOfPercents; ++i) { @@ -4117,7 +4097,7 @@ void getPercentiles(STableBlockDist *pTableBlockDist, int64_t totalBlocks, int32 } } -void generateBlockDistResult(STableBlockDist *pTableBlockDist, char* result) { +void generateBlockDistResult(STableBlockDistInfo *pTableBlockDist, char* result) { if (pTableBlockDist == NULL) { return; } @@ -4178,7 +4158,7 @@ void generateBlockDistResult(STableBlockDist *pTableBlockDist, char* result) { void blockinfo_func_finalizer(SqlFunctionCtx* pCtx) { SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx); - STableBlockDist* pDist = (STableBlockDist*) GET_ROWCELL_INTERBUF(pResInfo); + STableBlockDistInfo* pDist = (STableBlockDistInfo*) GET_ROWCELL_INTERBUF(pResInfo); pDist->rowSize = (uint16_t)pCtx->param[0].i; generateBlockDistResult(pDist, pCtx->pOutput); diff --git a/source/libs/index/src/indexFst.c b/source/libs/index/src/indexFst.c index 24bc7a93a222a772a576397ac6e04f7661247de4..a146564c9efcb27e60b50dce679ae812810b52bb 100644 --- a/source/libs/index/src/indexFst.c +++ b/source/libs/index/src/indexFst.c @@ -1251,7 +1251,6 @@ bool streamWithStateSeekMin(StreamWithState* sws, FstBoundWithData* min) { taosArrayPush(sws->stack, &s); out += trn.out; node = fstGetNode(sws->fst, trn.addr); - fstNodeDestroy(node); } else { // This is a little tricky. We're in this case if the // given bound is not a prefix of any key in the FST. @@ -1349,7 +1348,7 @@ StreamWithStateResult* streamWithStateNextWith(StreamWithState* sws, StreamCallb for (uint32_t i = 0; i < isz; i++) { buf[i] = *(uint8_t*)taosArrayGet(sws->inp, i); } - FstSlice slice = fstSliceCreate(buf, taosArrayGetSize(sws->inp)); + FstSlice slice = fstSliceCreate(buf, isz); if (fstBoundWithDataExceededBy(sws->endAt, &slice)) { taosArrayDestroyEx(sws->stack, streamStateDestroy); sws->stack = (SArray*)taosArrayInit(256, sizeof(StreamState)); diff --git a/source/libs/index/test/fstTest.cc b/source/libs/index/test/fstTest.cc index eff53108cd6436125f9e9eb83f0d54df7226e2d0..c45f655746a2a9568bf928a3fcedabb1f005e837 100644 --- a/source/libs/index/test/fstTest.cc +++ b/source/libs/index/test/fstTest.cc @@ -510,6 +510,68 @@ void checkFstCheckIteratorRange2() { } delete m; } +void checkFstCheckIteratorRange3() { + FstWriter* fw = new FstWriter; + int64_t s = taosGetTimestampUs(); + int count = 2; + // Performance_fstWriteRecords(fw); + int64_t e = taosGetTimestampUs(); + + std::cout << "insert data count : " << count << "elapas time: " << e - s << std::endl; + + fw->Put("ab", 1); + fw->Put("b", 2); + fw->Put("cdd", 3); + fw->Put("cde", 3); + fw->Put("ddd", 4); + fw->Put("ed", 5); + delete fw; + + FstReadMemory* m = new FstReadMemory(1024 * 64); + if (m->init() == false) { + std::cout << "init readMemory failed" << std::endl; + delete m; + return; + } + { + // range search + std::vector result; + AutomationCtx* ctx = automCtxCreate((void*)"he", AUTOMATION_ALWAYS); + // [b, e) + m->SearchRange(ctx, "b", GE, "", (RangeType)10, result); + assert(result.size() == 5); + automCtxDestroy(ctx); + } + { + // range search + std::vector result; + AutomationCtx* ctx = automCtxCreate((void*)"he", AUTOMATION_ALWAYS); + // [b, e) + m->SearchRange(ctx, "", (RangeType)20, "ab", LE, result); + assert(result.size() == 1); + automCtxDestroy(ctx); + // taosMemoryFree(ctx); + } + { + // range search + std::vector result; + AutomationCtx* ctx = automCtxCreate((void*)"he", AUTOMATION_ALWAYS); + // [b, e) + m->SearchRange(ctx, "", (RangeType)30, "ab", LT, result); + assert(result.size() == 0); + automCtxDestroy(ctx); + } + { + // range search + std::vector result; + AutomationCtx* ctx = automCtxCreate((void*)"he", AUTOMATION_ALWAYS); + // [b, e) + m->SearchRange(ctx, "ed", GT, "ed", (RangeType)40, result); + assert(result.size() == 0); + automCtxDestroy(ctx); + } + delete m; +} void fst_get(Fst* fst) { for (int i = 0; i < 10000; i++) { @@ -573,11 +635,12 @@ int main(int argc, char* argv[]) { // path suid colName ver // iterTFileReader(argv[1], argv[2], argv[3], argv[4]); //} - checkFstCheckIterator1(); - checkFstCheckIterator2(); - checkFstCheckIteratorPrefix(); - checkFstCheckIteratorRange1(); - checkFstCheckIteratorRange2(); + // checkFstCheckIterator1(); + // checkFstCheckIterator2(); + // checkFstCheckIteratorPrefix(); + // checkFstCheckIteratorRange1(); + // checkFstCheckIteratorRange2(); + checkFstCheckIteratorRange3(); // checkFstLongTerm(); // checkFstPrefixSearch(); diff --git a/source/libs/parser/inc/parInsertData.h b/source/libs/parser/inc/parInsertData.h index 5894fc88ccdcac049f8608ee964e178f5b974a0b..ee17de50e06ba54dafb731e389937ad3e96ecef9 100644 --- a/source/libs/parser/inc/parInsertData.h +++ b/source/libs/parser/inc/parInsertData.h @@ -77,7 +77,7 @@ typedef struct STableDataBlocks { STableMeta *pTableMeta; // the tableMeta of current table, the table meta will be used during submit, keep a ref to avoid to be removed from cache char *pData; bool cloned; - + int32_t createTbReqLen; SParsedDataColInfo boundColumnInfo; SRowBuilder rowBuilder; } STableDataBlocks; @@ -118,6 +118,7 @@ static FORCE_INLINE int32_t setBlockInfo(SSubmitBlk *pBlocks, STableDataBlocks* pBlocks->suid = (TSDB_NORMAL_TABLE == dataBuf->pTableMeta->tableType ? dataBuf->pTableMeta->uid : dataBuf->pTableMeta->suid); pBlocks->uid = dataBuf->pTableMeta->uid; pBlocks->sversion = dataBuf->pTableMeta->sversion; + pBlocks->schemaLen = dataBuf->createTbReqLen; if (pBlocks->numOfRows + numOfRows >= INT16_MAX) { return TSDB_CODE_TSC_INVALID_OPERATION; @@ -136,7 +137,7 @@ void destroyBlockHashmap(SHashObj* pDataBlockHash); int initRowBuilder(SRowBuilder *pBuilder, int16_t schemaVer, SParsedDataColInfo *pColInfo); int32_t allocateMemIfNeed(STableDataBlocks *pDataBlock, int32_t rowSize, int32_t * numOfRows); int32_t getDataBlockFromList(SHashObj* pHashList, int64_t id, int32_t size, int32_t startOffset, int32_t rowSize, - const STableMeta* pTableMeta, STableDataBlocks** dataBlocks, SArray* pBlockList); -int32_t mergeTableDataBlocks(SHashObj* pHashObj, int8_t schemaAttached, uint8_t payloadType, SArray** pVgDataBlocks); + const STableMeta* pTableMeta, STableDataBlocks** dataBlocks, SArray* pBlockList, SVCreateTbReq* pCreateTbReq); +int32_t mergeTableDataBlocks(SHashObj* pHashObj, uint8_t payloadType, SArray** pVgDataBlocks); #endif // TDENGINE_DATABLOCKMGT_H diff --git a/source/libs/parser/src/parInsert.c b/source/libs/parser/src/parInsert.c index d7eb31b6cdd9e3c1363d560c3d9b8aa16d571a49..d9cf0b39b54f9183b1f72b7bd6f12db18bf664a9 100644 --- a/source/libs/parser/src/parInsert.c +++ b/source/libs/parser/src/parInsert.c @@ -753,7 +753,7 @@ static int32_t buildCreateTbReq(SInsertParseContext* pCxt, const SName* pName, S } // pSql -> tag1_value, ...) -static int32_t parseTagsClause(SInsertParseContext* pCxt, SSchema* pTagsSchema, uint8_t precision, const SName* pName) { +static int32_t parseTagsClause(SInsertParseContext* pCxt, SSchema* pSchema, uint8_t precision, const SName* pName) { if (tdInitKVRowBuilder(&pCxt->tagsBuilder) < 0) { return TSDB_CODE_TSC_OUT_OF_MEMORY; } @@ -763,9 +763,9 @@ static int32_t parseTagsClause(SInsertParseContext* pCxt, SSchema* pTagsSchema, char tmpTokenBuf[TSDB_MAX_BYTES_PER_ROW] = {0}; // used for deleting Escape character: \\, \', \" for (int i = 0; i < pCxt->tags.numOfBound; ++i) { NEXT_TOKEN_WITH_PREV(pCxt->pSql, sToken); - SSchema* pSchema = &pTagsSchema[pCxt->tags.boundColumns[i]]; - param.schema = pSchema; - CHECK_CODE(parseValueToken(&pCxt->pSql, &sToken, pSchema, precision, tmpTokenBuf, KvRowAppend, ¶m, &pCxt->msg)); + SSchema* pTagSchema = &pSchema[pCxt->tags.boundColumns[i] - 1]; // colId starts with 1 + param.schema = pTagSchema; + CHECK_CODE(parseValueToken(&pCxt->pSql, &sToken, pTagSchema, precision, tmpTokenBuf, KvRowAppend, ¶m, &pCxt->msg)); } SKVRow row = tdGetKVRowFromBuilder(&pCxt->tagsBuilder); @@ -791,6 +791,7 @@ static int32_t storeTableMeta(SHashObj* pHash, const char* pName, int32_t len, S if (TSDB_CODE_SUCCESS != cloneTableMeta(pMeta, &pBackup)) { return TSDB_CODE_TSC_OUT_OF_MEMORY; } + pBackup->uid = tGenIdPI64(); return taosHashPut(pHash, pName, len, &pBackup, POINTER_BYTES); } @@ -833,7 +834,11 @@ static int32_t parseUsingClause(SInsertParseContext* pCxt, SToken* pTbnameToken) if (TK_NK_LP != sToken.type) { return buildSyntaxErrMsg(&pCxt->msg, "( is expected", sToken.z); } - CHECK_CODE(parseTagsClause(pCxt, pTagsSchema, getTableInfo(pCxt->pTableMeta).precision, &name)); + CHECK_CODE(parseTagsClause(pCxt, pCxt->pTableMeta->schema, getTableInfo(pCxt->pTableMeta).precision, &name)); + NEXT_TOKEN(pCxt->pSql, sToken); + if (TK_NK_RP != sToken.type) { + return buildSyntaxErrMsg(&pCxt->msg, ") is expected", sToken.z); + } return TSDB_CODE_SUCCESS; } @@ -1015,7 +1020,7 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) { STableDataBlocks *dataBuf = NULL; CHECK_CODE(getDataBlockFromList(pCxt->pTableBlockHashObj, pCxt->pTableMeta->uid, TSDB_DEFAULT_PAYLOAD_SIZE, - sizeof(SSubmitBlk), getTableInfo(pCxt->pTableMeta).rowSize, pCxt->pTableMeta, &dataBuf, NULL)); + sizeof(SSubmitBlk), getTableInfo(pCxt->pTableMeta).rowSize, pCxt->pTableMeta, &dataBuf, NULL, &pCxt->createTblReq)); if (TK_NK_LP == sToken.type) { // pSql -> field1_name, ...) @@ -1046,7 +1051,7 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) { } // merge according to vgId if (!TSDB_QUERY_HAS_TYPE(pCxt->pOutput->insertType, TSDB_QUERY_TYPE_STMT_INSERT) && taosHashGetSize(pCxt->pTableBlockHashObj) > 0) { - CHECK_CODE(mergeTableDataBlocks(pCxt->pTableBlockHashObj, pCxt->pOutput->schemaAttache, pCxt->pOutput->payloadType, &pCxt->pVgDataBlocks)); + CHECK_CODE(mergeTableDataBlocks(pCxt->pTableBlockHashObj, pCxt->pOutput->payloadType, &pCxt->pVgDataBlocks)); } return buildOutput(pCxt); } diff --git a/source/libs/parser/src/parInsertData.c b/source/libs/parser/src/parInsertData.c index f70e514b5a05d0cf2cdc6f282307348e36babaac..088b25d5448665de9d84ae94c023ee8fef41e711 100644 --- a/source/libs/parser/src/parInsertData.c +++ b/source/libs/parser/src/parInsertData.c @@ -149,8 +149,28 @@ static int32_t createDataBlock(size_t defaultSize, int32_t rowSize, int32_t star return TSDB_CODE_SUCCESS; } +static int32_t buildCreateTbMsg(STableDataBlocks* pBlocks, SVCreateTbReq* pCreateTbReq) { + int32_t len = tSerializeSVCreateTbReq(NULL, pCreateTbReq); + if (pBlocks->nAllocSize - pBlocks->size < len) { + pBlocks->nAllocSize += len + pBlocks->rowSize; + char* pTmp = taosMemoryRealloc(pBlocks->pData, pBlocks->nAllocSize); + if (pTmp != NULL) { + pBlocks->pData = pTmp; + memset(pBlocks->pData + pBlocks->size, 0, pBlocks->nAllocSize - pBlocks->size); + } else { + pBlocks->nAllocSize -= len + pBlocks->rowSize; + return TSDB_CODE_TSC_OUT_OF_MEMORY; + } + } + char* pBuf = pBlocks->pData + pBlocks->size; + tSerializeSVCreateTbReq((void**)&pBuf, pCreateTbReq); + pBlocks->size += len; + pBlocks->createTbReqLen = len; + return TSDB_CODE_SUCCESS; +} + int32_t getDataBlockFromList(SHashObj* pHashList, int64_t id, int32_t size, int32_t startOffset, int32_t rowSize, - const STableMeta* pTableMeta, STableDataBlocks** dataBlocks, SArray* pBlockList) { + const STableMeta* pTableMeta, STableDataBlocks** dataBlocks, SArray* pBlockList, SVCreateTbReq* pCreateTbReq) { *dataBlocks = NULL; STableDataBlocks** t1 = (STableDataBlocks**)taosHashGet(pHashList, (const char*)&id, sizeof(id)); if (t1 != NULL) { @@ -163,6 +183,13 @@ int32_t getDataBlockFromList(SHashObj* pHashList, int64_t id, int32_t size, int3 return ret; } + if (NULL != pCreateTbReq && NULL != pCreateTbReq->ctbCfg.pTag) { + ret = buildCreateTbMsg(*dataBlocks, pCreateTbReq); + if (ret != TSDB_CODE_SUCCESS) { + return ret; + } + } + taosHashPut(pHashList, (const char*)&id, sizeof(int64_t), (char*)dataBlocks, POINTER_BYTES); if (pBlockList) { taosArrayPush(pBlockList, dataBlocks); @@ -294,7 +321,7 @@ int sortRemoveDataBlockDupRows(STableDataBlocks *dataBuf, SBlockKeyInfo *pBlkKey int32_t extendedRowSize = getExtendedRowSize(dataBuf); SBlockKeyTuple *pBlkKeyTuple = pBlkKeyInfo->pKeyTuple; - char * pBlockData = pBlocks->data; + char * pBlockData = pBlocks->data + pBlocks->schemaLen; int n = 0; while (n < nRows) { pBlkKeyTuple->skey = TD_ROW_KEY((STSRow *)pBlockData); @@ -340,44 +367,26 @@ int sortRemoveDataBlockDupRows(STableDataBlocks *dataBuf, SBlockKeyInfo *pBlkKey } // Erase the empty space reserved for binary data -static int trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock, SBlockKeyTuple* blkKeyTuple, int8_t schemaAttached, bool isRawPayload) { +static int trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock, SBlockKeyTuple* blkKeyTuple, bool isRawPayload) { // TODO: optimize this function, handle the case while binary is not presented STableMeta* pTableMeta = pTableDataBlock->pTableMeta; STableComInfo tinfo = getTableInfo(pTableMeta); SSchema* pSchema = getTableColumnSchema(pTableMeta); + int32_t nonDataLen = sizeof(SSubmitBlk) + pTableDataBlock->createTbReqLen; SSubmitBlk* pBlock = pDataBlock; - memcpy(pDataBlock, pTableDataBlock->pData, sizeof(SSubmitBlk)); - pDataBlock = (char*)pDataBlock + sizeof(SSubmitBlk); + memcpy(pDataBlock, pTableDataBlock->pData, nonDataLen); + pDataBlock = (char*)pDataBlock + nonDataLen; int32_t flen = 0; // original total length of row - - // schema needs to be included into the submit data block - if (schemaAttached) { - int32_t numOfCols = getNumOfColumns(pTableDataBlock->pTableMeta); - for(int32_t j = 0; j < numOfCols; ++j) { - STColumn* pCol = (STColumn*) pDataBlock; - pCol->colId = htons(pSchema[j].colId); - pCol->type = pSchema[j].type; - pCol->bytes = htons(pSchema[j].bytes); - pCol->offset = 0; - - pDataBlock = (char*)pDataBlock + sizeof(STColumn); + if (isRawPayload) { + for (int32_t j = 0; j < tinfo.numOfColumns; ++j) { flen += TYPE_BYTES[pSchema[j].type]; } - - int32_t schemaSize = sizeof(STColumn) * numOfCols; - pBlock->schemaLen = schemaSize; - } else { - if (isRawPayload) { - for (int32_t j = 0; j < tinfo.numOfColumns; ++j) { - flen += TYPE_BYTES[pSchema[j].type]; - } - } - pBlock->schemaLen = 0; } + pBlock->schemaLen = pTableDataBlock->createTbReqLen; - char* p = pTableDataBlock->pData + sizeof(SSubmitBlk); + char* p = pTableDataBlock->pData + nonDataLen; pBlock->dataLen = 0; int32_t numOfRows = pBlock->numOfRows; @@ -414,7 +423,7 @@ static int trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock, SB return pBlock->dataLen + pBlock->schemaLen; } -int32_t mergeTableDataBlocks(SHashObj* pHashObj, int8_t schemaAttached, uint8_t payloadType, SArray** pVgDataBlocks) { +int32_t mergeTableDataBlocks(SHashObj* pHashObj, uint8_t payloadType, SArray** pVgDataBlocks) { const int INSERT_HEAD_SIZE = sizeof(SSubmitReq); int code = 0; bool isRawPayload = IS_RAW_PAYLOAD(payloadType); @@ -429,7 +438,7 @@ int32_t mergeTableDataBlocks(SHashObj* pHashObj, int8_t schemaAttached, uint8_t if (pBlocks->numOfRows > 0) { STableDataBlocks* dataBuf = NULL; int32_t ret = getDataBlockFromList(pVnodeDataBlockHashList, pOneTableBlock->vgId, TSDB_PAYLOAD_SIZE, - INSERT_HEAD_SIZE, 0, pOneTableBlock->pTableMeta, &dataBuf, pVnodeDataBlockList); + INSERT_HEAD_SIZE, 0, pOneTableBlock->pTableMeta, &dataBuf, pVnodeDataBlockList, NULL); if (ret != TSDB_CODE_SUCCESS) { taosHashCleanup(pVnodeDataBlockHashList); destroyBlockArrayList(pVnodeDataBlockList); @@ -474,7 +483,7 @@ int32_t mergeTableDataBlocks(SHashObj* pHashObj, int8_t schemaAttached, uint8_t sizeof(STColumn) * getNumOfColumns(pOneTableBlock->pTableMeta); // erase the empty space reserved for binary data - int32_t finalLen = trimDataBlock(dataBuf->pData + dataBuf->size, pOneTableBlock, blkKeyInfo.pKeyTuple, schemaAttached, isRawPayload); + int32_t finalLen = trimDataBlock(dataBuf->pData + dataBuf->size, pOneTableBlock, blkKeyInfo.pKeyTuple, isRawPayload); assert(finalLen <= len); dataBuf->size += (finalLen + sizeof(SSubmitBlk)); diff --git a/source/libs/parser/test/parserInsertTest.cpp b/source/libs/parser/test/parserInsertTest.cpp index 90e2ba3db2d18e8552078a5647c75525131b8211..d292fcf8b0b7987cf6f98e29e4a08cc15452db50 100644 --- a/source/libs/parser/test/parserInsertTest.cpp +++ b/source/libs/parser/test/parserInsertTest.cpp @@ -62,7 +62,7 @@ protected: void dumpReslut() { SVnodeModifOpStmt* pStmt = getVnodeModifStmt(res_); size_t num = taosArrayGetSize(pStmt->pDataBlocks); - cout << "schemaAttache:" << (int32_t)pStmt->schemaAttache << ", payloadType:" << (int32_t)pStmt->payloadType << ", insertType:" << pStmt->insertType << ", numOfVgs:" << num << endl; + cout << "payloadType:" << (int32_t)pStmt->payloadType << ", insertType:" << pStmt->insertType << ", numOfVgs:" << num << endl; for (size_t i = 0; i < num; ++i) { SVgDataBlocks* vg = (SVgDataBlocks*)taosArrayGetP(pStmt->pDataBlocks, i); cout << "vgId:" << vg->vg.vgId << ", numOfTables:" << vg->numOfTables << ", dataSize:" << vg->size << endl; @@ -81,7 +81,6 @@ protected: void checkReslut(int32_t numOfTables, int16_t numOfRows1, int16_t numOfRows2 = -1) { SVnodeModifOpStmt* pStmt = getVnodeModifStmt(res_); - ASSERT_EQ(pStmt->schemaAttache, 0); ASSERT_EQ(pStmt->payloadType, PAYLOAD_TYPE_KV); ASSERT_EQ(pStmt->insertType, TSDB_QUERY_TYPE_INSERT); size_t num = taosArrayGetSize(pStmt->pDataBlocks); @@ -168,6 +167,18 @@ TEST_F(InsertTest, multiTableMultiRowTest) { checkReslut(2, 3, 2); } +// INSERT INTO +// tb1_name USING st1_name [(tag1_name, ...)] TAGS (tag1_value, ...) VALUES (field1_value, ...) +// tb2_name USING st2_name [(tag1_name, ...)] TAGS (tag1_value, ...) VALUES (field1_value, ...) +TEST_F(InsertTest, autoCreateTableTest) { + setDatabase("root", "test"); + + bind("insert into st1s1 using st1 tags(1, 'wxy') values (now, 1, \"beijing\")(now+1s, 2, \"shanghai\")(now+2s, 3, \"guangzhou\")"); + ASSERT_EQ(run(), TSDB_CODE_SUCCESS); + dumpReslut(); + checkReslut(1, 3); +} + TEST_F(InsertTest, toleranceTest) { setDatabase("root", "test"); diff --git a/tests/script/jenkins/basic.txt b/tests/script/jenkins/basic.txt index d8fbfce55f699a7e834e982d47f1e7ed84271f04..b756976394b6d8d99200df0a5345adaede07b797 100644 --- a/tests/script/jenkins/basic.txt +++ b/tests/script/jenkins/basic.txt @@ -5,6 +5,7 @@ ./test.sh -f tsim/user/basic1.sim # ---- db +./test.sh -f tsim/db/create_all_options.sim ./test.sh -f tsim/db/alter_option.sim ./test.sh -f tsim/db/basic1.sim ./test.sh -f tsim/db/basic2.sim diff --git a/tests/script/tsim/db/create_all_options.sim b/tests/script/tsim/db/create_all_options.sim new file mode 100644 index 0000000000000000000000000000000000000000..7f39474f4d93bb6c320811da61a6f6f853c3f4b0 --- /dev/null +++ b/tests/script/tsim/db/create_all_options.sim @@ -0,0 +1,486 @@ +system sh/stop_dnodes.sh +system sh/deploy.sh -n dnode1 -i 1 +system sh/deploy.sh -n dnode2 -i 2 +system sh/deploy.sh -n dnode3 -i 3 +system sh/exec.sh -n dnode1 -s start +system sh/exec.sh -n dnode2 -s start +system sh/exec.sh -n dnode3 -s start + +$loop_cnt = 0 +check_dnode_ready: + $loop_cnt = $loop_cnt + 1 + sleep 200 + if $loop_cnt == 10 then + print ====> dnode not ready! + return -1 + endi +sql show dnodes +print ===> $rows $data00 $data01 $data02 $data03 $data04 $data05 +if $data00 != 1 then + return -1 +endi +if $data04 != ready then + goto check_dnode_ready +endi + +sql connect +sql create dnode $hostname port 7200 +sql create dnode $hostname port 7300 + +$loop_cnt = 0 +check_dnode_ready_1: + $loop_cnt = $loop_cnt + 1 + sleep 200 + if $loop_cnt == 10 then + print ====> dnode not ready! + return -1 + endi +sql show dnodes +print ===> rows: $rows +print ===> $data00 $data01 $data02 $data03 $data04 $data05 +print ===> $data10 $data11 $data12 $data13 $data14 $data15 +print ===> $data20 $data21 $data22 $data23 $data24 $data25 +if $data00 != 1 then + return -1 +endi +if $data01 != localhost:7100 then + return -1 +endi +if $data04 != ready then + goto check_dnode_ready_1 +endi +if $data14 != ready then + goto check_dnode_ready_1 +endi +if $data24 != ready then + goto check_dnode_ready_1 +endi + +print ============= create database with all options +#database_option: { +# | BLOCKS value [3~1000, default: 6] +# | CACHE value [default: 16] +# | CACHELAST value [0, 1, 2, 3, default: 0] +# | COMP [0 | 1 | 2, default: 2] +# | DAYS value [60m ~ min(3650d,keep), default: 10d, unit may be minut/hour/day] +# | FSYNC value [0 ~ 180000 ms, default: 3000] +# | MAXROWS value [200~10000, default: 4096] +# | MINROWS value [10~1000, default: 100] +# | KEEP value [max(1d ~ 365000d), default: 1d, unit may be minut/hour/day] +# | PRECISION ['ms' | 'us' | 'ns', default: ms] +# | QUORUM value [1 | 2, default: 1] +# | REPLICA value [1 | 3, default: 1] +# | TTL value [1d ~ , default: 1] +# | WAL value [1 | 2, default: 1] +# | VGROUPS value [default: 2] +# | SINGLE_STABLE [0 | 1, default: ] +# | STREAM_MODE [0 | 1, default: ] +# +#$data0_db : name +#$data1_db : create_time +#$data2_db : vgroups +#$data3_db : ntables +#$data4_db : replica +#$data5_db : quorum +#$data6_db : days +#$data7_db : keep +#$data8_db : cache +#$data9_db : blocks +#$data10_db : minrows +#$data11_db : maxrows +#$data12_db : wal +#$data13_db : fsync +#$data14_db : comp +#$data15_db : cachelast +#$data16_db : precision + +print ====> create database db, with default +sql create database db +sql show databases +print rows: $rows +print $data0_db $data1_db $data2_db $data3_db $data4_db $data5_db $data6_db $data7_db $data8_db $data9_db $data10_db $data11_db $data12_db $data13_db $data14_db $data15_db $data16_db $data17_db +if $rows != 2 then + return -1 +endi +if $data0_db != db then # name + return -1 +endi +if $data2_db != 2 then # vgroups + return -1 +endi +if $data3_db != 0 then # ntables + return -1 +endi +if $data4_db != 1 then # replica + return -1 +endi +if $data5_db != 1 then # quorum + return -1 +endi +if $data6_db != 14400 then # days + return -1 +endi +if $data7_db != 5256000,5256000,5256000 then # keep + return -1 +endi +if $data8_db != 16 then # cache + return -1 +endi +if $data9_db != 6 then # blocks + return -1 +endi +if $data10_db != 100 then # minrows + return -1 +endi +if $data11_db != 4096 then # maxrows + return -1 +endi +if $data12_db != 1 then # wal + return -1 +endi +if $data13_db != 3000 then # fsync + return -1 +endi +if $data14_db != 2 then # comp + return -1 +endi +if $data15_db != 0 then # cachelast + return -1 +endi +if $data16_db != ms then # precision + return -1 +endi +sql drop database db + +print ====> BLOCKS value [3~1000, default: 6] +sql create database db BLOCKS 3 +sql show databases +print $data0_db $data1_db $data2_db $data3_db $data4_db $data5_db $data6_db $data7_db $data8_db $data9_db $data10_db $data11_db $data12_db $data13_db $data14_db $data15_db $data16_db $data17_db +if $data9_db != 3 then + return -1 +endi +sql drop database db + +sql create database db BLOCKS 1000 +sql show databases +print $data0_db $data1_db $data2_db $data3_db $data4_db $data5_db $data6_db $data7_db $data8_db $data9_db $data10_db $data11_db $data12_db $data13_db $data14_db $data15_db $data16_db $data17_db +if $data9_db != 1000 then + return -1 +endi +sql drop database db +sql_error create database db BLOCKS 2 +sql_error create database db BLOCKS 0 +sql_error create database db BLOCKS -1 + +print ====> CACHE value [default: 16] +sql create database db CACHE 1 +sql show databases +print $data0_db $data1_db $data2_db $data3_db $data4_db $data5_db $data6_db $data7_db $data8_db $data9_db $data10_db $data11_db $data12_db $data13_db $data14_db $data15_db $data16_db $data17_db +if $data8_db != 1 then + return -1 +endi +sql drop database db + +sql create database db CACHE 128 +sql show databases +print $data0_db $data1_db $data2_db $data3_db $data4_db $data5_db $data6_db $data7_db $data8_db $data9_db $data10_db $data11_db $data12_db $data13_db $data14_db $data15_db $data16_db $data17_db +if $data8_db != 128 then + return -1 +endi +sql drop database db + +print ====> CACHELAST value [0, 1, 2, 3, default: 0] +sql create database db CACHELAST 1 +sql show databases +print $data0_db $data1_db $data2_db $data3_db $data4_db $data5_db $data6_db $data7_db $data8_db $data9_db $data10_db $data11_db $data12_db $data13_db $data14_db $data15_db $data16_db $data17_db +if $data15_db != 1 then + return -1 +endi +sql drop database db + +sql create database db CACHELAST 2 +sql show databases +print $data0_db $data1_db $data2_db $data3_db $data4_db $data5_db $data6_db $data7_db $data8_db $data9_db $data10_db $data11_db $data12_db $data13_db $data14_db $data15_db $data16_db $data17_db +if $data15_db != 2 then + return -1 +endi +sql drop database db + +sql create database db CACHELAST 3 +sql show databases +print $data0_db $data1_db $data2_db $data3_db $data4_db $data5_db $data6_db $data7_db $data8_db $data9_db $data10_db $data11_db $data12_db $data13_db $data14_db $data15_db $data16_db $data17_db +if $data15_db != 3 then + return -1 +endi +sql drop database db +sql_error create database db CACHELAST 4 +sql_error create database db CACHELAST -1 + +print ====> COMP [0 | 1 | 2, default: 2] +sql create database db COMP 1 +sql show databases +print $data0_db $data1_db $data2_db $data3_db $data4_db $data5_db $data6_db $data7_db $data8_db $data9_db $data10_db $data11_db $data12_db $data13_db $data14_db $data15_db $data16_db $data17_db +if $data14_db != 1 then + return -1 +endi +sql drop database db + +sql create database db COMP 0 +sql show databases +print $data0_db $data1_db $data2_db $data3_db $data4_db $data5_db $data6_db $data7_db $data8_db $data9_db $data10_db $data11_db $data12_db $data13_db $data14_db $data15_db $data16_db $data17_db +if $data14_db != 0 then + return -1 +endi +sql drop database db +sql_error create database db COMP 3 +sql_error create database db COMP -1 + +#print ====> DAYS value [60m ~ min(3650d,keep), default: 10d, unit may be minut/hour/day] +#print ====> KEEP value [max(1d ~ 365000d), default: 1d, unit may be minut/hour/day] +#sql create database db DAYS 60m KEEP 60m +#sql show databases +#print $data0_db $data1_db $data2_db $data3_db $data4_db $data5_db $data6_db $data7_db $data8_db $data9_db $data10_db $data11_db $data12_db $data13_db $data14_db $data15_db $data16_db $data17_db +#if $data6_db != 60 then +# return -1 +#endi +#if $data7_db != 60,60,60 then +# return -1 +#endi +#sql drop database db +#sql create database db DAYS 60m KEEP 1d +#sql show databases +#print $data0_db $data1_db $data2_db $data3_db $data4_db $data5_db $data6_db $data7_db $data8_db $data9_db $data10_db $data11_db $data12_db $data13_db $data14_db $data15_db $data16_db $data17_db +#if $data6_db != 60 then +# return -1 +#endi +#if $data7_db != 1440,1440,1440 then +# return -1 +#endi +#sql create database db DAYS 3650d KEEP 365000d +#sql show databases +#print $data0_db $data1_db $data2_db $data3_db $data4_db $data5_db $data6_db $data7_db $data8_db $data9_db $data10_db $data11_db $data12_db $data13_db $data14_db $data15_db $data16_db $data17_db +#if $data6_db != 5256000 then +# return -1 +#endi +#if $data7_db != 525600000,525600000,525600000 then +# return -1 +#endi +#sql drop database db +#sql_error create database db DAYS -59m +#sql_error create database db DAYS 59m +#sql_error create database db DAYS 5256001m +#sql_error create database db DAYS 3651d +#sql_error create database db KEEP -59m +#sql_error create database db KEEP 14399m +#sql_error create database db KEEP 525600001m +#sql_error create database db KEEP 365001d + +print ====> FSYNC value [0 ~ 180000 ms, default: 3000] +sql create database db FSYNC 0 +sql show databases +print $data0_db $data1_db $data2_db $data3_db $data4_db $data5_db $data6_db $data7_db $data8_db $data9_db $data10_db $data11_db $data12_db $data13_db $data14_db $data15_db $data16_db $data17_db +if $data13_db != 0 then + return -1 +endi +sql drop database db + +sql create database db FSYNC 180000 +sql show databases +print $data0_db $data1_db $data2_db $data3_db $data4_db $data5_db $data6_db $data7_db $data8_db $data9_db $data10_db $data11_db $data12_db $data13_db $data14_db $data15_db $data16_db $data17_db +if $data13_db != 180000 then + return -1 +endi +sql drop database db +sql_error create database db FSYNC 180001 +sql_error create database db FSYNC -1 + +print ====> MAXROWS value [200~10000, default: 4096], MINROWS value [10~1000, default: 100] +sql create database db MAXROWS 10000 MINROWS 1000 +sql show databases +print $data0_db $data1_db $data2_db $data3_db $data4_db $data5_db $data6_db $data7_db $data8_db $data9_db $data10_db $data11_db $data12_db $data13_db $data14_db $data15_db $data16_db $data17_db +if $data10_db != 1000 then + return -1 +endi +if $data11_db != 10000 then + return -1 +endi +sql drop database db + +sql create database db MAXROWS 200 MINROWS 10 +sql show databases +print $data0_db $data1_db $data2_db $data3_db $data4_db $data5_db $data6_db $data7_db $data8_db $data9_db $data10_db $data11_db $data12_db $data13_db $data14_db $data15_db $data16_db $data17_db +if $data10_db != 10 then + return -1 +endi +if $data11_db != 200 then + return -1 +endi +sql drop database db +sql_error create database db MAXROWS -1 +sql_error create database db MAXROWS 0 +sql_error create database db MAXROWS 199 +sql_error create database db MAXROWS 10001 +sql_error create database db MINROWS -1 +sql_error create database db MINROWS 0 +sql_error create database db MINROWS 9 +sql_error create database db MINROWS 1001 +sql_error create database db MAXROWS 500 MINROWS 1000 + +print ====> PRECISION ['ms' | 'us' | 'ns', default: ms] +sql create database db PRECISION 'us' +sql show databases +print $data0_db $data1_db $data2_db $data3_db $data4_db $data5_db $data6_db $data7_db $data8_db $data9_db $data10_db $data11_db $data12_db $data13_db $data14_db $data15_db $data16_db $data17_db +if $data16_db != us then + return -1 +endi +sql drop database db + +sql create database db PRECISION 'ns' +sql show databases +print $data0_db $data1_db $data2_db $data3_db $data4_db $data5_db $data6_db $data7_db $data8_db $data9_db $data10_db $data11_db $data12_db $data13_db $data14_db $data15_db $data16_db $data17_db +if $data16_db != ns then + return -1 +endi +sql drop database db +sql_error create database db PRECISION 'as' +sql_error create database db PRECISION -1 + +print ====> QUORUM value [1 | 2, default: 1] +#sql create database db QUORUM 2 +#sql show databases +#print $data0_db $data1_db $data2_db $data3_db $data4_db $data5_db $data6_db $data7_db $data8_db $data9_db $data10_db $data11_db $data12_db $data13_db $data14_db $data15_db $data16_db $data17_db +#if $data5_db != 2 then +# return -1 +#endi +#sql drop database db + +sql create database db QUORUM 1 +sql show databases +print $data0_db $data1_db $data2_db $data3_db $data4_db $data5_db $data6_db $data7_db $data8_db $data9_db $data10_db $data11_db $data12_db $data13_db $data14_db $data15_db $data16_db $data17_db +if $data5_db != 1 then + return -1 +endi +sql drop database db +sql_error create database db QUORUM 3 +sql_error create database db QUORUM 0 +sql_error create database db QUORUM -1 + +print ====> REPLICA value [1 | 3, default: 1] +sql create database db REPLICA 3 +sql show databases +print $data0_db $data1_db $data2_db $data3_db $data4_db $data5_db $data6_db $data7_db $data8_db $data9_db $data10_db $data11_db $data12_db $data13_db $data14_db $data15_db $data16_db $data17_db +if $data4_db != 3 then + return -1 +endi +sql drop database db + +sql create database db REPLICA 1 +sql show databases +print $data0_db $data1_db $data2_db $data3_db $data4_db $data5_db $data6_db $data7_db $data8_db $data9_db $data10_db $data11_db $data12_db $data13_db $data14_db $data15_db $data16_db $data17_db +if $data4_db != 1 then + return -1 +endi +sql drop database db +sql_error create database db REPLICA 2 +sql_error create database db REPLICA 0 +sql_error create database db REPLICA -1 +sql_error create database db REPLICA 4 + +print ====> TTL value [1d ~ , default: 1] +sql create database db TTL 1 +sql show databases +print $data0_db $data1_db $data2_db $data3_db $data4_db $data5_db $data6_db $data7_db $data8_db $data9_db $data10_db $data11_db $data12_db $data13_db $data14_db $data15_db $data16_db $data17_db +#if $dataXX_db != 1 then +# return -1 +#endi +sql drop database db + +sql create database db TTL 10 +sql show databases +print $data0_db $data1_db $data2_db $data3_db $data4_db $data5_db $data6_db $data7_db $data8_db $data9_db $data10_db $data11_db $data12_db $data13_db $data14_db $data15_db $data16_db $data17_db +#if $dataXX_db != 10 then +# return -1 +#endi +sql drop database db +sql_error create database db TTL 0 +sql_error create database db TTL -1 + +print ====> WAL value [1 | 2, default: 1] +sql create database db WAL 2 +sql show databases +print $data0_db $data1_db $data2_db $data3_db $data4_db $data5_db $data6_db $data7_db $data8_db $data9_db $data10_db $data11_db $data12_db $data13_db $data14_db $data15_db $data16_db $data17_db +if $data12_db != 2 then + return -1 +endi +sql drop database db + +sql create database db WAL 1 +sql show databases +print $data0_db $data1_db $data2_db $data3_db $data4_db $data5_db $data6_db $data7_db $data8_db $data9_db $data10_db $data11_db $data12_db $data13_db $data14_db $data15_db $data16_db $data17_db +if $data12_db != 1 then + return -1 +endi +sql drop database db +sql_error create database db WAL 3 +sql_error create database db WAL -1 +sql_error create database db WAL 0 + +print ====> VGROUPS value [1~4096, default: 2] +sql create database db VGROUPS 1 +sql show databases +print $data0_db $data1_db $data2_db $data3_db $data4_db $data5_db $data6_db $data7_db $data8_db $data9_db $data10_db $data11_db $data12_db $data13_db $data14_db $data15_db $data16_db $data17_db +if $data2_db != 1 then + return -1 +endi +sql drop database db + +sql create database db VGROUPS 16 +sql show databases +print $data0_db $data1_db $data2_db $data3_db $data4_db $data5_db $data6_db $data7_db $data8_db $data9_db $data10_db $data11_db $data12_db $data13_db $data14_db $data15_db $data16_db $data17_db +if $data2_db != 16 then + return -1 +endi +sql drop database db +sql_error create database db VGROUPS 4097 +sql_error create database db VGROUPS -1 +sql_error create database db VGROUPS 0 + +print ====> SINGLE_STABLE [0 | 1, default: ] +sql create database db SINGLE_STABLE 1 +sql show databases +print $data0_db $data1_db $data2_db $data3_db $data4_db $data5_db $data6_db $data7_db $data8_db $data9_db $data10_db $data11_db $data12_db $data13_db $data14_db $data15_db $data16_db $data17_db +#if $dataXXXX_db != 1 then +# return -1 +#endi +sql drop database db + +sql create database db SINGLE_STABLE 0 +sql show databases +print $data0_db $data1_db $data2_db $data3_db $data4_db $data5_db $data6_db $data7_db $data8_db $data9_db $data10_db $data11_db $data12_db $data13_db $data14_db $data15_db $data16_db $data17_db +#if $dataXXXX_db != 0 then +# return -1 +#endi +sql drop database db +sql_error create database db SINGLE_STABLE 2 +sql_error create database db SINGLE_STABLE -1 + +print ====> STREAM_MODE [0 | 1, default: ] +sql create database db STREAM_MODE 1 +sql show databases +print $data0_db $data1_db $data2_db $data3_db $data4_db $data5_db $data6_db $data7_db $data8_db $data9_db $data10_db $data11_db $data12_db $data13_db $data14_db $data15_db $data16_db $data17_db +#if $dataXXX_db != 1 then +# return -1 +#endi +sql drop database db + +sql create database db STREAM_MODE 0 +sql show databases +print $data0_db $data1_db $data2_db $data3_db $data4_db $data5_db $data6_db $data7_db $data8_db $data9_db $data10_db $data11_db $data12_db $data13_db $data14_db $data15_db $data16_db $data17_db +#if $dataXXX_db != 0 then +# return -1 +#endi +sql drop database db +sql_error create database db STREAM_MODE 2 +sql_error create database db STREAM_MODE -1 + +#system sh/exec.sh -n dnode1 -s stop -x SIGINT