未验证 提交 f0b50add 编写于 作者: wmmhello's avatar wmmhello 提交者: GitHub

Merge pull request #14184 from taosdata/feature/TD-13041

feat:sort table group if needed & optimize partition by tag
...@@ -50,8 +50,10 @@ typedef enum EStreamType { ...@@ -50,8 +50,10 @@ typedef enum EStreamType {
} EStreamType; } EStreamType;
typedef struct { typedef struct {
SArray* pGroupList;
SArray* pTableList; SArray* pTableList;
SHashObj* map; // speedup acquire the tableQueryInfo by table uid SHashObj* map; // speedup acquire the tableQueryInfo by table uid
bool needSortTableByGroupId;
void* pTagCond; void* pTagCond;
void* pTagIndexCond; void* pTagIndexCond;
uint64_t suid; uint64_t suid;
......
...@@ -36,7 +36,7 @@ typedef struct SReadHandle { ...@@ -36,7 +36,7 @@ typedef struct SReadHandle {
void* vnode; void* vnode;
void* mnd; void* mnd;
SMsgCb* pMsgCb; SMsgCb* pMsgCb;
int8_t initTsdbReader; // int8_t initTsdbReader;
} SReadHandle; } SReadHandle;
enum { enum {
......
...@@ -963,6 +963,10 @@ int32_t taosSetCfg(SConfig *pCfg, char* name) { ...@@ -963,6 +963,10 @@ int32_t taosSetCfg(SConfig *pCfg, char* name) {
tsTelemPort = (uint16_t)cfgGetItem(pCfg, "telemetryPort")->i32; tsTelemPort = (uint16_t)cfgGetItem(pCfg, "telemetryPort")->i32;
} else if (strcasecmp("transPullupInterval", name) == 0) { } else if (strcasecmp("transPullupInterval", name) == 0) {
tsTransPullupInterval = cfgGetItem(pCfg, "transPullupInterval")->i32; tsTransPullupInterval = cfgGetItem(pCfg, "transPullupInterval")->i32;
} else if (strcasecmp("ttlUnit", name) == 0) {
tsTtlUnit = cfgGetItem(pCfg, "ttlUnit")->i32;
} else if (strcasecmp("ttlPushInterval", name) == 0) {
tsTtlPushInterval = cfgGetItem(pCfg, "ttlPushInterval")->i32;
} else if (strcasecmp("tmrDebugFlag", name) == 0) { } else if (strcasecmp("tmrDebugFlag", name) == 0) {
tmrDebugFlag = cfgGetItem(pCfg, "tmrDebugFlag")->i32; tmrDebugFlag = cfgGetItem(pCfg, "tmrDebugFlag")->i32;
} else if (strcasecmp("tsdbDebugFlag", name) == 0) { } else if (strcasecmp("tsdbDebugFlag", name) == 0) {
......
...@@ -100,7 +100,7 @@ static void *mndThreadFp(void *param) { ...@@ -100,7 +100,7 @@ static void *mndThreadFp(void *param) {
taosMsleep(100); taosMsleep(100);
if (mndGetStop(pMnode)) break; if (mndGetStop(pMnode)) break;
if (lastTime % (tsTransPullupInterval * 10) == 1) { if (lastTime % (tsTtlPushInterval * 10) == 1) {
mndTtlTimer(pMnode); mndTtlTimer(pMnode);
} }
......
...@@ -116,7 +116,8 @@ typedef void *tsdbReaderT; ...@@ -116,7 +116,8 @@ typedef void *tsdbReaderT;
#define BLOCK_LOAD_TABLE_SEQ_ORDER 2 #define BLOCK_LOAD_TABLE_SEQ_ORDER 2
#define BLOCK_LOAD_TABLE_RR_ORDER 3 #define BLOCK_LOAD_TABLE_RR_ORDER 3
tsdbReaderT tsdbReaderOpen(SVnode *pVnode, SQueryTableDataCond *pCond, STableListInfo *tableInfoGroup, uint64_t qId, int32_t tsdbSetTableList(tsdbReaderT reader, SArray* tableList);
tsdbReaderT tsdbReaderOpen(SVnode *pVnode, SQueryTableDataCond *pCond, SArray *tableList, uint64_t qId,
uint64_t taskId); uint64_t taskId);
tsdbReaderT tsdbQueryCacheLast(SVnode *pVnode, SQueryTableDataCond *pCond, STableListInfo *groupList, uint64_t qId, tsdbReaderT tsdbQueryCacheLast(SVnode *pVnode, SQueryTableDataCond *pCond, STableListInfo *groupList, uint64_t qId,
void *pMemRef); void *pMemRef);
...@@ -195,7 +196,6 @@ struct SVnodeCfg { ...@@ -195,7 +196,6 @@ struct SVnodeCfg {
typedef struct { typedef struct {
TSKEY lastKey; TSKEY lastKey;
uint64_t uid; uint64_t uid;
uint64_t groupId;
} STableKeyInfo; } STableKeyInfo;
struct SMetaEntry { struct SMetaEntry {
......
...@@ -121,7 +121,7 @@ int tsdbInsertData(STsdb* pTsdb, int64_t version, SSubmitReq* pMsg, SSub ...@@ -121,7 +121,7 @@ int tsdbInsertData(STsdb* pTsdb, int64_t version, SSubmitReq* pMsg, SSub
int32_t tsdbInsertTableData(STsdb* pTsdb, int64_t version, SSubmitMsgIter* pMsgIter, SSubmitBlk* pBlock, int32_t tsdbInsertTableData(STsdb* pTsdb, int64_t version, SSubmitMsgIter* pMsgIter, SSubmitBlk* pBlock,
SSubmitBlkRsp* pRsp); SSubmitBlkRsp* pRsp);
int32_t tsdbDeleteTableData(STsdb* pTsdb, int64_t version, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKEY eKey); int32_t tsdbDeleteTableData(STsdb* pTsdb, int64_t version, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKEY eKey);
tsdbReaderT tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, STableListInfo* tableList, uint64_t qId, tsdbReaderT tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* tableList, uint64_t qId,
uint64_t taskId); uint64_t taskId);
tsdbReaderT tsdbQueryCacheLastT(STsdb* tsdb, SQueryTableDataCond* pCond, STableListInfo* tableList, uint64_t qId, tsdbReaderT tsdbQueryCacheLastT(STsdb* tsdb, SQueryTableDataCond* pCond, STableListInfo* tableList, uint64_t qId,
void* pMemRef); void* pMemRef);
......
...@@ -381,6 +381,7 @@ int metaTtlDropTable(SMeta *pMeta, int64_t ttl, SArray *tbUids) { ...@@ -381,6 +381,7 @@ int metaTtlDropTable(SMeta *pMeta, int64_t ttl, SArray *tbUids) {
for (int i = 0; i < taosArrayGetSize(tbUids); ++i) { for (int i = 0; i < taosArrayGetSize(tbUids); ++i) {
tb_uid_t *uid = (tb_uid_t *)taosArrayGet(tbUids, i); tb_uid_t *uid = (tb_uid_t *)taosArrayGet(tbUids, i);
metaDropTableByUid(pMeta, *uid, NULL); metaDropTableByUid(pMeta, *uid, NULL);
metaDebug("ttl drop table:%"PRId64, *uid);
} }
metaULock(pMeta); metaULock(pMeta);
return 0; return 0;
...@@ -443,7 +444,6 @@ static int metaDropTableByUid(SMeta *pMeta, tb_uid_t uid, int *type) { ...@@ -443,7 +444,6 @@ static int metaDropTableByUid(SMeta *pMeta, tb_uid_t uid, int *type) {
// drop schema.db (todo) // drop schema.db (todo)
} }
metaError("ttl drop table:%s", e.name);
tDecoderClear(&dc); tDecoderClear(&dc);
tdbFree(pData); tdbFree(pData);
...@@ -976,7 +976,9 @@ static int metaUpdateTagIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry) { ...@@ -976,7 +976,9 @@ static int metaUpdateTagIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry) {
SDecoder dc = {0}; SDecoder dc = {0};
// get super table // get super table
tdbTbGet(pMeta->pUidIdx, &pCtbEntry->ctbEntry.suid, sizeof(tb_uid_t), &pData, &nData); if(tdbTbGet(pMeta->pUidIdx, &pCtbEntry->ctbEntry.suid, sizeof(tb_uid_t), &pData, &nData) != 0){
return -1;
}
tbDbKey.uid = pCtbEntry->ctbEntry.suid; tbDbKey.uid = pCtbEntry->ctbEntry.suid;
tbDbKey.version = *(int64_t *)pData; tbDbKey.version = *(int64_t *)pData;
tdbTbGet(pMeta->pTbDb, &tbDbKey, sizeof(tbDbKey), &pData, &nData); tdbTbGet(pMeta->pTbDb, &tbDbKey, sizeof(tbDbKey), &pData, &nData);
......
...@@ -403,7 +403,7 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) { ...@@ -403,7 +403,7 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
.reader = pHandle->execHandle.pExecReader[i], .reader = pHandle->execHandle.pExecReader[i],
.meta = pTq->pVnode->pMeta, .meta = pTq->pVnode->pMeta,
.vnode = pTq->pVnode, .vnode = pTq->pVnode,
.initTsdbReader = 1, // .initTsdbReader = 1,
}; };
pHandle->execHandle.execCol.task[i] = qCreateStreamExecTaskInfo(pHandle->execHandle.execCol.qmsg, &handle); pHandle->execHandle.execCol.task[i] = qCreateStreamExecTaskInfo(pHandle->execHandle.execCol.qmsg, &handle);
ASSERT(pHandle->execHandle.execCol.task[i]); ASSERT(pHandle->execHandle.execCol.task[i]);
...@@ -479,7 +479,7 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, char* msg, int32_t msgLen) { ...@@ -479,7 +479,7 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, char* msg, int32_t msgLen) {
.reader = pStreamReader, .reader = pStreamReader,
.meta = pTq->pVnode->pMeta, .meta = pTq->pVnode->pMeta,
.vnode = pTq->pVnode, .vnode = pTq->pVnode,
.initTsdbReader = 1, // .initTsdbReader = 1,
}; };
/*pTask->exec.inputHandle = pStreamReader;*/ /*pTask->exec.inputHandle = pStreamReader;*/
pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle); pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle);
......
...@@ -223,9 +223,8 @@ int64_t tsdbGetNumOfRowsInMemTable(tsdbReaderT* pHandle) { ...@@ -223,9 +223,8 @@ int64_t tsdbGetNumOfRowsInMemTable(tsdbReaderT* pHandle) {
return rows; return rows;
} }
static SArray* createCheckInfoFromTableGroup(STsdbReadHandle* pTsdbReadHandle, STableListInfo* pTableList) { static SArray* createCheckInfoFromTableGroup(STsdbReadHandle* pTsdbReadHandle, SArray* pTableList) {
size_t tableSize = taosArrayGetSize(pTableList->pTableList); size_t tableSize = taosArrayGetSize(pTableList);
assert(tableSize >= 1);
// allocate buffer in order to load data blocks from file // allocate buffer in order to load data blocks from file
SArray* pTableCheckInfo = taosArrayInit(tableSize, sizeof(STableCheckInfo)); SArray* pTableCheckInfo = taosArrayInit(tableSize, sizeof(STableCheckInfo));
...@@ -235,7 +234,7 @@ static SArray* createCheckInfoFromTableGroup(STsdbReadHandle* pTsdbReadHandle, S ...@@ -235,7 +234,7 @@ static SArray* createCheckInfoFromTableGroup(STsdbReadHandle* pTsdbReadHandle, S
// todo apply the lastkey of table check to avoid to load header file // todo apply the lastkey of table check to avoid to load header file
for (int32_t j = 0; j < tableSize; ++j) { for (int32_t j = 0; j < tableSize; ++j) {
STableKeyInfo* pKeyInfo = (STableKeyInfo*)taosArrayGet(pTableList->pTableList, j); STableKeyInfo* pKeyInfo = (STableKeyInfo*)taosArrayGet(pTableList, j);
STableCheckInfo info = {.lastKey = pKeyInfo->lastKey, .tableId = pKeyInfo->uid}; STableCheckInfo info = {.lastKey = pKeyInfo->lastKey, .tableId = pKeyInfo->uid};
info.suid = pTsdbReadHandle->suid; info.suid = pTsdbReadHandle->suid;
...@@ -254,8 +253,6 @@ static SArray* createCheckInfoFromTableGroup(STsdbReadHandle* pTsdbReadHandle, S ...@@ -254,8 +253,6 @@ static SArray* createCheckInfoFromTableGroup(STsdbReadHandle* pTsdbReadHandle, S
pTsdbReadHandle->idStr); pTsdbReadHandle->idStr);
} }
// TODO group table according to the tag value.
taosArraySort(pTableCheckInfo, tsdbCheckInfoCompar);
return pTableCheckInfo; return pTableCheckInfo;
} }
...@@ -497,8 +494,21 @@ static int32_t setCurrentSchema(SVnode* pVnode, STsdbReadHandle* pTsdbReadHandle ...@@ -497,8 +494,21 @@ static int32_t setCurrentSchema(SVnode* pVnode, STsdbReadHandle* pTsdbReadHandle
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
tsdbReaderT tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, STableListInfo* tableList, uint64_t qId, int32_t tsdbSetTableList(tsdbReaderT reader, SArray* tableList){
STsdbReadHandle* pTsdbReadHandle = reader;
if(pTsdbReadHandle->pTableCheckInfo) taosArrayDestroy(pTsdbReadHandle->pTableCheckInfo);
pTsdbReadHandle->pTableCheckInfo = createCheckInfoFromTableGroup(pTsdbReadHandle, tableList);
if (pTsdbReadHandle->pTableCheckInfo == NULL) {
return TSDB_CODE_TDB_OUT_OF_MEMORY;
}
return TDB_CODE_SUCCESS;
}
tsdbReaderT tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* tableList, uint64_t qId,
uint64_t taskId) { uint64_t taskId) {
if(taosArrayGetSize(tableList) == 0){
return NULL;
}
STsdbReadHandle* pTsdbReadHandle = tsdbQueryTablesImpl(pVnode, pCond, qId, taskId); STsdbReadHandle* pTsdbReadHandle = tsdbQueryTablesImpl(pVnode, pCond, qId, taskId);
if (pTsdbReadHandle == NULL) { if (pTsdbReadHandle == NULL) {
return NULL; return NULL;
...@@ -543,7 +553,7 @@ tsdbReaderT tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, STableLis ...@@ -543,7 +553,7 @@ tsdbReaderT tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, STableLis
} }
tsdbDebug("%p total numOfTable:%" PRIzu " in this query, table %" PRIzu " %s", pTsdbReadHandle, tsdbDebug("%p total numOfTable:%" PRIzu " in this query, table %" PRIzu " %s", pTsdbReadHandle,
taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo), taosArrayGetSize(tableList->pTableList), taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo), taosArrayGetSize(tableList),
pTsdbReadHandle->idStr); pTsdbReadHandle->idStr);
return (tsdbReaderT)pTsdbReadHandle; return (tsdbReaderT)pTsdbReadHandle;
...@@ -639,7 +649,7 @@ tsdbReaderT tsdbQueryLastRow(SVnode* pVnode, SQueryTableDataCond* pCond, STableL ...@@ -639,7 +649,7 @@ tsdbReaderT tsdbQueryLastRow(SVnode* pVnode, SQueryTableDataCond* pCond, STableL
return NULL; return NULL;
} }
STsdbReadHandle* pTsdbReadHandle = (STsdbReadHandle*)tsdbReaderOpen(pVnode, pCond, pList, qId, taskId); STsdbReadHandle* pTsdbReadHandle = (STsdbReadHandle*)tsdbReaderOpen(pVnode, pCond, pList->pTableList, qId, taskId);
if (pTsdbReadHandle == NULL) { if (pTsdbReadHandle == NULL) {
return NULL; return NULL;
} }
...@@ -2842,7 +2852,7 @@ int32_t tsdbGetAllTableList(SMeta* pMeta, uint64_t uid, SArray* list) { ...@@ -2842,7 +2852,7 @@ int32_t tsdbGetAllTableList(SMeta* pMeta, uint64_t uid, SArray* list) {
break; break;
} }
STableKeyInfo info = {.lastKey = TSKEY_INITIAL_VAL, uid = id, .groupId = 0}; STableKeyInfo info = {.lastKey = TSKEY_INITIAL_VAL, uid = id};
taosArrayPush(list, &info); taosArrayPush(list, &info);
} }
...@@ -3644,17 +3654,6 @@ SArray* tsdbRetrieveDataBlock(tsdbReaderT* pTsdbReadHandle, SArray* pIdList) { ...@@ -3644,17 +3654,6 @@ SArray* tsdbRetrieveDataBlock(tsdbReaderT* pTsdbReadHandle, SArray* pIdList) {
} }
} }
static int tsdbCheckInfoCompar(const void* key1, const void* key2) {
if (((STableCheckInfo*)key1)->tableId < ((STableCheckInfo*)key2)->tableId) {
return -1;
} else if (((STableCheckInfo*)key1)->tableId > ((STableCheckInfo*)key2)->tableId) {
return 1;
} else {
ASSERT(false);
return 0;
}
}
static void* doFreeColumnInfoData(SArray* pColumnInfoData) { static void* doFreeColumnInfoData(SArray* pColumnInfoData) {
if (pColumnInfoData == NULL) { if (pColumnInfoData == NULL) {
return NULL; return NULL;
......
...@@ -273,6 +273,10 @@ typedef struct STableScanInfo { ...@@ -273,6 +273,10 @@ typedef struct STableScanInfo {
SSampleExecInfo sample; // sample execution info SSampleExecInfo sample; // sample execution info
int32_t curTWinIdx; int32_t curTWinIdx;
int32_t currentGroupId;
uint64_t queryId;
uint64_t taskId;
} STableScanInfo; } STableScanInfo;
typedef struct STagScanInfo { typedef struct STagScanInfo {
...@@ -706,7 +710,7 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR ...@@ -706,7 +710,7 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR
SOperatorInfo* createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode* pExNode, SExecTaskInfo* pTaskInfo); SOperatorInfo* createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode* pExNode, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, tsdbReaderT pDataReader, SReadHandle* pHandle, SExecTaskInfo* pTaskInfo); SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle, SExecTaskInfo* pTaskInfo, uint64_t queryId, uint64_t taskId);
SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysiNode* pPhyNode, SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysiNode* pPhyNode,
STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo); STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScanPhysiNode *pScanPhyNode, SExecTaskInfo* pTaskInfo); SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScanPhysiNode *pScanPhyNode, SExecTaskInfo* pTaskInfo);
...@@ -749,8 +753,8 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx ...@@ -749,8 +753,8 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx
SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SReadHandle* readHandle, uint64_t uid, SBlockDistScanPhysiNode* pBlockScanNode, SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SReadHandle* readHandle, uint64_t uid, SBlockDistScanPhysiNode* pBlockScanNode,
SExecTaskInfo* pTaskInfo); SExecTaskInfo* pTaskInfo);
SOperatorInfo* createStreamScanOperatorInfo(void* pDataReader, SReadHandle* pHandle, SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle,
STableScanPhysiNode* pTableScanNode, SExecTaskInfo* pTaskInfo, STimeWindowAggSupp* pTwSup); STableScanPhysiNode* pTableScanNode, SExecTaskInfo* pTaskInfo, STimeWindowAggSupp* pTwSup, uint64_t queryId, uint64_t taskId);
SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* pPhyFillNode, bool multigroupResult, SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* pPhyFillNode, bool multigroupResult,
SExecTaskInfo* pTaskInfo); SExecTaskInfo* pTaskInfo);
...@@ -845,7 +849,7 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN ...@@ -845,7 +849,7 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
void copyUpdateDataBlock(SSDataBlock* pDest, SSDataBlock* pSource, int32_t tsColIndex); void copyUpdateDataBlock(SSDataBlock* pDest, SSDataBlock* pSource, int32_t tsColIndex);
int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle, SArray* groupKey); int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle, SNodeList* groupKey);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -287,6 +287,7 @@ static bool isTableOk(STableKeyInfo* info, SNode *pTagCond, SMeta *metaHandle){ ...@@ -287,6 +287,7 @@ static bool isTableOk(STableKeyInfo* info, SNode *pTagCond, SMeta *metaHandle){
int32_t getTableList(void* metaHandle, SScanPhysiNode* pScanNode, STableListInfo* pListInfo) { int32_t getTableList(void* metaHandle, SScanPhysiNode* pScanNode, STableListInfo* pListInfo) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
pListInfo->pTableList = taosArrayInit(8, sizeof(STableKeyInfo)); pListInfo->pTableList = taosArrayInit(8, sizeof(STableKeyInfo));
if(pListInfo->pTableList == NULL) return TSDB_CODE_OUT_OF_MEMORY;
uint64_t tableUid = pScanNode->uid; uint64_t tableUid = pScanNode->uid;
...@@ -314,7 +315,7 @@ int32_t getTableList(void* metaHandle, SScanPhysiNode* pScanNode, STableListInfo ...@@ -314,7 +315,7 @@ int32_t getTableList(void* metaHandle, SScanPhysiNode* pScanNode, STableListInfo
} }
for (int i = 0; i < taosArrayGetSize(res); i++) { for (int i = 0; i < taosArrayGetSize(res); i++) {
STableKeyInfo info = {.lastKey = TSKEY_INITIAL_VAL, .uid = *(uint64_t*)taosArrayGet(res, i), .groupId = 0}; STableKeyInfo info = {.lastKey = TSKEY_INITIAL_VAL, .uid = *(uint64_t*)taosArrayGet(res, i)};
taosArrayPush(pListInfo->pTableList, &info); taosArrayPush(pListInfo->pTableList, &info);
} }
taosArrayDestroy(res); taosArrayDestroy(res);
...@@ -335,9 +336,14 @@ int32_t getTableList(void* metaHandle, SScanPhysiNode* pScanNode, STableListInfo ...@@ -335,9 +336,14 @@ int32_t getTableList(void* metaHandle, SScanPhysiNode* pScanNode, STableListInfo
} }
} }
}else { // Create one table group. }else { // Create one table group.
STableKeyInfo info = {.lastKey = 0, .uid = tableUid, .groupId = 0}; STableKeyInfo info = {.lastKey = 0, .uid = tableUid};
taosArrayPush(pListInfo->pTableList, &info); taosArrayPush(pListInfo->pTableList, &info);
} }
pListInfo->pGroupList = taosArrayInit(4, POINTER_BYTES);
if(pListInfo->pGroupList == NULL) return TSDB_CODE_OUT_OF_MEMORY;
//put into list as default group, remove it if grouping sorting is required later
taosArrayPush(pListInfo->pGroupList, &pListInfo->pTableList);
return code; return code;
} }
......
...@@ -3862,9 +3862,6 @@ static SExecTaskInfo* createExecTaskInfo(uint64_t queryId, uint64_t taskId, EOPT ...@@ -3862,9 +3862,6 @@ static SExecTaskInfo* createExecTaskInfo(uint64_t queryId, uint64_t taskId, EOPT
return pTaskInfo; return pTaskInfo;
} }
static tsdbReaderT doCreateDataReader(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle,
STableListInfo* pTableListInfo, uint64_t queryId, uint64_t taskId);
static SArray* extractColumnInfo(SNodeList* pNodeList); static SArray* extractColumnInfo(SNodeList* pNodeList);
int32_t extractTableSchemaVersion(SReadHandle* pHandle, uint64_t uid, SExecTaskInfo* pTaskInfo) { int32_t extractTableSchemaVersion(SReadHandle* pHandle, uint64_t uid, SExecTaskInfo* pTaskInfo) {
...@@ -3895,8 +3892,67 @@ int32_t extractTableSchemaVersion(SReadHandle* pHandle, uint64_t uid, SExecTaskI ...@@ -3895,8 +3892,67 @@ int32_t extractTableSchemaVersion(SReadHandle* pHandle, uint64_t uid, SExecTaskI
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle, SArray* groupKey) { static int32_t sortTableGroup(STableListInfo* pTableListInfo, int32_t groupNum){
if (groupKey == NULL) { taosArrayClear(pTableListInfo->pGroupList);
SArray *sortSupport = taosArrayInit(groupNum, sizeof(uint64_t));
if(sortSupport == NULL) return TSDB_CODE_OUT_OF_MEMORY;
for (int32_t i = 0; i < taosArrayGetSize(pTableListInfo->pTableList); i++) {
STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i);
uint64_t* groupId = taosHashGet(pTableListInfo->map, &info->uid, sizeof(uint64_t));
int32_t index = taosArraySearchIdx(sortSupport, groupId, compareUint64Val, TD_EQ);
if (index == -1){
void *p = taosArraySearch(sortSupport, groupId, compareUint64Val, TD_GT);
SArray *tGroup = taosArrayInit(8, sizeof(STableKeyInfo));
if(tGroup == NULL) {
taosArrayDestroy(sortSupport);
return TSDB_CODE_OUT_OF_MEMORY;
}
if(taosArrayPush(tGroup, info) == NULL){
qError("taos push info array error");
taosArrayDestroy(sortSupport);
return TSDB_CODE_QRY_APP_ERROR;
}
if(p == NULL){
if(taosArrayPush(sortSupport, groupId) != NULL){
qError("taos push support array error");
taosArrayDestroy(sortSupport);
return TSDB_CODE_QRY_APP_ERROR;
}
if(taosArrayPush(pTableListInfo->pGroupList, &tGroup) != NULL){
qError("taos push group array error");
taosArrayDestroy(sortSupport);
return TSDB_CODE_QRY_APP_ERROR;
}
}else{
int32_t pos = TARRAY_ELEM_IDX(sortSupport, p);
if(taosArrayInsert(sortSupport, pos, groupId) == NULL){
qError("taos insert support array error");
taosArrayDestroy(sortSupport);
return TSDB_CODE_QRY_APP_ERROR;
}
if(taosArrayInsert(pTableListInfo->pGroupList, pos, &tGroup) == NULL){
qError("taos insert group array error");
taosArrayDestroy(sortSupport);
return TSDB_CODE_QRY_APP_ERROR;
}
}
}else{
SArray* tGroup = (SArray*)taosArrayGetP(pTableListInfo->pGroupList, index);
if(taosArrayPush(tGroup, info) == NULL){
qError("taos push uid array error");
taosArrayDestroy(sortSupport);
return TSDB_CODE_QRY_APP_ERROR;
}
}
}
taosArrayDestroy(sortSupport);
return TDB_CODE_SUCCESS;
}
int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle, SNodeList* group) {
if (group == NULL) {
return TDB_CODE_SUCCESS; return TDB_CODE_SUCCESS;
} }
...@@ -3906,13 +3962,14 @@ int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle, ...@@ -3906,13 +3962,14 @@ int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle,
} }
int32_t keyLen = 0; int32_t keyLen = 0;
void* keyBuf = NULL; void* keyBuf = NULL;
int32_t numOfGroupCols = taosArrayGetSize(groupKey);
for (int32_t j = 0; j < numOfGroupCols; ++j) { SNode* node;
SColumn* pCol = taosArrayGet(groupKey, j); FOREACH(node, group) {
keyLen += pCol->bytes; // actual data + null_flag SExprNode *pExpr = (SExprNode *)node;
keyLen += pExpr->resType.bytes;
} }
int32_t nullFlagSize = sizeof(int8_t) * numOfGroupCols; int32_t nullFlagSize = sizeof(int8_t) * LIST_LENGTH(group);
keyLen += nullFlagSize; keyLen += nullFlagSize;
keyBuf = taosMemoryCalloc(1, keyLen); keyBuf = taosMemoryCalloc(1, keyLen);
...@@ -3920,59 +3977,68 @@ int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle, ...@@ -3920,59 +3977,68 @@ int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle,
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
int32_t groupNum = 0;
for (int32_t i = 0; i < taosArrayGetSize(pTableListInfo->pTableList); i++) { for (int32_t i = 0; i < taosArrayGetSize(pTableListInfo->pTableList); i++) {
STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i); STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i);
SMetaReader mr = {0}; SMetaReader mr = {0};
metaReaderInit(&mr, pHandle->meta, 0); metaReaderInit(&mr, pHandle->meta, 0);
metaGetTableEntryByUid(&mr, info->uid); metaGetTableEntryByUid(&mr, info->uid);
SNodeList *groupNew = nodesCloneList(group);
nodesRewriteExprsPostOrder(groupNew, doTranslateTagExpr, &mr);
char* isNull = (char*)keyBuf; char* isNull = (char*)keyBuf;
char* pStart = (char*)keyBuf + sizeof(int8_t) * numOfGroupCols; char* pStart = (char*)keyBuf + nullFlagSize;
for (int32_t j = 0; j < numOfGroupCols; ++j) {
SColumn* pCol = taosArrayGet(groupKey, j); SNode* pNode;
int32_t index = 0;
if (strcmp(pCol->name, "tbname") == 0) { FOREACH(pNode, groupNew){
isNull[i] = 0; SNode* pNew = NULL;
memcpy(pStart, mr.me.name, strlen(mr.me.name)); int32_t code = scalarCalculateConstants(pNode, &pNew);
pStart += strlen(mr.me.name); if (TSDB_CODE_SUCCESS == code) {
REPLACE_NODE(pNew);
} else { } else {
STagVal tagVal = {0}; taosMemoryFree(keyBuf);
tagVal.cid = pCol->colId; nodesClearList(groupNew);
const char* p = metaGetTableTagVal(&mr.me, pCol->type, &tagVal); return code;
if (p == NULL) {
isNull[j] = 1;
continue;
}
isNull[i] = 0;
if (pCol->type == TSDB_DATA_TYPE_JSON) {
// int32_t dataLen = getJsonValueLen(pkey->pData);
// memcpy(pStart, (pkey->pData), dataLen);
// pStart += dataLen;
} else if (IS_VAR_DATA_TYPE(pCol->type)) {
memcpy(pStart, tagVal.pData, tagVal.nData);
pStart += tagVal.nData;
ASSERT(tagVal.nData <= pCol->bytes);
} else {
memcpy(pStart, &(tagVal.i64), pCol->bytes);
pStart += pCol->bytes;
}
} }
}
int32_t len = (int32_t)(pStart - (char*)keyBuf);
uint64_t* pGroupId = taosHashGet(pTableListInfo->map, keyBuf, len); ASSERT(nodeType(pNew) == QUERY_NODE_VALUE);
SValueNode *pValue = (SValueNode *)pNew;
if (!pGroupId) { if (pValue->node.resType.type == TSDB_DATA_TYPE_NULL) {
uint64_t tmpId = calcGroupId(keyBuf, len); isNull[index++] = 1;
info->groupId = tmpId; continue;
taosHashPut(pTableListInfo->map, &(info->uid), sizeof(uint64_t), &tmpId, sizeof(uint64_t)); } else {
} else { isNull[index++] = 0;
info->groupId = *pGroupId; char* data = nodesGetValueFromNode(pValue);
if (pValue->node.resType.type == TSDB_DATA_TYPE_JSON){
int32_t len = getJsonValueLen(data);
memcpy(pStart, data, len);
pStart += len;
} else if (IS_VAR_DATA_TYPE(pValue->node.resType.type)) {
memcpy(pStart, data, varDataTLen(data));
pStart += varDataTLen(data);
} else {
memcpy(pStart, data, pValue->node.resType.bytes);
pStart += pValue->node.resType.bytes;
}
}
} }
int32_t len = (int32_t)(pStart - (char*)keyBuf);
uint64_t groupId = calcGroupId(keyBuf, len);
taosHashPut(pTableListInfo->map, &(info->uid), sizeof(uint64_t), &groupId, sizeof(uint64_t));
groupNum++;
nodesClearList(groupNew);
metaReaderClear(&mr); metaReaderClear(&mr);
} }
taosMemoryFree(keyBuf); taosMemoryFree(keyBuf);
if(pTableListInfo->needSortTableByGroupId){
return sortTableGroup(pTableListInfo, groupNum);
}
return TDB_CODE_SUCCESS; return TDB_CODE_SUCCESS;
} }
...@@ -3984,39 +4050,36 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo ...@@ -3984,39 +4050,36 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == type) { if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == type) {
STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode; STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode;
tsdbReaderT pDataReader = doCreateDataReader(pTableScanNode, pHandle, pTableListInfo, (uint64_t)queryId, taskId); int32_t code = createScanTableListInfo(pTableScanNode, pHandle, pTableListInfo, queryId, taskId);
if (pDataReader == NULL && terrno != 0) { if(code){
pTaskInfo->code = terrno; pTaskInfo->code = code;
return NULL;
}
int32_t code = extractTableSchemaVersion(pHandle, pTableScanNode->scan.uid, pTaskInfo);
if (code) {
tsdbCleanupReadHandle(pDataReader);
pTaskInfo->code = terrno;
return NULL; return NULL;
} }
code = extractTableSchemaVersion(pHandle, pTableScanNode->scan.uid, pTaskInfo);
SArray* groupKeys = extractPartitionColInfo(pTableScanNode->pPartitionTags);
code = generateGroupIdMap(pTableListInfo, pHandle, groupKeys); // todo for json
taosArrayDestroy(groupKeys);
if (code) { if (code) {
tsdbCleanupReadHandle(pDataReader);
pTaskInfo->code = terrno; pTaskInfo->code = terrno;
return NULL; return NULL;
} }
SOperatorInfo* pOperator = createTableScanOperatorInfo(pTableScanNode, pDataReader, pHandle, pTaskInfo); SOperatorInfo* pOperator = createTableScanOperatorInfo(pTableScanNode, pHandle, pTaskInfo, queryId, taskId);
STableScanInfo* pScanInfo = pOperator->info; STableScanInfo* pScanInfo = pOperator->info;
pTaskInfo->cost.pRecoder = &pScanInfo->readRecorder; pTaskInfo->cost.pRecoder = &pScanInfo->readRecorder;
return pOperator; return pOperator;
} else if (QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN == type) { } else if (QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN == type) {
STableMergeScanPhysiNode* pTableScanNode = (STableMergeScanPhysiNode*)pPhyNode; STableMergeScanPhysiNode* pTableScanNode = (STableMergeScanPhysiNode*)pPhyNode;
createScanTableListInfo(pTableScanNode, pHandle, pTableListInfo, queryId, taskId); int32_t code = createScanTableListInfo(pTableScanNode, pHandle, pTableListInfo, queryId, taskId);
extractTableSchemaVersion(pHandle, pTableScanNode->scan.uid, pTaskInfo); if(code){
SOperatorInfo* pOperator = return NULL;
createTableMergeScanOperatorInfo(pTableScanNode, pTableListInfo, pHandle, pTaskInfo, queryId, taskId); }
code = extractTableSchemaVersion(pHandle, pTableScanNode->scan.uid, pTaskInfo);
if (code) {
pTaskInfo->code = terrno;
return NULL;
}
SOperatorInfo* pOperator = createTableMergeScanOperatorInfo(pTableScanNode, pTableListInfo, pHandle, pTaskInfo, queryId, taskId);
STableScanInfo* pScanInfo = pOperator->info; STableScanInfo* pScanInfo = pOperator->info;
pTaskInfo->cost.pRecoder = &pScanInfo->readRecorder; pTaskInfo->cost.pRecoder = &pScanInfo->readRecorder;
return pOperator; return pOperator;
...@@ -4025,46 +4088,17 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo ...@@ -4025,46 +4088,17 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
return createExchangeOperatorInfo(pHandle->pMsgCb->clientRpc, (SExchangePhysiNode*)pPhyNode, pTaskInfo); return createExchangeOperatorInfo(pHandle->pMsgCb->clientRpc, (SExchangePhysiNode*)pPhyNode, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN == type) { } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN == type) {
SScanPhysiNode* pScanPhyNode = (SScanPhysiNode*)pPhyNode; // simple child table.
STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode; STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode;
STimeWindowAggSupp twSup = { STimeWindowAggSupp twSup = {
.waterMark = pTableScanNode->watermark, .waterMark = pTableScanNode->watermark,
.calTrigger = pTableScanNode->triggerType, .calTrigger = pTableScanNode->triggerType,
.maxTs = INT64_MIN, .maxTs = INT64_MIN,
}; };
tsdbReaderT pDataReader = NULL;
if (pHandle) { if (pHandle) {
if (pHandle->initTsdbReader) { createScanTableListInfo(pTableScanNode, pHandle, pTableListInfo, queryId, taskId);
// for stream
ASSERT(pHandle->vnode);
pDataReader = doCreateDataReader(pTableScanNode, pHandle, pTableListInfo, (uint64_t)queryId, taskId);
} else {
// for tq
ASSERT(pHandle->meta);
getTableList(pHandle->meta, pScanPhyNode, pTableListInfo);
}
}
#if 0
if (pDataReader == NULL && terrno != 0) {
qDebug("%s pDataReader is NULL", GET_TASKID(pTaskInfo));
// return NULL;
} else {
qDebug("%s pDataReader is not NULL", GET_TASKID(pTaskInfo));
} }
#endif
SArray* groupKeys = extractPartitionColInfo(pTableScanNode->pPartitionTags);
int32_t code = generateGroupIdMap(pTableListInfo, pHandle, groupKeys); // todo for json
taosArrayDestroy(groupKeys);
if (code) {
tsdbCleanupReadHandle(pDataReader);
return NULL;
}
SOperatorInfo* pOperator = createStreamScanOperatorInfo(pDataReader, pHandle, pTableScanNode, pTaskInfo, &twSup);
SOperatorInfo* pOperator = createStreamScanOperatorInfo(pHandle, pTableScanNode, pTaskInfo, &twSup, queryId, taskId);
return pOperator; return pOperator;
} else if (QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN == type) { } else if (QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN == type) {
...@@ -4093,7 +4127,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo ...@@ -4093,7 +4127,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
return NULL; return NULL;
} }
} else { // Create one table group. } else { // Create one table group.
STableKeyInfo info = {.lastKey = 0, .uid = pBlockNode->uid, .groupId = 0}; STableKeyInfo info = {.lastKey = 0, .uid = pBlockNode->uid};
taosArrayPush(pTableListInfo->pTableList, &info); taosArrayPush(pTableListInfo->pTableList, &info);
} }
...@@ -4118,7 +4152,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo ...@@ -4118,7 +4152,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
cond.suid = pBlockNode->suid; cond.suid = pBlockNode->suid;
cond.type = BLOCK_LOAD_OFFSET_SEQ_ORDER; cond.type = BLOCK_LOAD_OFFSET_SEQ_ORDER;
} }
tsdbReaderT* pReader = tsdbReaderOpen(pHandle->vnode, &cond, pTableListInfo, queryId, taskId); tsdbReaderT* pReader = tsdbReaderOpen(pHandle->vnode, &cond, pTableListInfo->pTableList, queryId, taskId);
cleanupQueryTableDataCond(&cond); cleanupQueryTableDataCond(&cond);
return createDataBlockInfoScanOperator(pReader, pHandle, cond.suid, pBlockNode, pTaskInfo); return createDataBlockInfoScanOperator(pReader, pHandle, cond.suid, pBlockNode, pTaskInfo);
...@@ -4355,7 +4389,7 @@ tsdbReaderT doCreateDataReader(STableScanPhysiNode* pTableScanNode, SReadHandle* ...@@ -4355,7 +4389,7 @@ tsdbReaderT doCreateDataReader(STableScanPhysiNode* pTableScanNode, SReadHandle*
goto _error; goto _error;
} }
tsdbReaderT pReader = tsdbReaderOpen(pHandle->vnode, &cond, pTableListInfo, queryId, taskId); tsdbReaderT pReader = tsdbReaderOpen(pHandle->vnode, &cond, pTableListInfo->pTableList, queryId, taskId);
cleanupQueryTableDataCond(&cond); cleanupQueryTableDataCond(&cond);
return pReader; return pReader;
...@@ -4584,6 +4618,13 @@ _complete: ...@@ -4584,6 +4618,13 @@ _complete:
static void doDestroyTableList(STableListInfo* pTableqinfoList) { static void doDestroyTableList(STableListInfo* pTableqinfoList) {
taosArrayDestroy(pTableqinfoList->pTableList); taosArrayDestroy(pTableqinfoList->pTableList);
taosHashCleanup(pTableqinfoList->map); taosHashCleanup(pTableqinfoList->map);
if(pTableqinfoList->needSortTableByGroupId){
for(int32_t i = 0; i < taosArrayGetSize(pTableqinfoList->pGroupList); i++){
SArray* tmp = taosArrayGetP(pTableqinfoList->pGroupList, i);
taosArrayDestroy(tmp);
}
}
taosArrayDestroy(pTableqinfoList->pGroupList);
pTableqinfoList->pTableList = NULL; pTableqinfoList->pTableList = NULL;
pTableqinfoList->map = NULL; pTableqinfoList->map = NULL;
......
...@@ -418,7 +418,7 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) { ...@@ -418,7 +418,7 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) {
return NULL; return NULL;
} }
static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { static SSDataBlock* doTableScanGroup(SOperatorInfo* pOperator) {
STableScanInfo* pTableScanInfo = pOperator->info; STableScanInfo* pTableScanInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
...@@ -500,6 +500,48 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { ...@@ -500,6 +500,48 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
} }
} }
return NULL;
}
static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
STableScanInfo* pInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
if(pInfo->currentGroupId == -1){
pInfo->currentGroupId++;
if (pInfo->currentGroupId >= taosArrayGetSize(pTaskInfo->tableqinfoList.pGroupList)) {
setTaskStatus(pTaskInfo, TASK_COMPLETED);
return NULL;
}
SArray *tableList = taosArrayGetP(pTaskInfo->tableqinfoList.pGroupList, pInfo->currentGroupId);
tsdbCleanupReadHandle(pInfo->dataReader);
tsdbReaderT* pReader = tsdbReaderOpen(pInfo->readHandle.vnode, &pInfo->cond, tableList, pInfo->queryId, pInfo->taskId);
pInfo->dataReader = pReader;
}
SSDataBlock* result = doTableScanGroup(pOperator);
if(result){
return result;
}
pInfo->currentGroupId++;
if (pInfo->currentGroupId >= taosArrayGetSize(pTaskInfo->tableqinfoList.pGroupList)) {
setTaskStatus(pTaskInfo, TASK_COMPLETED);
return NULL;
}
SArray *tableList = taosArrayGetP(pTaskInfo->tableqinfoList.pGroupList, pInfo->currentGroupId);
tsdbSetTableList(pInfo->dataReader, tableList);
tsdbResetReadHandle(pInfo->dataReader, &pInfo->cond, 0);
pInfo->curTWinIdx = 0;
pInfo->scanTimes = 0;
result = doTableScanGroup(pOperator);
if(result){
return result;
}
setTaskStatus(pTaskInfo, TASK_COMPLETED); setTaskStatus(pTaskInfo, TASK_COMPLETED);
return NULL; return NULL;
} }
...@@ -525,8 +567,8 @@ static void destroyTableScanOperatorInfo(void* param, int32_t numOfOutput) { ...@@ -525,8 +567,8 @@ static void destroyTableScanOperatorInfo(void* param, int32_t numOfOutput) {
} }
} }
SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, tsdbReaderT pDataReader, SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* readHandle,
SReadHandle* readHandle, SExecTaskInfo* pTaskInfo) { SExecTaskInfo* pTaskInfo, uint64_t queryId, uint64_t taskId) {
STableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableScanInfo)); STableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableScanInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) { if (pInfo == NULL || pOperator == NULL) {
...@@ -561,10 +603,12 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, ...@@ -561,10 +603,12 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode,
pInfo->dataBlockLoadFlag = pTableScanNode->dataRequired; pInfo->dataBlockLoadFlag = pTableScanNode->dataRequired;
pInfo->pResBlock = createResDataBlock(pDescNode); pInfo->pResBlock = createResDataBlock(pDescNode);
pInfo->pFilterNode = pTableScanNode->scan.node.pConditions; pInfo->pFilterNode = pTableScanNode->scan.node.pConditions;
pInfo->dataReader = pDataReader;
pInfo->scanFlag = MAIN_SCAN; pInfo->scanFlag = MAIN_SCAN;
pInfo->pColMatchInfo = pColList; pInfo->pColMatchInfo = pColList;
pInfo->curTWinIdx = 0; pInfo->curTWinIdx = 0;
pInfo->queryId = queryId;
pInfo->taskId = taskId;
pInfo->currentGroupId = -1;
pOperator->name = "TableScanOperator"; // for debug purpose pOperator->name = "TableScanOperator"; // for debug purpose
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN;
...@@ -778,8 +822,9 @@ static bool prepareDataScan(SStreamBlockScanInfo* pInfo) { ...@@ -778,8 +822,9 @@ static bool prepareDataScan(SStreamBlockScanInfo* pInfo) {
STableScanInfo* pTableScanInfo = pInfo->pSnapshotReadOp->info; STableScanInfo* pTableScanInfo = pInfo->pSnapshotReadOp->info;
pTableScanInfo->cond.twindows[0] = win; pTableScanInfo->cond.twindows[0] = win;
pTableScanInfo->curTWinIdx = 0; pTableScanInfo->curTWinIdx = 0;
tsdbResetReadHandle(pTableScanInfo->dataReader, &pTableScanInfo->cond, 0); // tsdbResetReadHandle(pTableScanInfo->dataReader, &pTableScanInfo->cond, 0);
pTableScanInfo->scanTimes = 0; pTableScanInfo->scanTimes = 0;
pTableScanInfo->currentGroupId = -1;
return true; return true;
} }
...@@ -1087,9 +1132,9 @@ static SArray* extractTableIdList(const STableListInfo* pTableGroupInfo) { ...@@ -1087,9 +1132,9 @@ static SArray* extractTableIdList(const STableListInfo* pTableGroupInfo) {
return tableIdList; return tableIdList;
} }
SOperatorInfo* createStreamScanOperatorInfo(void* pDataReader, SReadHandle* pHandle, SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle,
STableScanPhysiNode* pTableScanNode, SExecTaskInfo* pTaskInfo, STableScanPhysiNode* pTableScanNode, SExecTaskInfo* pTaskInfo,
STimeWindowAggSupp* pTwSup) { STimeWindowAggSupp* pTwSup, uint64_t queryId, uint64_t taskId) {
SStreamBlockScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamBlockScanInfo)); SStreamBlockScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamBlockScanInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
...@@ -1129,7 +1174,7 @@ SOperatorInfo* createStreamScanOperatorInfo(void* pDataReader, SReadHandle* pHan ...@@ -1129,7 +1174,7 @@ SOperatorInfo* createStreamScanOperatorInfo(void* pDataReader, SReadHandle* pHan
} }
if (pHandle) { if (pHandle) {
SOperatorInfo* pTableScanDummy = createTableScanOperatorInfo(pTableScanNode, pDataReader, pHandle, pTaskInfo); SOperatorInfo* pTableScanDummy = createTableScanOperatorInfo(pTableScanNode, pHandle, pTaskInfo, queryId, taskId);
STableScanInfo* pSTInfo = (STableScanInfo*)pTableScanDummy->info; STableScanInfo* pSTInfo = (STableScanInfo*)pTableScanDummy->info;
if (pSTInfo->interval.interval > 0) { if (pSTInfo->interval.interval > 0) {
pInfo->pUpdateInfo = updateInfoInitP(&pSTInfo->interval, pTwSup->waterMark); pInfo->pUpdateInfo = updateInfoInitP(&pSTInfo->interval, pTwSup->waterMark);
...@@ -1889,11 +1934,12 @@ SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysi ...@@ -1889,11 +1934,12 @@ SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysi
goto _error; goto _error;
} }
pInfo->pTableList = pTableListInfo; pInfo->pTableList = pTableListInfo;
pInfo->pColMatchInfo = colList; pInfo->pColMatchInfo = colList;
pInfo->pRes = createResDataBlock(pDescNode); pInfo->pRes = createResDataBlock(pDescNode);
pInfo->readHandle = *pReadHandle; pInfo->readHandle = *pReadHandle;
pInfo->curPos = 0; pInfo->curPos = 0;
pOperator->name = "TagScanOperator"; pOperator->name = "TagScanOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN;
...@@ -1919,10 +1965,7 @@ _error: ...@@ -1919,10 +1965,7 @@ _error:
typedef struct STableMergeScanInfo { typedef struct STableMergeScanInfo {
STableListInfo* tableListInfo; STableListInfo* tableListInfo;
int32_t tableStartIndex; int32_t currentGroupId;
int32_t tableEndIndex;
bool hasGroupId;
uint64_t groupId;
SArray* dataReaders; // array of tsdbReaderT* SArray* dataReaders; // array of tsdbReaderT*
SReadHandle readHandle; SReadHandle readHandle;
...@@ -1968,12 +2011,6 @@ typedef struct STableMergeScanInfo { ...@@ -1968,12 +2011,6 @@ typedef struct STableMergeScanInfo {
SSampleExecInfo sample; // sample execution info SSampleExecInfo sample; // sample execution info
} STableMergeScanInfo; } STableMergeScanInfo;
int32_t compareTableKeyInfoByGid(const void* p1, const void* p2) {
const STableKeyInfo* info1 = p1;
const STableKeyInfo* info2 = p2;
return info1->groupId - info2->groupId;
}
int32_t createScanTableListInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle, int32_t createScanTableListInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle,
STableListInfo* pTableListInfo, uint64_t queryId, uint64_t taskId) { STableListInfo* pTableListInfo, uint64_t queryId, uint64_t taskId) {
int32_t code = getTableList(pHandle->meta, &pTableScanNode->scan, pTableListInfo); int32_t code = getTableList(pHandle->meta, &pTableScanNode->scan, pTableListInfo);
...@@ -1985,55 +2022,9 @@ int32_t createScanTableListInfo(STableScanPhysiNode* pTableScanNode, SReadHandle ...@@ -1985,55 +2022,9 @@ int32_t createScanTableListInfo(STableScanPhysiNode* pTableScanNode, SReadHandle
qDebug("no table qualified for query, TID:0x%" PRIx64 ", QID:0x%" PRIx64, taskId, queryId); qDebug("no table qualified for query, TID:0x%" PRIx64 ", QID:0x%" PRIx64, taskId, queryId);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
SArray* groupKeys = extractPartitionColInfo(pTableScanNode->pPartitionTags); code = generateGroupIdMap(pTableListInfo, pHandle, pTableScanNode->pPartitionTags);
generateGroupIdMap(pTableListInfo, pHandle, groupKeys); // todo for json
if (groupKeys) {
taosArraySort(pTableListInfo->pTableList, compareTableKeyInfoByGid);
}
taosArrayDestroy(groupKeys);
return TSDB_CODE_SUCCESS;
}
int32_t doCreateMultipleDataReaders(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle,
STableListInfo* pTableListInfo, SArray* arrayReader, uint64_t queryId,
uint64_t taskId) {
SQueryTableDataCond cond = {0};
int32_t code = initQueryTableDataCond(&cond, pTableScanNode);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; return code;
}
for (int32_t i = 0; i < taosArrayGetSize(pTableListInfo->pTableList); ++i) {
STableListInfo* subListInfo = taosMemoryCalloc(1, sizeof(subListInfo));
subListInfo->pTableList = taosArrayInit(1, sizeof(STableKeyInfo));
taosArrayPush(subListInfo->pTableList, taosArrayGet(pTableListInfo->pTableList, i));
tsdbReaderT* pReader = tsdbReaderOpen(pHandle->vnode, &cond, subListInfo, queryId, taskId);
taosArrayPush(arrayReader, &pReader);
taosArrayDestroy(subListInfo->pTableList);
taosMemoryFree(subListInfo);
}
cleanupQueryTableDataCond(&cond);
return TSDB_CODE_SUCCESS;
_error:
return code;
}
int32_t createMultipleDataReaders(SQueryTableDataCond* pQueryCond, SReadHandle* pHandle, STableListInfo* pTableListInfo,
int32_t tableStartIdx, int32_t tableEndIdx, SArray* arrayReader, uint64_t queryId,
uint64_t taskId) {
for (int32_t i = tableStartIdx; i <= tableEndIdx; ++i) {
STableListInfo* subListInfo = taosMemoryCalloc(1, sizeof(subListInfo));
subListInfo->pTableList = taosArrayInit(1, sizeof(STableKeyInfo));
taosArrayPush(subListInfo->pTableList, taosArrayGet(pTableListInfo->pTableList, i));
tsdbReaderT* pReader = tsdbReaderOpen(pHandle->vnode, pQueryCond, subListInfo, queryId, taskId);
taosArrayPush(arrayReader, &pReader);
taosArrayDestroy(subListInfo->pTableList);
taosMemoryFree(subListInfo);
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -2225,32 +2216,34 @@ SArray* generateSortByTsInfo(int32_t order) { ...@@ -2225,32 +2216,34 @@ SArray* generateSortByTsInfo(int32_t order) {
return pList; return pList;
} }
static int32_t createMultipleDataReaders(SQueryTableDataCond* pQueryCond, SReadHandle* pHandle, SArray* tableList, SArray* arrayReader, uint64_t queryId,
uint64_t taskId) {
for (int32_t i = 0; i < taosArrayGetSize(tableList); ++i) {
SArray* tmp = taosArrayInit(1, sizeof(STableKeyInfo));
taosArrayPush(tmp, taosArrayGet(tableList, i));
tsdbReaderT* pReader = tsdbReaderOpen(pHandle->vnode, pQueryCond, tmp, queryId, taskId);
taosArrayPush(arrayReader, &pReader);
taosArrayDestroy(tmp);
}
return TSDB_CODE_SUCCESS;
}
int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) { int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) {
STableMergeScanInfo* pInfo = pOperator->info; STableMergeScanInfo* pInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
{ SArray* tableList = taosArrayGetP(pInfo->tableListInfo->pGroupList, pInfo->currentGroupId);
size_t tableListSize = taosArrayGetSize(pInfo->tableListInfo->pTableList);
int32_t i = pInfo->tableStartIndex + 1;
for (; i < tableListSize; ++i) {
STableKeyInfo* tableKeyInfo = taosArrayGet(pInfo->tableListInfo->pTableList, i);
if (tableKeyInfo->groupId != pInfo->groupId) {
break;
}
}
pInfo->tableEndIndex = i - 1;
}
int32_t tableStartIdx = pInfo->tableStartIndex; createMultipleDataReaders(&pInfo->cond, &pInfo->readHandle, tableList,
int32_t tableEndIdx = pInfo->tableEndIndex;
STableListInfo* tableListInfo = pInfo->tableListInfo;
createMultipleDataReaders(&pInfo->cond, &pInfo->readHandle, tableListInfo, tableStartIdx, tableEndIdx,
pInfo->dataReaders, pInfo->queryId, pInfo->taskId); pInfo->dataReaders, pInfo->queryId, pInfo->taskId);
// todo the total available buffer should be determined by total capacity of buffer of this task. // todo the total available buffer should be determined by total capacity of buffer of this task.
// the additional one is reserved for merge result // the additional one is reserved for merge result
pInfo->sortBufSize = pInfo->bufPageSize * (tableEndIdx - tableStartIdx + 1 + 1); int32_t tableLen = taosArrayGetSize(tableList);
pInfo->sortBufSize = pInfo->bufPageSize * ((tableLen==0?1:tableLen) + 1);
int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize; int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize;
pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_MULTISOURCE_MERGE, pInfo->bufPageSize, numOfBufPage, pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_MULTISOURCE_MERGE, pInfo->bufPageSize, numOfBufPage,
pInfo->pSortInputBlock, pTaskInfo->id.str); pInfo->pSortInputBlock, pTaskInfo->id.str);
...@@ -2337,38 +2330,43 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) { ...@@ -2337,38 +2330,43 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) {
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
longjmp(pTaskInfo->env, code); longjmp(pTaskInfo->env, code);
} }
size_t tableListSize = taosArrayGetSize(pInfo->tableListInfo->pTableList);
if (!pInfo->hasGroupId) {
pInfo->hasGroupId = true;
if (tableListSize == 0) { if (pInfo->currentGroupId == -1) {
pInfo->currentGroupId++;
if (pInfo->currentGroupId >= taosArrayGetSize(pInfo->tableListInfo->pGroupList)) {
doSetOperatorCompleted(pOperator); doSetOperatorCompleted(pOperator);
return NULL; return NULL;
} }
pInfo->tableStartIndex = 0;
pInfo->groupId = ((STableKeyInfo*)taosArrayGet(pInfo->tableListInfo->pTableList, pInfo->tableStartIndex))->groupId;
startGroupTableMergeScan(pOperator); startGroupTableMergeScan(pOperator);
} }
SSDataBlock* pBlock = NULL; SSDataBlock* pBlock = getSortedTableMergeScanBlockData(pInfo->pSortHandle, pOperator->resultInfo.capacity, pOperator);
while (pInfo->tableStartIndex < tableListSize) { if (pBlock != NULL) {
pBlock = getSortedTableMergeScanBlockData(pInfo->pSortHandle, pOperator->resultInfo.capacity, pOperator); uint64_t* groupId = taosHashGet(pInfo->tableListInfo->map, &(pBlock->info.uid), sizeof(uint64_t));
if (pBlock != NULL) { if(groupId) pBlock->info.groupId = *groupId;
pBlock->info.groupId = pInfo->groupId;
pOperator->resultInfo.totalRows += pBlock->info.rows; pOperator->resultInfo.totalRows += pBlock->info.rows;
return pBlock; return pBlock;
} else {
stopGroupTableMergeScan(pOperator);
if (pInfo->tableEndIndex >= tableListSize - 1) {
doSetOperatorCompleted(pOperator);
break;
}
pInfo->tableStartIndex = pInfo->tableEndIndex + 1;
pInfo->groupId =
((STableKeyInfo*)taosArrayGet(pInfo->tableListInfo->pTableList, pInfo->tableStartIndex))->groupId;
startGroupTableMergeScan(pOperator);
}
} }
stopGroupTableMergeScan(pOperator);
pInfo->currentGroupId++;
if (pInfo->currentGroupId >= taosArrayGetSize(pInfo->tableListInfo->pGroupList)) {
doSetOperatorCompleted(pOperator);
return NULL;
}
startGroupTableMergeScan(pOperator);
pBlock = getSortedTableMergeScanBlockData(pInfo->pSortHandle, pOperator->resultInfo.capacity, pOperator);
if (pBlock != NULL) {
uint64_t* groupId = taosHashGet(pInfo->tableListInfo->map, &(pBlock->info.uid), sizeof(uint64_t));
if(groupId) pBlock->info.groupId = *groupId;
pOperator->resultInfo.totalRows += pBlock->info.rows;
return pBlock;
}
doSetOperatorCompleted(pOperator);
return pBlock; return pBlock;
} }
...@@ -2445,6 +2443,7 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN ...@@ -2445,6 +2443,7 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
pInfo->dataReaders = taosArrayInit(64, POINTER_BYTES); pInfo->dataReaders = taosArrayInit(64, POINTER_BYTES);
pInfo->queryId = queryId; pInfo->queryId = queryId;
pInfo->taskId = taskId; pInfo->taskId = taskId;
pInfo->currentGroupId = -1;
pInfo->sortSourceParams = taosArrayInit(64, sizeof(STableMergeScanSortSourceParam)); pInfo->sortSourceParams = taosArrayInit(64, sizeof(STableMergeScanSortSourceParam));
......
...@@ -1220,6 +1220,7 @@ int32_t nodesSetValueNodeValue(SValueNode* pNode, void* value) { ...@@ -1220,6 +1220,7 @@ int32_t nodesSetValueNodeValue(SValueNode* pNode, void* value) {
case TSDB_DATA_TYPE_NCHAR: case TSDB_DATA_TYPE_NCHAR:
case TSDB_DATA_TYPE_VARCHAR: case TSDB_DATA_TYPE_VARCHAR:
case TSDB_DATA_TYPE_VARBINARY: case TSDB_DATA_TYPE_VARBINARY:
case TSDB_DATA_TYPE_JSON:
pNode->datum.p = (char*)value; pNode->datum.p = (char*)value;
break; break;
default: default:
......
...@@ -1061,6 +1061,7 @@ static EDealRes partTagsOptRebuildTbanmeImpl(SNode** pNode, void* pContext) { ...@@ -1061,6 +1061,7 @@ static EDealRes partTagsOptRebuildTbanmeImpl(SNode** pNode, void* pContext) {
} }
strcpy(pFunc->functionName, "tbname"); strcpy(pFunc->functionName, "tbname");
pFunc->funcType = FUNCTION_TYPE_TBNAME; pFunc->funcType = FUNCTION_TYPE_TBNAME;
pFunc->node.resType = ((SColumnNode*)*pNode)->node.resType;
nodesDestroyNode(*pNode); nodesDestroyNode(*pNode);
*pNode = (SNode*)pFunc; *pNode = (SNode*)pFunc;
return DEAL_RES_IGNORE_CHILD; return DEAL_RES_IGNORE_CHILD;
...@@ -1188,7 +1189,7 @@ static const SOptimizeRule optimizeRuleSet[] = { ...@@ -1188,7 +1189,7 @@ static const SOptimizeRule optimizeRuleSet[] = {
{.pName = "ConditionPushDown", .optimizeFunc = cpdOptimize}, {.pName = "ConditionPushDown", .optimizeFunc = cpdOptimize},
{.pName = "OrderByPrimaryKey", .optimizeFunc = opkOptimize}, {.pName = "OrderByPrimaryKey", .optimizeFunc = opkOptimize},
{.pName = "SmaIndex", .optimizeFunc = smaOptimize}, {.pName = "SmaIndex", .optimizeFunc = smaOptimize},
// {.pName = "PartitionTags", .optimizeFunc = partTagsOptimize}, {.pName = "PartitionTags", .optimizeFunc = partTagsOptimize},
{.pName = "EliminateProject", .optimizeFunc = eliminateProjOptimize} {.pName = "EliminateProject", .optimizeFunc = eliminateProjOptimize}
}; };
// clang-format on // clang-format on
......
...@@ -412,52 +412,59 @@ class TDTestCase: ...@@ -412,52 +412,59 @@ class TDTestCase:
tdSql.checkColNameList(res, cname_list) tdSql.checkColNameList(res, cname_list)
# #
# test group by & order by json tag # test group by & order by json tag
tdSql.query("select ts,jtag->'tag1' from jsons1 partition by jtag->'tag1' order by jtag->'tag1' desc")
tdSql.checkRows(11)
tdSql.checkData(0, 1, '"femail"')
tdSql.checkData(2, 1, '"收到货"')
tdSql.checkData(7, 1, "false")
# tdSql.error("select count(*) from jsons1 group by jtag") # tdSql.error("select count(*) from jsons1 group by jtag")
# tdSql.error("select count(*) from jsons1 partition by jtag") # tdSql.error("select count(*) from jsons1 partition by jtag")
# tdSql.error("select count(*) from jsons1 group by jtag order by jtag") # tdSql.error("select count(*) from jsons1 group by jtag order by jtag")
tdSql.error("select count(*) from jsons1 group by jtag->'tag1' order by jtag->'tag2'") tdSql.error("select count(*) from jsons1 group by jtag->'tag1' order by jtag->'tag2'")
tdSql.error("select count(*) from jsons1 group by jtag->'tag1' order by jtag") tdSql.error("select count(*) from jsons1 group by jtag->'tag1' order by jtag")
tdSql.query("select count(*),jtag->'tag1' from jsons1 group by jtag->'tag1' order by jtag->'tag1' desc") # tdSql.query("select count(*),jtag->'tag1' from jsons1 group by jtag->'tag1' order by jtag->'tag1' desc")
tdSql.checkRows(8) # tdSql.checkRows(8)
tdSql.checkData(0, 0, 2) # tdSql.checkData(0, 0, 2)
tdSql.checkData(0, 1, '"femail"') # tdSql.checkData(0, 1, '"femail"')
tdSql.checkData(1, 0, 2) # tdSql.checkData(1, 0, 2)
tdSql.checkData(1, 1, '"收到货"') # tdSql.checkData(1, 1, '"收到货"')
tdSql.checkData(2, 0, 1) # tdSql.checkData(2, 0, 1)
tdSql.checkData(2, 1, "11.000000000") # tdSql.checkData(2, 1, "11.000000000")
tdSql.checkData(5, 0, 1) # tdSql.checkData(5, 0, 1)
tdSql.checkData(5, 1, "false") # tdSql.checkData(5, 1, "false")
tdSql.checkData(6, 0, 1) # tdSql.checkData(6, 0, 1)
tdSql.checkData(6, 1, "null") # tdSql.checkData(6, 1, "null")
tdSql.checkData(7, 0, 2) # tdSql.checkData(7, 0, 2)
tdSql.checkData(7, 1, None) # tdSql.checkData(7, 1, None)
tdSql.query("select count(*),jtag->'tag1' from jsons1 group by jtag->'tag1' order by jtag->'tag1' asc") # tdSql.query("select count(*),jtag->'tag1' from jsons1 group by jtag->'tag1' order by jtag->'tag1' asc")
tdSql.checkRows(8) # tdSql.checkRows(8)
tdSql.checkData(0, 0, 2) # tdSql.checkData(0, 0, 2)
tdSql.checkData(0, 1, None) # tdSql.checkData(0, 1, None)
tdSql.checkData(2, 0, 1) # tdSql.checkData(2, 0, 1)
tdSql.checkData(2, 1, "false") # tdSql.checkData(2, 1, "false")
tdSql.checkData(5, 0, 1) # tdSql.checkData(5, 0, 1)
tdSql.checkData(5, 1, "11.000000000") # tdSql.checkData(5, 1, "11.000000000")
tdSql.checkData(7, 0, 2) # tdSql.checkData(7, 0, 2)
tdSql.checkData(7, 1, '"femail"') # tdSql.checkData(7, 1, '"femail"')
# #
# test stddev with group by json tag # test stddev with group by json tag
tdSql.query("select stddev(dataint),jtag->'tag1' from jsons1 group by jtag->'tag1' order by jtag->'tag1'") # tdSql.query("select stddev(dataint),jtag->'tag1' from jsons1 group by jtag->'tag1' order by jtag->'tag1'")
tdSql.checkRows(8) # tdSql.checkRows(8)
tdSql.checkData(0, 0, 10) # tdSql.checkData(0, 0, 10)
tdSql.checkData(0, 1, None) # tdSql.checkData(0, 1, None)
tdSql.checkData(4, 0, 0) # tdSql.checkData(4, 0, 0)
tdSql.checkData(4, 1, "5.000000000") # tdSql.checkData(4, 1, "5.000000000")
tdSql.checkData(7, 0, 11) # tdSql.checkData(7, 0, 11)
tdSql.checkData(7, 1, '"femail"') # tdSql.checkData(7, 1, '"femail"')
#
res = tdSql.getColNameList("select stddev(dataint),jsons1.jtag->'tag1' from jsons1 group by jsons1.jtag->'tag1' order by jtag->'tag1'") # res = tdSql.getColNameList("select stddev(dataint),jsons1.jtag->'tag1' from jsons1 group by jsons1.jtag->'tag1' order by jtag->'tag1'")
cname_list = [] # cname_list = []
cname_list.append("stddev(dataint)") # cname_list.append("stddev(dataint)")
cname_list.append("jsons1.jtag->'tag1'") # cname_list.append("jsons1.jtag->'tag1'")
tdSql.checkColNameList(res, cname_list) # tdSql.checkColNameList(res, cname_list)
# test top/bottom with group by json tag # test top/bottom with group by json tag
# tdSql.query("select top(dataint,2),jtag->'tag1' from jsons1 group by jtag->'tag1' order by jtag->'tag1'") # tdSql.query("select top(dataint,2),jtag->'tag1' from jsons1 group by jtag->'tag1' order by jtag->'tag1'")
...@@ -470,8 +477,8 @@ class TDTestCase: ...@@ -470,8 +477,8 @@ class TDTestCase:
# tdSql.checkData(10, 1, '"femail"') # tdSql.checkData(10, 1, '"femail"')
# test having # test having
tdSql.query("select count(*),jtag->'tag1' from jsons1 group by jtag->'tag1' having count(*) > 1") # tdSql.query("select count(*),jtag->'tag1' from jsons1 group by jtag->'tag1' having count(*) > 1")
tdSql.checkRows(3) # tdSql.checkRows(3)
# subquery with json tag # subquery with json tag
tdSql.query("select * from (select jtag, dataint from jsons1) order by dataint") tdSql.query("select * from (select jtag, dataint from jsons1) order by dataint")
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册