提交 b6f64b3f 编写于 作者: H hjxilinx

[td-98] add qsort and move to algo file

上级 130d4cd2
...@@ -19,9 +19,14 @@ ...@@ -19,9 +19,14 @@
extern "C" { extern "C" {
#endif #endif
#include "os.h" #include "os.h"
#include "taosmsg.h" #include "taosmsg.h"
#include "tarray.h"
#include "tutil.h" #include "tutil.h"
#include "dataformat.h"
#include "talgo.h"
#define DEFAULT_PAGE_SIZE 16384 // 16k larger than the SHistoInfo #define DEFAULT_PAGE_SIZE 16384 // 16k larger than the SHistoInfo
#define MIN_BUFFER_SIZE (1 << 19) #define MIN_BUFFER_SIZE (1 << 19)
...@@ -55,12 +60,12 @@ typedef struct tFlushoutData { ...@@ -55,12 +60,12 @@ typedef struct tFlushoutData {
tFlushoutInfo *pFlushoutInfo; tFlushoutInfo *pFlushoutInfo;
} tFlushoutData; } tFlushoutData;
typedef struct SFileInfo { typedef struct SExtFileInfo {
uint32_t nFileSize; // in pages uint32_t nFileSize; // in pages
uint32_t pageSize; uint32_t pageSize;
uint32_t numOfElemsInFile; uint32_t numOfElemsInFile;
tFlushoutData flushoutData; tFlushoutData flushoutData;
} SFileInfo; } SExtFileInfo;
typedef struct tFilePage { typedef struct tFilePage {
uint64_t numOfElems; uint64_t numOfElems;
...@@ -109,26 +114,17 @@ typedef struct tExtMemBuffer { ...@@ -109,26 +114,17 @@ typedef struct tExtMemBuffer {
char * path; char * path;
FILE * file; FILE * file;
SFileInfo fileMeta; SExtFileInfo fileMeta;
SColumnModel * pColumnModel; SColumnModel * pColumnModel;
EXT_BUFFER_FLUSH_MODEL flushModel; EXT_BUFFER_FLUSH_MODEL flushModel;
} tExtMemBuffer; } tExtMemBuffer;
typedef struct tTagSchema { //typedef struct tTagSchema {
struct SSchema *pSchema; // struct SSchema *pSchema;
int32_t numOfCols; // int32_t numOfCols;
int32_t colOffset[]; // int32_t colOffset[];
} tTagSchema; //} tTagSchema;
typedef struct tSidSet {
int32_t numOfSids;
int32_t numOfSubSet;
STableIdInfo **pTableIdList;
int32_t * starterPos; // position of each subgroup, generated according to
SColumnModel *pColumnModel;
SColumnOrderInfo orderIdx;
} tSidSet;
/** /**
* *
......
...@@ -39,7 +39,7 @@ typedef int32_t (*__block_search_fn_t)(char* data, int32_t num, int64_t key, int ...@@ -39,7 +39,7 @@ typedef int32_t (*__block_search_fn_t)(char* data, int32_t num, int64_t key, int
typedef struct SSqlGroupbyExpr { typedef struct SSqlGroupbyExpr {
int16_t tableIndex; int16_t tableIndex;
int16_t numOfGroupCols; int16_t numOfGroupCols;
SColIndex columnInfo[TSDB_MAX_TAGS]; // group by columns information SColIndex* columnInfo; // group by columns information
int16_t orderIndex; // order by column index int16_t orderIndex; // order by column index
int16_t orderType; // order by type: asc/desc int16_t orderType; // order by type: asc/desc
} SSqlGroupbyExpr; } SSqlGroupbyExpr;
...@@ -171,7 +171,7 @@ typedef struct SQInfo { ...@@ -171,7 +171,7 @@ typedef struct SQInfo {
int32_t pointsInterpo; int32_t pointsInterpo;
int32_t code; // error code to returned to client int32_t code; // error code to returned to client
sem_t dataReady; sem_t dataReady;
SArray* pTableIdList; // table id list SArray* pTableList; // table id list
void* tsdb; void* tsdb;
SQueryRuntimeEnv runtimeEnv; SQueryRuntimeEnv runtimeEnv;
......
...@@ -869,7 +869,7 @@ void tExprTreeTraverse(tExprNode *pExpr, SSkipList *pSkipList, SArray *result, S ...@@ -869,7 +869,7 @@ void tExprTreeTraverse(tExprNode *pExpr, SSkipList *pSkipList, SArray *result, S
if (pQueryInfo->colIndex == 0 && pQueryInfo->optr != TSDB_RELATION_LIKE) { if (pQueryInfo->colIndex == 0 && pQueryInfo->optr != TSDB_RELATION_LIKE) {
SQueryCond cond = {0}; SQueryCond cond = {0};
/*int32_t ret = */setQueryCond(pQueryInfo, &cond); /*int32_t ret = */ setQueryCond(pQueryInfo, &cond);
tQueryOnSkipList(pSkipList, &cond, pQueryInfo->q.nType, result); tQueryOnSkipList(pSkipList, &cond, pQueryInfo->q.nType, result);
} else { } else {
/* Brutal force scan the whole skip list to find the appropriate result, /* Brutal force scan the whole skip list to find the appropriate result,
......
...@@ -43,7 +43,7 @@ tExtMemBuffer* createExtMemBuffer(int32_t inMemSize, int32_t elemSize, SColumnMo ...@@ -43,7 +43,7 @@ tExtMemBuffer* createExtMemBuffer(int32_t inMemSize, int32_t elemSize, SColumnMo
pMemBuffer->path = strdup(name); pMemBuffer->path = strdup(name);
pTrace("create tmp file:%s", pMemBuffer->path); pTrace("create tmp file:%s", pMemBuffer->path);
SFileInfo *pFMeta = &pMemBuffer->fileMeta; SExtFileInfo *pFMeta = &pMemBuffer->fileMeta;
pFMeta->pageSize = DEFAULT_PAGE_SIZE; pFMeta->pageSize = DEFAULT_PAGE_SIZE;
...@@ -63,7 +63,7 @@ void* destoryExtMemBuffer(tExtMemBuffer *pMemBuffer) { ...@@ -63,7 +63,7 @@ void* destoryExtMemBuffer(tExtMemBuffer *pMemBuffer) {
} }
// release flush out info link // release flush out info link
SFileInfo *pFileMeta = &pMemBuffer->fileMeta; SExtFileInfo *pFileMeta = &pMemBuffer->fileMeta;
if (pFileMeta->flushoutData.nAllocSize != 0 && pFileMeta->flushoutData.pFlushoutInfo != NULL) { if (pFileMeta->flushoutData.nAllocSize != 0 && pFileMeta->flushoutData.pFlushoutInfo != NULL) {
tfree(pFileMeta->flushoutData.pFlushoutInfo); tfree(pFileMeta->flushoutData.pFlushoutInfo);
} }
...@@ -97,7 +97,7 @@ void* destoryExtMemBuffer(tExtMemBuffer *pMemBuffer) { ...@@ -97,7 +97,7 @@ void* destoryExtMemBuffer(tExtMemBuffer *pMemBuffer) {
/* /*
* alloc more memory for flush out info entries. * alloc more memory for flush out info entries.
*/ */
static bool allocFlushoutInfoEntries(SFileInfo *pFileMeta) { static bool allocFlushoutInfoEntries(SExtFileInfo *pFileMeta) {
pFileMeta->flushoutData.nAllocSize = pFileMeta->flushoutData.nAllocSize << 1; pFileMeta->flushoutData.nAllocSize = pFileMeta->flushoutData.nAllocSize << 1;
tFlushoutInfo *tmp = (tFlushoutInfo *)realloc(pFileMeta->flushoutData.pFlushoutInfo, tFlushoutInfo *tmp = (tFlushoutInfo *)realloc(pFileMeta->flushoutData.pFlushoutInfo,
...@@ -208,7 +208,7 @@ int16_t tExtMemBufferPut(tExtMemBuffer *pMemBuffer, void *data, int32_t numOfRow ...@@ -208,7 +208,7 @@ int16_t tExtMemBufferPut(tExtMemBuffer *pMemBuffer, void *data, int32_t numOfRow
} }
static bool tExtMemBufferUpdateFlushoutInfo(tExtMemBuffer *pMemBuffer) { static bool tExtMemBufferUpdateFlushoutInfo(tExtMemBuffer *pMemBuffer) {
SFileInfo *pFileMeta = &pMemBuffer->fileMeta; SExtFileInfo *pFileMeta = &pMemBuffer->fileMeta;
if (pMemBuffer->flushModel == MULTIPLE_APPEND_MODEL) { if (pMemBuffer->flushModel == MULTIPLE_APPEND_MODEL) {
if (pFileMeta->flushoutData.nLength == pFileMeta->flushoutData.nAllocSize && !allocFlushoutInfoEntries(pFileMeta)) { if (pFileMeta->flushoutData.nLength == pFileMeta->flushoutData.nAllocSize && !allocFlushoutInfoEntries(pFileMeta)) {
...@@ -238,7 +238,7 @@ static bool tExtMemBufferUpdateFlushoutInfo(tExtMemBuffer *pMemBuffer) { ...@@ -238,7 +238,7 @@ static bool tExtMemBufferUpdateFlushoutInfo(tExtMemBuffer *pMemBuffer) {
} }
static void tExtMemBufferClearFlushoutInfo(tExtMemBuffer *pMemBuffer) { static void tExtMemBufferClearFlushoutInfo(tExtMemBuffer *pMemBuffer) {
SFileInfo *pFileMeta = &pMemBuffer->fileMeta; SExtFileInfo *pFileMeta = &pMemBuffer->fileMeta;
pFileMeta->flushoutData.nLength = 0; pFileMeta->flushoutData.nLength = 0;
memset(pFileMeta->flushoutData.pFlushoutInfo, 0, sizeof(tFlushoutInfo) * pFileMeta->flushoutData.nAllocSize); memset(pFileMeta->flushoutData.pFlushoutInfo, 0, sizeof(tFlushoutInfo) * pFileMeta->flushoutData.nAllocSize);
......
...@@ -92,9 +92,7 @@ enum { ...@@ -92,9 +92,7 @@ enum {
TS_JOIN_TAG_NOT_EQUALS = 2, TS_JOIN_TAG_NOT_EQUALS = 2,
}; };
static int32_t doMergeMetersResultsToGroupRes(SQInfo *pQInfo, STableDataInfo *pTableDataInfo, int32_t start, static int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, STableDataInfo *pTableDataInfo, int32_t start, int32_t end);
int32_t end);
static void setWindowResOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *pResult); static void setWindowResOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *pResult);
static void resetMergeResultBuf(SQuery *pQuery, SQLFunctionCtx *pCtx, SResultInfo *pResultInfo); static void resetMergeResultBuf(SQuery *pQuery, SQLFunctionCtx *pCtx, SResultInfo *pResultInfo);
...@@ -2187,7 +2185,7 @@ static int32_t getInitialPageNum(SQInfo *pQInfo) { ...@@ -2187,7 +2185,7 @@ static int32_t getInitialPageNum(SQInfo *pQInfo) {
if (isGroupbyNormalCol(pQuery->pGroupbyExpr)) { if (isGroupbyNormalCol(pQuery->pGroupbyExpr)) {
num = 128; num = 128;
} else if (isIntervalQuery(pQuery)) { // time window query, allocate one page for each table } else if (isIntervalQuery(pQuery)) { // time window query, allocate one page for each table
size_t s = taosArrayGetSize(pQInfo->pTableIdList); size_t s = taosArrayGetSize(pQInfo->pTableList);
num = MAX(s, INITIAL_RESULT_ROWS_VALUE); num = MAX(s, INITIAL_RESULT_ROWS_VALUE);
} else { // for super table query, one page for each subset } else { // for super table query, one page for each subset
num = 1;//pQInfo->pSidSet->numOfSubSet; num = 1;//pQInfo->pSidSet->numOfSubSet;
...@@ -2255,7 +2253,7 @@ int32_t UNUSED_FUNC vnodeSTableQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void ...@@ -2255,7 +2253,7 @@ int32_t UNUSED_FUNC vnodeSTableQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void
// get one queried meter // get one queried meter
assert(0); assert(0);
// SMeterObj *pMeter = getMeterObj(pQInfo->pTableIdList, pQInfo->pSidSet->pTableIdList[0]->sid); // SMeterObj *pMeter = getMeterObj(pQInfo->pTableList, pQInfo->pSidSet->pTableIdList[0]->sid);
pRuntimeEnv->pTSBuf = param; pRuntimeEnv->pTSBuf = param;
pRuntimeEnv->cur.vnodeIndex = -1; pRuntimeEnv->cur.vnodeIndex = -1;
...@@ -2272,7 +2270,7 @@ int32_t UNUSED_FUNC vnodeSTableQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void ...@@ -2272,7 +2270,7 @@ int32_t UNUSED_FUNC vnodeSTableQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void
// return ret; // return ret;
// } // }
// tSidSetSort(pQInfo->pSidSet); // createTableGroup(pQInfo->pSidSet);
int32_t size = getInitialPageNum(pQInfo); int32_t size = getInitialPageNum(pQInfo);
int32_t ret = createDiskbasedResultBuffer(&pRuntimeEnv->pResultBuf, size, pQuery->rowSize); int32_t ret = createDiskbasedResultBuffer(&pRuntimeEnv->pResultBuf, size, pQuery->rowSize);
...@@ -2303,7 +2301,7 @@ int32_t UNUSED_FUNC vnodeSTableQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void ...@@ -2303,7 +2301,7 @@ int32_t UNUSED_FUNC vnodeSTableQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void
SArray *sa = taosArrayInit(1, POINTER_BYTES); SArray *sa = taosArrayInit(1, POINTER_BYTES);
// for(int32_t i = 0; i < pQInfo->pSidSet->numOfTables; ++i) { // for(int32_t i = 0; i < pQInfo->pSidSet->numOfTables; ++i) {
// SMeterObj *p1 = getMeterObj(pQInfo->pTableIdList, pQInfo->pSidSet->pTableIdList[i]->sid); // SMeterObj *p1 = getMeterObj(pQInfo->pTableList, pQInfo->pSidSet->pTableIdList[i]->sid);
// taosArrayPush(sa, &p1); // taosArrayPush(sa, &p1);
// } // }
...@@ -2312,7 +2310,7 @@ int32_t UNUSED_FUNC vnodeSTableQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void ...@@ -2312,7 +2310,7 @@ int32_t UNUSED_FUNC vnodeSTableQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void
taosArrayPush(cols, &pQuery->colList[i]); taosArrayPush(cols, &pQuery->colList[i]);
} }
pRuntimeEnv->pQueryHandle = tsdbQueryByTableId(NULL, &cond, sa, cols); pRuntimeEnv->pQueryHandle = tsdbQueryTables(NULL, &cond, sa, cols);
// metric query do not invoke interpolation, it will be done at the second-stage merge // metric query do not invoke interpolation, it will be done at the second-stage merge
if (!isPointInterpoQuery(pQuery)) { if (!isPointInterpoQuery(pQuery)) {
...@@ -2333,18 +2331,18 @@ int32_t UNUSED_FUNC vnodeSTableQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void ...@@ -2333,18 +2331,18 @@ int32_t UNUSED_FUNC vnodeSTableQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void
*/ */
void vnodeDecMeterRefcnt(SQInfo *pQInfo) { void vnodeDecMeterRefcnt(SQInfo *pQInfo) {
if (pQInfo != NULL) { if (pQInfo != NULL) {
// assert(taosHashGetSize(pQInfo->pTableIdList) >= 1); // assert(taosHashGetSize(pQInfo->pTableList) >= 1);
} }
#if 0 #if 0
if (pQInfo == NULL || pQInfo->numOfMeters == 1) { if (pQInfo == NULL || pQInfo->numOfTables == 1) {
atomic_fetch_sub_32(&pQInfo->pObj->numOfQueries, 1); atomic_fetch_sub_32(&pQInfo->pObj->numOfQueries, 1);
dTrace("QInfo:%p vid:%d sid:%d meterId:%s, query is over, numOfQueries:%d", pQInfo, pQInfo->pObj->vnode, dTrace("QInfo:%p vid:%d sid:%d meterId:%s, query is over, numOfQueries:%d", pQInfo, pQInfo->pObj->vnode,
pQInfo->pObj->sid, pQInfo->pObj->meterId, pQInfo->pObj->numOfQueries); pQInfo->pObj->sid, pQInfo->pObj->meterId, pQInfo->pObj->numOfQueries);
} else { } else {
int32_t num = 0; int32_t num = 0;
for (int32_t i = 0; i < pQInfo->numOfMeters; ++i) { for (int32_t i = 0; i < pQInfo->numOfTables; ++i) {
SMeterObj *pMeter = getMeterObj(pQInfo->pTableIdList, pQInfo->pSidSet->pTableIdList[i]->sid); SMeterObj *pMeter = getMeterObj(pQInfo->pTableList, pQInfo->pSidSet->pTableIdList[i]->sid);
atomic_fetch_sub_32(&(pMeter->numOfQueries), 1); atomic_fetch_sub_32(&(pMeter->numOfQueries), 1);
if (pMeter->numOfQueries > 0) { if (pMeter->numOfQueries > 0) {
...@@ -2358,9 +2356,9 @@ void vnodeDecMeterRefcnt(SQInfo *pQInfo) { ...@@ -2358,9 +2356,9 @@ void vnodeDecMeterRefcnt(SQInfo *pQInfo) {
* in order to reduce log output, for all meters of which numOfQueries count are 0, * in order to reduce log output, for all meters of which numOfQueries count are 0,
* we do not output corresponding information * we do not output corresponding information
*/ */
num = pQInfo->numOfMeters - num; num = pQInfo->numOfTables - num;
dTrace("QInfo:%p metric query is over, dec query ref for %d meters, numOfQueries on %d meters are 0", pQInfo, dTrace("QInfo:%p metric query is over, dec query ref for %d meters, numOfQueries on %d meters are 0", pQInfo,
pQInfo->numOfMeters, num); pQInfo->numOfTables, num);
} }
#endif #endif
} }
...@@ -2683,9 +2681,9 @@ static void doSetTagValueInParam(SColumnModel *pTagSchema, int32_t tagColIdx, vo ...@@ -2683,9 +2681,9 @@ static void doSetTagValueInParam(SColumnModel *pTagSchema, int32_t tagColIdx, vo
#endif #endif
} }
void vnodeSetTagValueInParam(tSidSet *pSidSet, SQueryRuntimeEnv *pRuntimeEnv, void *pMeterSidInfo) { void vnodeSetTagValueInParam(STableGroupList *pSidSet, SQueryRuntimeEnv *pRuntimeEnv, void *pMeterSidInfo) {
SQuery * pQuery = pRuntimeEnv->pQuery; SQuery * pQuery = pRuntimeEnv->pQuery;
SColumnModel *pTagSchema = pSidSet->pColumnModel; SColumnModel *pTagSchema = NULL;//pSidSet->pColumnModel;
SSqlFuncExprMsg *pFuncMsg = &pQuery->pSelectExpr[0].pBase; SSqlFuncExprMsg *pFuncMsg = &pQuery->pSelectExpr[0].pBase;
if (pQuery->numOfOutputCols == 1 && pFuncMsg->functionId == TSDB_FUNC_TS_COMP) { if (pQuery->numOfOutputCols == 1 && pFuncMsg->functionId == TSDB_FUNC_TS_COMP) {
...@@ -2909,37 +2907,35 @@ int32_t tableResultComparFn(const void *pLeft, const void *pRight, void *param) ...@@ -2909,37 +2907,35 @@ int32_t tableResultComparFn(const void *pLeft, const void *pRight, void *param)
return leftTimestamp > rightTimestamp ? 1 : -1; return leftTimestamp > rightTimestamp ? 1 : -1;
} }
int32_t mergeResultsToGroup(SQInfo *pQInfo) { int32_t mergeIntoGroupResult(SQInfo *pQInfo) {
// SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
// SQuery * pQuery = pRuntimeEnv->pQuery; SQuery * pQuery = pRuntimeEnv->pQuery;
// int64_t st = taosGetTimestampMs(); // int64_t st = taosGetTimestampMs();
// int32_t ret = TSDB_CODE_SUCCESS; int32_t ret = TSDB_CODE_SUCCESS;
// while (pQInfo->subgroupIdx < pQInfo->pSidSet->numOfSubSet) { // while (pQInfo->subgroupIdx < pQInfo->pSidSet->numOfSubSet) {
// int32_t start = pQInfo->pSidSet->starterPos[pQInfo->subgroupIdx]; int32_t start = 0;//pQInfo->pSidSet->starterPos[pQInfo->subgroupIdx];
// int32_t end = pQInfo->pSidSet->starterPos[pQInfo->subgroupIdx + 1]; int32_t end = taosArrayGetSize(pQInfo->pTableList) - 1;//pQInfo->pSidSet->starterPos[pQInfo->subgroupIdx + 1];
//
// assert(0); ret = mergeIntoGroupResultImpl(pQInfo, pQInfo->pTableDataInfo, start, end);
// // ret = doMergeMetersResultsToGroupRes(pQInfo, pQuery, pRuntimeEnv, pQInfo->pTableDataInfo, start, end); if (ret < 0) { // not enough disk space to save the data into disk
// if (ret < 0) { // not enough disk space to save the data into disk return -1;
// return -1; }
// }
// pQInfo->subgroupIdx += 1;
// pQInfo->subgroupIdx += 1;
// // this group generates at least one result, return results
// // this group generates at least one result, return results // if (ret > 0) {
// if (ret > 0) { // break;
// break; // }
// }
// assert(pQInfo->numOfGroupResultPages == 0);
// assert(pQInfo->numOfGroupResultPages == 0); dTrace("QInfo:%p no result in group %d, continue", GET_QINFO_ADDR(pQuery), pQInfo->subgroupIdx - 1);
// dTrace("QInfo:%p no result in group %d, continue", GET_QINFO_ADDR(pQuery), pQInfo->subgroupIdx - 1); // }
// }
// // dTrace("QInfo:%p merge res data into group, index:%d, total group:%d, elapsed time:%lldms",
// dTrace("QInfo:%p merge res data into group, index:%d, total group:%d, elapsed time:%lldms", // GET_QINFO_ADDR(pQuery), pQInfo->subgroupIdx - 1, pQInfo->pSidSet->numOfSubSet, taosGetTimestampMs() - st);
// GET_QINFO_ADDR(pQuery),
// pQInfo->subgroupIdx - 1, pQInfo->pSidSet->numOfSubSet, taosGetTimestampMs() - st);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -2949,7 +2945,7 @@ void copyResToQueryResultBuf(SQInfo *pQInfo, SQuery *pQuery) { ...@@ -2949,7 +2945,7 @@ void copyResToQueryResultBuf(SQInfo *pQInfo, SQuery *pQuery) {
pQInfo->numOfGroupResultPages = 0; pQInfo->numOfGroupResultPages = 0;
// current results of group has been sent to client, try next group // current results of group has been sent to client, try next group
if (mergeResultsToGroup(pQInfo) != TSDB_CODE_SUCCESS) { if (mergeIntoGroupResult(pQInfo) != TSDB_CODE_SUCCESS) {
return; // failed to save data in the disk return; // failed to save data in the disk
} }
...@@ -3019,27 +3015,27 @@ int64_t getNumOfResultWindowRes(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *pW ...@@ -3019,27 +3015,27 @@ int64_t getNumOfResultWindowRes(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *pW
return maxOutput; return maxOutput;
} }
UNUSED_FUNC int32_t doMergeMetersResultsToGroupRes(SQInfo *pQInfo, STableDataInfo *pTableDataInfo, int32_t start, int32_t end) { int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, STableDataInfo *pTableDataInfo, int32_t start, int32_t end) {
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
SQuery * pQuery = pQInfo->runtimeEnv.pQuery; SQuery *pQuery = pRuntimeEnv->pQuery;
tFilePage ** buffer = (tFilePage **)pQuery->sdata; tFilePage ** buffer = (tFilePage **)pQuery->sdata;
int32_t * posList = calloc((end - start), sizeof(int32_t)); int32_t * posList = calloc((end - start), sizeof(int32_t));
STableDataInfo **pTableList = malloc(POINTER_BYTES * (end - start)); STableDataInfo **pTableList = malloc(POINTER_BYTES * (end - start));
// todo opt for the case of one table per group // todo opt for the case of one table per group
int32_t numOfMeters = 0; int32_t numOfTables = 0;
for (int32_t i = start; i < end; ++i) { for (int32_t i = start; i < end; ++i) {
int32_t tid = pTableDataInfo[i].pTableQInfo->tid; int32_t tid = pTableDataInfo[i].pTableQInfo->tid;
SIDList list = getDataBufPagesIdList(pRuntimeEnv->pResultBuf, tid); SIDList list = getDataBufPagesIdList(pRuntimeEnv->pResultBuf, tid);
if (list.size > 0 && pTableDataInfo[i].pTableQInfo->windowResInfo.size > 0) { if (list.size > 0 && pTableDataInfo[i].pTableQInfo->windowResInfo.size > 0) {
pTableList[numOfMeters] = &pTableDataInfo[i]; pTableList[numOfTables] = &pTableDataInfo[i];
numOfMeters += 1; numOfTables += 1;
} }
} }
if (numOfMeters == 0) { if (numOfTables == 0) {
tfree(posList); tfree(posList);
tfree(pTableList); tfree(pTableList);
...@@ -3050,14 +3046,13 @@ UNUSED_FUNC int32_t doMergeMetersResultsToGroupRes(SQInfo *pQInfo, STableDataInf ...@@ -3050,14 +3046,13 @@ UNUSED_FUNC int32_t doMergeMetersResultsToGroupRes(SQInfo *pQInfo, STableDataInf
SCompSupporter cs = {pTableList, posList, pQInfo}; SCompSupporter cs = {pTableList, posList, pQInfo};
SLoserTreeInfo *pTree = NULL; SLoserTreeInfo *pTree = NULL;
tLoserTreeCreate(&pTree, numOfMeters, &cs, tableResultComparFn); tLoserTreeCreate(&pTree, numOfTables, &cs, tableResultComparFn);
SResultInfo *pResultInfo = calloc(pQuery->numOfOutputCols, sizeof(SResultInfo)); SResultInfo *pResultInfo = calloc(pQuery->numOfOutputCols, sizeof(SResultInfo));
setWindowResultInfo(pResultInfo, pQuery, pRuntimeEnv->stableQuery); setWindowResultInfo(pResultInfo, pQuery, pRuntimeEnv->stableQuery);
resetMergeResultBuf(pQuery, pRuntimeEnv->pCtx, pResultInfo); resetMergeResultBuf(pQuery, pRuntimeEnv->pCtx, pResultInfo);
int64_t lastTimestamp = -1; int64_t lastTimestamp = -1;
int64_t startt = taosGetTimestampMs(); int64_t startt = taosGetTimestampMs();
while (1) { while (1) {
...@@ -3078,7 +3073,7 @@ UNUSED_FUNC int32_t doMergeMetersResultsToGroupRes(SQInfo *pQInfo, STableDataInf ...@@ -3078,7 +3073,7 @@ UNUSED_FUNC int32_t doMergeMetersResultsToGroupRes(SQInfo *pQInfo, STableDataInf
cs.position[pos] = -1; cs.position[pos] = -1;
// all input sources are exhausted // all input sources are exhausted
if (--numOfMeters == 0) { if (--numOfTables == 0) {
break; break;
} }
} }
...@@ -3086,14 +3081,13 @@ UNUSED_FUNC int32_t doMergeMetersResultsToGroupRes(SQInfo *pQInfo, STableDataInf ...@@ -3086,14 +3081,13 @@ UNUSED_FUNC int32_t doMergeMetersResultsToGroupRes(SQInfo *pQInfo, STableDataInf
if (ts == lastTimestamp) { // merge with the last one if (ts == lastTimestamp) { // merge with the last one
doMerge(pRuntimeEnv, ts, pWindowRes, true); doMerge(pRuntimeEnv, ts, pWindowRes, true);
} else { // copy data to disk buffer } else { // copy data to disk buffer
assert(0); if (buffer[0]->numOfElems == pQuery->rec.capacity) {
// if (buffer[0]->numOfElems == pQuery->pointsToRead) { if (flushFromResultBuf(pQInfo) != TSDB_CODE_SUCCESS) {
// if (flushFromResultBuf(pQInfo) != TSDB_CODE_SUCCESS) { return -1;
// return -1; }
// }
// resetMergeResultBuf(pQuery, pRuntimeEnv->pCtx, pResultInfo); resetMergeResultBuf(pQuery, pRuntimeEnv->pCtx, pResultInfo);
// } }
doMerge(pRuntimeEnv, ts, pWindowRes, false); doMerge(pRuntimeEnv, ts, pWindowRes, false);
buffer[0]->numOfElems += 1; buffer[0]->numOfElems += 1;
...@@ -3106,7 +3100,7 @@ UNUSED_FUNC int32_t doMergeMetersResultsToGroupRes(SQInfo *pQInfo, STableDataInf ...@@ -3106,7 +3100,7 @@ UNUSED_FUNC int32_t doMergeMetersResultsToGroupRes(SQInfo *pQInfo, STableDataInf
cs.position[pos] = -1; cs.position[pos] = -1;
// all input sources are exhausted // all input sources are exhausted
if (--numOfMeters == 0) { if (--numOfTables == 0) {
break; break;
} }
} }
...@@ -3117,8 +3111,8 @@ UNUSED_FUNC int32_t doMergeMetersResultsToGroupRes(SQInfo *pQInfo, STableDataInf ...@@ -3117,8 +3111,8 @@ UNUSED_FUNC int32_t doMergeMetersResultsToGroupRes(SQInfo *pQInfo, STableDataInf
if (buffer[0]->numOfElems != 0) { // there are data in buffer if (buffer[0]->numOfElems != 0) { // there are data in buffer
if (flushFromResultBuf(pQInfo) != TSDB_CODE_SUCCESS) { if (flushFromResultBuf(pQInfo) != TSDB_CODE_SUCCESS) {
// dError("QInfo:%p failed to flush data into temp file, abort query", GET_QINFO_ADDR(pQuery), dError("QInfo:%p failed to flush data into temp file, abort query", pQInfo);
// pQInfo->extBufFile);
tfree(pTree); tfree(pTree);
tfree(pTableList); tfree(pTableList);
tfree(posList); tfree(posList);
...@@ -3264,7 +3258,7 @@ void disableFuncForReverseScan(SQInfo *pQInfo, int32_t order) { ...@@ -3264,7 +3258,7 @@ void disableFuncForReverseScan(SQInfo *pQInfo, int32_t order) {
} }
if (isIntervalQuery(pQuery)) { if (isIntervalQuery(pQuery)) {
size_t numOfTables = taosArrayGetSize(pQInfo->pTableIdList); size_t numOfTables = taosArrayGetSize(pQInfo->pTableList);
for (int32_t i = 0; i < numOfTables; ++i) { for (int32_t i = 0; i < numOfTables; ++i) {
STableQueryInfo *pTableQueryInfo = pQInfo->pTableDataInfo[i].pTableQInfo; STableQueryInfo *pTableQueryInfo = pQInfo->pTableDataInfo[i].pTableQInfo;
...@@ -3563,7 +3557,7 @@ void scanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) { ...@@ -3563,7 +3557,7 @@ void scanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
} }
if (pRuntimeEnv->pSecQueryHandle != NULL) { if (pRuntimeEnv->pSecQueryHandle != NULL) {
pRuntimeEnv->pSecQueryHandle = tsdbQueryByTableId(pQInfo->tsdb, &cond, pQInfo->pTableIdList, cols); pRuntimeEnv->pSecQueryHandle = tsdbQueryTables(pQInfo->tsdb, &cond, pQInfo->pTableList, cols);
} }
taosArrayDestroy(cols); taosArrayDestroy(cols);
...@@ -3694,8 +3688,7 @@ void changeMeterQueryInfoForSuppleQuery(SQuery *pQuery, STableQueryInfo *pTableQ ...@@ -3694,8 +3688,7 @@ void changeMeterQueryInfoForSuppleQuery(SQuery *pQuery, STableQueryInfo *pTableQ
void restoreIntervalQueryRange(SQueryRuntimeEnv *pRuntimeEnv, STableQueryInfo *pTableQueryInfo) { void restoreIntervalQueryRange(SQueryRuntimeEnv *pRuntimeEnv, STableQueryInfo *pTableQueryInfo) {
SQuery *pQuery = pRuntimeEnv->pQuery; SQuery *pQuery = pRuntimeEnv->pQuery;
pQuery->window.skey = pTableQueryInfo->win.skey; pQuery->window = pTableQueryInfo->win;
pQuery->window.ekey = pTableQueryInfo->win.ekey;
pQuery->lastKey = pTableQueryInfo->lastKey; pQuery->lastKey = pTableQueryInfo->lastKey;
assert(((pQuery->lastKey >= pQuery->window.skey) && QUERY_IS_ASC_QUERY(pQuery)) || assert(((pQuery->lastKey >= pQuery->window.skey) && QUERY_IS_ASC_QUERY(pQuery)) ||
...@@ -3802,7 +3795,7 @@ void setIntervalQueryRange(STableQueryInfo *pTableQueryInfo, SQInfo *pQInfo, TSK ...@@ -3802,7 +3795,7 @@ void setIntervalQueryRange(STableQueryInfo *pTableQueryInfo, SQInfo *pQInfo, TSK
pTableQueryInfo->lastKey = key; pTableQueryInfo->lastKey = key;
} else { } else {
pQuery->window.skey = key; pQuery->window.skey = key;
STimeWindow win = {.skey = key, pQuery->window.ekey}; STimeWindow win = {.skey = key, .ekey = pQuery->window.ekey};
// for too small query range, no data in this interval. // for too small query range, no data in this interval.
if ((QUERY_IS_ASC_QUERY(pQuery) && (pQuery->window.ekey < pQuery->window.skey)) || if ((QUERY_IS_ASC_QUERY(pQuery) && (pQuery->window.ekey < pQuery->window.skey)) ||
...@@ -4200,7 +4193,7 @@ int32_t doInitQInfo(SQInfo *pQInfo, void *param, void* tsdb, bool isSTableQuery) ...@@ -4200,7 +4193,7 @@ int32_t doInitQInfo(SQInfo *pQInfo, void *param, void* tsdb, bool isSTableQuery)
taosArrayPush(cols, &pQuery->colList[i]); taosArrayPush(cols, &pQuery->colList[i]);
} }
pRuntimeEnv->pQueryHandle = tsdbQueryByTableId(tsdb, &cond, pQInfo->pTableIdList, cols); pRuntimeEnv->pQueryHandle = tsdbQueryTables(tsdb, &cond, pQInfo->pTableList, cols);
taosArrayDestroy(cols); taosArrayDestroy(cols);
pQInfo->tsdb = tsdb; pQInfo->tsdb = tsdb;
...@@ -4301,7 +4294,7 @@ int32_t doInitQInfo(SQInfo *pQInfo, void *param, void* tsdb, bool isSTableQuery) ...@@ -4301,7 +4294,7 @@ int32_t doInitQInfo(SQInfo *pQInfo, void *param, void* tsdb, bool isSTableQuery)
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static UNUSED_FUNC bool isGroupbyEachTable(SSqlGroupbyExpr *pGroupbyExpr, tSidSet *pSidset) { static UNUSED_FUNC bool isGroupbyEachTable(SSqlGroupbyExpr *pGroupbyExpr, STableGroupList *pSidset) {
if (pGroupbyExpr == NULL || pGroupbyExpr->numOfGroupCols == 0) { if (pGroupbyExpr == NULL || pGroupbyExpr->numOfGroupCols == 0) {
return false; return false;
} }
...@@ -4342,7 +4335,7 @@ static int64_t queryOnDataBlocks(SQInfo *pQInfo) { ...@@ -4342,7 +4335,7 @@ static int64_t queryOnDataBlocks(SQInfo *pQInfo) {
SQuery * pQuery = pRuntimeEnv->pQuery; SQuery * pQuery = pRuntimeEnv->pQuery;
int64_t st = taosGetTimestampMs(); int64_t st = taosGetTimestampMs();
size_t numOfTables = taosArrayGetSize(pQInfo->pTableIdList); size_t numOfTables = taosArrayGetSize(pQInfo->pTableList);
tsdb_query_handle_t *pQueryHandle = pRuntimeEnv->pQueryHandle; tsdb_query_handle_t *pQueryHandle = pRuntimeEnv->pQueryHandle;
while (tsdbNextDataBlock(pQueryHandle)) { while (tsdbNextDataBlock(pQueryHandle)) {
...@@ -4500,7 +4493,7 @@ static void vnodeSTableSeqProcessor(SQInfo *pQInfo) { ...@@ -4500,7 +4493,7 @@ static void vnodeSTableSeqProcessor(SQInfo *pQInfo) {
setQueryStatus(pQuery, QUERY_COMPLETED); setQueryStatus(pQuery, QUERY_COMPLETED);
#if 0 #if 0
// tSidSet *pTableIdList = pSupporter->pSidSet; // STableGroupList *pTableIdList = pSupporter->pSidSet;
int32_t vid = getMeterObj(pSupporter->pMetersHashTable, pMeterSidExtInfo[0]->sid)->vnode; int32_t vid = getMeterObj(pSupporter->pMetersHashTable, pMeterSidExtInfo[0]->sid)->vnode;
...@@ -4599,7 +4592,7 @@ static void vnodeSTableSeqProcessor(SQInfo *pQInfo) { ...@@ -4599,7 +4592,7 @@ static void vnodeSTableSeqProcessor(SQInfo *pQInfo) {
resetCtxOutputBuf(pRuntimeEnv); resetCtxOutputBuf(pRuntimeEnv);
resetTimeWindowInfo(pRuntimeEnv, &pRuntimeEnv->windowResInfo); resetTimeWindowInfo(pRuntimeEnv, &pRuntimeEnv->windowResInfo);
while (pSupporter->meterIdx < pSupporter->numOfMeters) { while (pSupporter->meterIdx < pSupporter->numOfTables) {
int32_t k = pSupporter->meterIdx; int32_t k = pSupporter->meterIdx;
if (isQueryKilled(pQInfo)) { if (isQueryKilled(pQInfo)) {
...@@ -4743,7 +4736,7 @@ static void vnodeSTableSeqProcessor(SQInfo *pQInfo) { ...@@ -4743,7 +4736,7 @@ static void vnodeSTableSeqProcessor(SQInfo *pQInfo) {
pQuery->pointsOffset = pQuery->pointsToRead; pQuery->pointsOffset = pQuery->pointsToRead;
dTrace( dTrace(
"QInfo %p vid:%d, numOfMeters:%d, index:%d, numOfGroups:%d, %d points returned, totalRead:%d totalReturn:%d," "QInfo %p vid:%d, numOfTables:%d, index:%d, numOfGroups:%d, %d points returned, totalRead:%d totalReturn:%d,"
"next skey:%" PRId64 ", offset:%" PRId64, "next skey:%" PRId64 ", offset:%" PRId64,
pQInfo, vid, pTableIdList->numOfTables, pSupporter->meterIdx, pTableIdList->numOfSubSet, pQuery->size, pQInfo->size, pQInfo, vid, pTableIdList->numOfTables, pSupporter->meterIdx, pTableIdList->numOfSubSet, pQuery->size, pQInfo->size,
pQInfo->pointsReturned, pQuery->skey, pQuery->limit.offset); pQInfo->pointsReturned, pQuery->skey, pQuery->limit.offset);
...@@ -4754,7 +4747,7 @@ static void createTableDataInfo(SQInfo* pQInfo) { ...@@ -4754,7 +4747,7 @@ static void createTableDataInfo(SQInfo* pQInfo) {
SQuery* pQuery = pQInfo->runtimeEnv.pQuery; SQuery* pQuery = pQInfo->runtimeEnv.pQuery;
// todo make sure the table are added the reference count to gauranteed that all involved tables are valid // todo make sure the table are added the reference count to gauranteed that all involved tables are valid
int32_t numOfTables = taosArrayGetSize(pQInfo->pTableIdList); int32_t numOfTables = taosArrayGetSize(pQInfo->pTableList);
if (pQInfo->pTableDataInfo == NULL) { if (pQInfo->pTableDataInfo == NULL) {
pQInfo->pTableDataInfo = (STableDataInfo *)calloc(1, sizeof(STableDataInfo) * numOfTables); pQInfo->pTableDataInfo = (STableDataInfo *)calloc(1, sizeof(STableDataInfo) * numOfTables);
...@@ -4766,7 +4759,7 @@ static void createTableDataInfo(SQInfo* pQInfo) { ...@@ -4766,7 +4759,7 @@ static void createTableDataInfo(SQInfo* pQInfo) {
int32_t groupId = 0; int32_t groupId = 0;
for (int32_t i = 0; i < numOfTables; ++i) { // load all meter meta info for (int32_t i = 0; i < numOfTables; ++i) { // load all meter meta info
STableId *id = taosArrayGet(pQInfo->pTableIdList, i); STableId *id = taosArrayGet(pQInfo->pTableList, i);
STableDataInfo *pInfo = &pQInfo->pTableDataInfo[i]; STableDataInfo *pInfo = &pQInfo->pTableDataInfo[i];
setTableDataInfo(pInfo, i, groupId); setTableDataInfo(pInfo, i, groupId);
...@@ -4777,7 +4770,7 @@ static void createTableDataInfo(SQInfo* pQInfo) { ...@@ -4777,7 +4770,7 @@ static void createTableDataInfo(SQInfo* pQInfo) {
static void prepareQueryInfoForReverseScan(SQInfo *pQInfo) { static void prepareQueryInfoForReverseScan(SQInfo *pQInfo) {
SQuery *pQuery = pQInfo->runtimeEnv.pQuery; SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
size_t numOfTables = taosArrayGetSize(pQInfo->pTableIdList); size_t numOfTables = taosArrayGetSize(pQInfo->pTableList);
for (int32_t i = 0; i < numOfTables; ++i) { for (int32_t i = 0; i < numOfTables; ++i) {
STableQueryInfo *pTableQueryInfo = pQInfo->pTableDataInfo[i].pTableQInfo; STableQueryInfo *pTableQueryInfo = pQInfo->pTableDataInfo[i].pTableQInfo;
...@@ -4816,7 +4809,7 @@ static void doRestoreContext(SQInfo* pQInfo) { ...@@ -4816,7 +4809,7 @@ static void doRestoreContext(SQInfo* pQInfo) {
static void doCloseAllTimeWindowAfterScan(SQInfo* pQInfo) { static void doCloseAllTimeWindowAfterScan(SQInfo* pQInfo) {
SQuery* pQuery = pQInfo->runtimeEnv.pQuery; SQuery* pQuery = pQInfo->runtimeEnv.pQuery;
size_t numOfTables = taosArrayGetSize(pQInfo->pTableIdList); size_t numOfTables = taosArrayGetSize(pQInfo->pTableList);
if (isIntervalQuery(pQuery)) { if (isIntervalQuery(pQuery)) {
for (int32_t i = 0; i < numOfTables; ++i) { for (int32_t i = 0; i < numOfTables; ++i) {
...@@ -4898,7 +4891,7 @@ static void multiTableQueryProcess(SQInfo *pQInfo) { ...@@ -4898,7 +4891,7 @@ static void multiTableQueryProcess(SQInfo *pQInfo) {
if (isIntervalQuery(pQuery) || isSumAvgRateQuery(pQuery)) { if (isIntervalQuery(pQuery) || isSumAvgRateQuery(pQuery)) {
// assert(pSupporter->subgroupIdx == 0 && pSupporter->numOfGroupResultPages == 0); // assert(pSupporter->subgroupIdx == 0 && pSupporter->numOfGroupResultPages == 0);
if (mergeResultsToGroup(pQInfo) == TSDB_CODE_SUCCESS) { if (mergeIntoGroupResult(pQInfo) == TSDB_CODE_SUCCESS) {
copyResToQueryResultBuf(pQInfo, pQuery); copyResToQueryResultBuf(pQInfo, pQuery);
#ifdef _DEBUG_VIEW #ifdef _DEBUG_VIEW
...@@ -4910,7 +4903,7 @@ static void multiTableQueryProcess(SQInfo *pQInfo) { ...@@ -4910,7 +4903,7 @@ static void multiTableQueryProcess(SQInfo *pQInfo) {
} }
// handle the limitation of output buffer // handle the limitation of output buffer
dTrace("QInfo:%p points returned:%d, total:%d", pQInfo, pQuery->rec.rows, pQuery->rec.total); dTrace("QInfo:%p points returned:%d, total:%d", pQInfo, pQuery->rec.rows, pQuery->rec.total + pQuery->rec.rows);
} }
/* /*
...@@ -5147,13 +5140,13 @@ static void tableQueryImpl(SQInfo* pQInfo) { ...@@ -5147,13 +5140,13 @@ static void tableQueryImpl(SQInfo* pQInfo) {
// record the total elapsed time // record the total elapsed time
pQInfo->elapsedTime += (taosGetTimestampUs() - st); pQInfo->elapsedTime += (taosGetTimestampUs() - st);
assert(taosArrayGetSize(pQInfo->pTableIdList) == 1); assert(taosArrayGetSize(pQInfo->pTableList) == 1);
/* check if query is killed or not */ /* check if query is killed or not */
if (isQueryKilled(pQInfo)) { if (isQueryKilled(pQInfo)) {
dTrace("QInfo:%p query is killed", pQInfo); dTrace("QInfo:%p query is killed", pQInfo);
} else { } else {
STableId* pTableId = taosArrayGet(pQInfo->pTableIdList, 0); STableId* pTableId = taosArrayGet(pQInfo->pTableList, 0);
dTrace("QInfo:%p uid:%" PRIu64 " tid:%d, query completed, %" PRId64 " rows returned, numOfTotal:%" PRId64 " rows", dTrace("QInfo:%p uid:%" PRIu64 " tid:%d, query completed, %" PRId64 " rows returned, numOfTotal:%" PRId64 " rows",
pQInfo, pTableId->uid, pTableId->tid, pQuery->rec.rows, pQuery->rec.total + pQuery->rec.rows); pQInfo, pTableId->uid, pTableId->tid, pQuery->rec.rows, pQuery->rec.total + pQuery->rec.rows);
} }
...@@ -5182,7 +5175,7 @@ static void stableQueryImpl(SQInfo* pQInfo) { ...@@ -5182,7 +5175,7 @@ static void stableQueryImpl(SQInfo* pQInfo) {
// taosInterpoSetStartInfo(&pQInfo->runtimeEnv.interpoInfo, pQuery->size, pQInfo->query.interpoType); // taosInterpoSetStartInfo(&pQInfo->runtimeEnv.interpoInfo, pQuery->size, pQInfo->query.interpoType);
if (pQuery->rec.rows == 0) { if (pQuery->rec.rows == 0) {
int32_t numOfTables = taosArrayGetSize(pQInfo->pTableIdList); int32_t numOfTables = taosArrayGetSize(pQInfo->pTableList);
dTrace("QInfo:%p over, %d tables queried, %d points are returned", pQInfo, numOfTables, pQuery->rec.total); dTrace("QInfo:%p over, %d tables queried, %d points are returned", pQInfo, numOfTables, pQuery->rec.total);
// vnodePrintQueryStatistics(pSupporter); // vnodePrintQueryStatistics(pSupporter);
} }
...@@ -5276,7 +5269,7 @@ static char* createTableIdList(SQueryTableMsg* pQueryMsg, char* pMsg, SArray** p ...@@ -5276,7 +5269,7 @@ static char* createTableIdList(SQueryTableMsg* pQueryMsg, char* pMsg, SArray** p
* @return * @return
*/ */
static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList, SSqlFuncExprMsg ***pExpr, static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList, SSqlFuncExprMsg ***pExpr,
char** tagCond) { char** tagCond, SColIndex** groupbyCols) {
pQueryMsg->numOfTables = htonl(pQueryMsg->numOfTables); pQueryMsg->numOfTables = htonl(pQueryMsg->numOfTables);
pQueryMsg->window.skey = htobe64(pQueryMsg->window.skey); pQueryMsg->window.skey = htobe64(pQueryMsg->window.skey);
...@@ -5399,17 +5392,26 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList, ...@@ -5399,17 +5392,26 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList,
pMsg = createTableIdList(pQueryMsg, pMsg, pTableIdList); pMsg = createTableIdList(pQueryMsg, pMsg, pTableIdList);
if (pQueryMsg->numOfGroupCols > 0) { // group by tag columns if (pQueryMsg->numOfGroupCols > 0) { // group by tag columns
// if (pQueryMsg->numOfGroupCols > 0) { *groupbyCols = malloc(pQueryMsg->numOfGroupCols*sizeof(SColIndex));
// pQueryMsg->groupbyTagIds = (uint64_t) & (pTagSchema[pQueryMsg->numOfTagsCols]);
// } else { for(int32_t i = 0; i < pQueryMsg->numOfGroupCols; ++i) {
// pQueryMsg->groupbyTagIds = 0; (*groupbyCols)[i].colId = *(int16_t*) pMsg;
// } pMsg += sizeof((*groupbyCols)[i].colId);
(*groupbyCols)[i].colIndex = *(int16_t*) pMsg;
pMsg += sizeof((*groupbyCols)[i].colIndex);
(*groupbyCols)[i].flag = *(int16_t*) pMsg;
pMsg += sizeof((*groupbyCols)[i].flag);
memcpy((*groupbyCols)[i].name, pMsg, tListLen(groupbyCols[i]->name));
pMsg += tListLen((*groupbyCols)[i].name);
}
pQueryMsg->orderByIdx = htons(pQueryMsg->orderByIdx); pQueryMsg->orderByIdx = htons(pQueryMsg->orderByIdx);
pQueryMsg->orderType = htons(pQueryMsg->orderType); pQueryMsg->orderType = htons(pQueryMsg->orderType);
pMsg += sizeof(SColIndex) * pQueryMsg->numOfGroupCols; pMsg += sizeof(SColIndex) * pQueryMsg->numOfGroupCols;
} else {
pQueryMsg->groupbyTagIds = 0;
} }
pQueryMsg->interpoType = htons(pQueryMsg->interpoType); pQueryMsg->interpoType = htons(pQueryMsg->interpoType);
...@@ -5570,26 +5572,23 @@ static int32_t createSqlFunctionExprFromMsg(SQueryTableMsg *pQueryMsg, SSqlFunct ...@@ -5570,26 +5572,23 @@ static int32_t createSqlFunctionExprFromMsg(SQueryTableMsg *pQueryMsg, SSqlFunct
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static SSqlGroupbyExpr *createGroupbyExprFromMsg(SQueryTableMsg *pQueryMsg, int32_t *code) { static SSqlGroupbyExpr *createGroupbyExprFromMsg(SQueryTableMsg *pQueryMsg, SColIndex* pColIndex, int32_t *code) {
if (pQueryMsg->numOfGroupCols == 0) { if (pQueryMsg->numOfGroupCols == 0) {
return NULL; return NULL;
} }
// using group by tag columns // using group by tag columns
SSqlGroupbyExpr *pGroupbyExpr = SSqlGroupbyExpr *pGroupbyExpr = (SSqlGroupbyExpr *)calloc(1, sizeof(SSqlGroupbyExpr));
(SSqlGroupbyExpr *)malloc(sizeof(SSqlGroupbyExpr) + pQueryMsg->numOfGroupCols * sizeof(SColIndex));
if (pGroupbyExpr == NULL) { if (pGroupbyExpr == NULL) {
*code = TSDB_CODE_SERV_OUT_OF_MEMORY; *code = TSDB_CODE_SERV_OUT_OF_MEMORY;
return NULL; return NULL;
} }
SColIndex *pGroupbyColInfo = (SColIndex *)pQueryMsg->groupbyTagIds;
pGroupbyExpr->numOfGroupCols = pQueryMsg->numOfGroupCols; pGroupbyExpr->numOfGroupCols = pQueryMsg->numOfGroupCols;
pGroupbyExpr->orderType = pQueryMsg->orderType; pGroupbyExpr->orderType = pQueryMsg->orderType;
pGroupbyExpr->orderIndex = pQueryMsg->orderByIdx; pGroupbyExpr->orderIndex = pQueryMsg->orderByIdx;
memcpy(pGroupbyExpr->columnInfo, pGroupbyColInfo, sizeof(SColIndex) * pGroupbyExpr->numOfGroupCols); pGroupbyExpr->columnInfo = pColIndex;
return pGroupbyExpr; return pGroupbyExpr;
} }
...@@ -5711,7 +5710,7 @@ static void doUpdateExprColumnIndex(SQuery* pQuery) { ...@@ -5711,7 +5710,7 @@ static void doUpdateExprColumnIndex(SQuery* pQuery) {
} }
static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGroupbyExpr, SSqlFunctionExpr *pExprs, static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGroupbyExpr, SSqlFunctionExpr *pExprs,
SArray *pTableIdList) { SArray *pTableList) {
SQInfo *pQInfo = (SQInfo *)calloc(1, sizeof(SQInfo)); SQInfo *pQInfo = (SQInfo *)calloc(1, sizeof(SQInfo));
if (pQInfo == NULL) { if (pQInfo == NULL) {
goto _clean_pQInfo_memory; goto _clean_pQInfo_memory;
...@@ -5809,7 +5808,7 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou ...@@ -5809,7 +5808,7 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou
// to make sure third party won't overwrite this structure // to make sure third party won't overwrite this structure
pQInfo->signature = pQInfo; pQInfo->signature = pQInfo;
pQInfo->pTableIdList = pTableIdList; pQInfo->pTableList = pTableList;
pQuery->pos = -1; pQuery->pos = -1;
...@@ -5920,7 +5919,7 @@ static void freeQInfo(SQInfo *pQInfo) { ...@@ -5920,7 +5919,7 @@ static void freeQInfo(SQInfo *pQInfo) {
teardownQueryRuntimeEnv(&pQInfo->runtimeEnv); teardownQueryRuntimeEnv(&pQInfo->runtimeEnv);
if (pQInfo->pTableDataInfo != NULL) { if (pQInfo->pTableDataInfo != NULL) {
// size_t num = taosHashGetSize(pQInfo->pTableIdList); // size_t num = taosHashGetSize(pQInfo->pTableList);
for (int32_t j = 0; j < 0; ++j) { for (int32_t j = 0; j < 0; ++j) {
destroyMeterQueryInfo(pQInfo->pTableDataInfo[j].pTableQInfo, pQuery->numOfOutputCols); destroyMeterQueryInfo(pQInfo->pTableDataInfo[j].pTableQInfo, pQuery->numOfOutputCols);
} }
...@@ -5959,7 +5958,7 @@ static void freeQInfo(SQInfo *pQInfo) { ...@@ -5959,7 +5958,7 @@ static void freeQInfo(SQInfo *pQInfo) {
tfree(pQuery->pGroupbyExpr); tfree(pQuery->pGroupbyExpr);
tfree(pQuery); tfree(pQuery);
taosArrayDestroy(pQInfo->pTableIdList); taosArrayDestroy(pQInfo->pTableList);
dTrace("QInfo:%p QInfo is freed", pQInfo); dTrace("QInfo:%p QInfo is freed", pQInfo);
...@@ -6032,7 +6031,9 @@ int32_t qCreateQueryInfo(void* tsdb, SQueryTableMsg *pQueryMsg, SQInfo **pQInfo) ...@@ -6032,7 +6031,9 @@ int32_t qCreateQueryInfo(void* tsdb, SQueryTableMsg *pQueryMsg, SQInfo **pQInfo)
char* tagCond = NULL; char* tagCond = NULL;
SArray *pTableIdList = NULL; SArray *pTableIdList = NULL;
SSqlFuncExprMsg** pExprMsg = NULL; SSqlFuncExprMsg** pExprMsg = NULL;
if ((code = convertQueryMsg(pQueryMsg, &pTableIdList, &pExprMsg, &tagCond)) != TSDB_CODE_SUCCESS) { SColIndex* pGroupColIndex = NULL;
if ((code = convertQueryMsg(pQueryMsg, &pTableIdList, &pExprMsg, &tagCond, &pGroupColIndex)) != TSDB_CODE_SUCCESS) {
return code; return code;
} }
...@@ -6054,32 +6055,35 @@ int32_t qCreateQueryInfo(void* tsdb, SQueryTableMsg *pQueryMsg, SQInfo **pQInfo) ...@@ -6054,32 +6055,35 @@ int32_t qCreateQueryInfo(void* tsdb, SQueryTableMsg *pQueryMsg, SQInfo **pQInfo)
goto _query_over; goto _query_over;
} }
SSqlGroupbyExpr *pGroupbyExpr = createGroupbyExprFromMsg(pQueryMsg, &code); SSqlGroupbyExpr *pGroupbyExpr = createGroupbyExprFromMsg(pQueryMsg, pGroupColIndex, &code);
if ((pGroupbyExpr == NULL && pQueryMsg->numOfGroupCols != 0) || code != TSDB_CODE_SUCCESS) { if ((pGroupbyExpr == NULL && pQueryMsg->numOfGroupCols != 0) || code != TSDB_CODE_SUCCESS) {
goto _query_over; goto _query_over;
} }
bool isSTableQuery = false; bool isSTableQuery = false;
SArray* res = NULL; SArray* pGroupList = NULL;
if ((pQueryMsg->queryType & TSDB_QUERY_TYPE_STABLE_QUERY) != 0) { if ((pQueryMsg->queryType & TSDB_QUERY_TYPE_STABLE_QUERY) != 0) {
isSTableQuery = true; isSTableQuery = true;
STableId* id = taosArrayGet(pTableIdList, 0); STableId* id = taosArrayGet(pTableIdList, 0);
id->uid = -1; id->uid = -1; //todo fix me
res = taosArrayInit(8, sizeof(STableId));
/*int32_t ret =*/ tsdbQueryTags(tsdb, id->uid, tagCond, pQueryMsg->tagCondLen, res); /*int32_t ret =*/ tsdbQueryTags(tsdb, id->uid, tagCond, pQueryMsg->tagCondLen, &pGroupList, pGroupColIndex, pQueryMsg->numOfGroupCols);
if (taosArrayGetSize(res) == 0) { // no qualified table in stable query in this vnode if (taosArrayGetSize(pGroupList) == 0) { // no qualified tables no need to do query
code = TSDB_CODE_SUCCESS; code = TSDB_CODE_SUCCESS;
goto _query_over; goto _query_over;
} }
} else { } else {
assert(taosArrayGetSize(pTableIdList) == 1); assert(taosArrayGetSize(pTableIdList) == 1);
res = pTableIdList;
STableId* id = taosArrayGet(pTableIdList, 0);
if ((code = tsdbGetOneTableGroup(tsdb, id->uid, &pGroupList)) != TSDB_CODE_SUCCESS) {
goto _query_over;
}
} }
(*pQInfo) = createQInfoImpl(pQueryMsg, pGroupbyExpr, pExprs, res); (*pQInfo) = createQInfoImpl(pQueryMsg, pGroupbyExpr, pExprs, pGroupList);
if ((*pQInfo) == NULL) { if ((*pQInfo) == NULL) {
code = TSDB_CODE_SERV_OUT_OF_MEMORY; code = TSDB_CODE_SERV_OUT_OF_MEMORY;
} }
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_TALGO_H
#define TDENGINE_TALGO_H
#ifdef __cplusplus
extern "C" {
#endif
typedef int32_t (*__ext_compar_fn_t)(const void *p1, const void *p2, const void *param);
void tqsort(void *src, size_t numOfElem, size_t size, const void* param, __ext_compar_fn_t comparFn);
#ifdef __cplusplus
}
#endif
#endif // TDENGINE_TALGO_H
...@@ -159,7 +159,7 @@ void *tSkipListDestroy(SSkipList *pSkipList); ...@@ -159,7 +159,7 @@ void *tSkipListDestroy(SSkipList *pSkipList);
* @param level * @param level
* @param headSize * @param headSize
*/ */
void tSkipListRandNodeInfo(SSkipList *pSkipList, int32_t *level, int32_t *headSize); void tSkipListNewNodeInfo(SSkipList *pSkipList, int32_t *level, int32_t *headSize);
/** /**
* put the skip list node into the skip list. * put the skip list node into the skip list.
......
...@@ -461,9 +461,7 @@ void taosHashRemove(SHashObj *pHashObj, const char *key, size_t keyLen) { ...@@ -461,9 +461,7 @@ void taosHashRemove(SHashObj *pHashObj, const char *key, size_t keyLen) {
pNode->next = NULL; pNode->next = NULL;
pNode->prev = NULL; pNode->prev = NULL;
pTrace("key:%s %p remove from hash table", pNode->key, pNode);
tfree(pNode); tfree(pNode);
__unlock(pHashObj->lock); __unlock(pHashObj->lock);
} }
......
#include "os.h"
#include "tutil.h"
#include "talgo.h"
#define doswap(__left, __right, __size, __buf) do {\
memcpy((__buf), (__left), (__size));\
memcpy((__left), (__right),(__size));\
memcpy((__right), (__buf), (__size));\
} while (0);
#define elePtrAt(base, size, idx) (void *)((char *)(base) + (size) * (idx))
static void median(void *src, size_t size, size_t s, size_t e, const void *param, __ext_compar_fn_t comparFn, void* buf) {
int32_t mid = ((e - s) >> 1u) + s;
if (comparFn(elePtrAt(src, size, mid), elePtrAt(src, size, s), param) == 1) {
doswap(elePtrAt(src, size, mid), elePtrAt(src, size, s), size, buf);
}
if (comparFn(elePtrAt(src, size, mid), elePtrAt(src, size, e), param) == 1) {
doswap(elePtrAt(src, size, mid), elePtrAt(src, size, s), size, buf);
doswap(elePtrAt(src, size, mid), elePtrAt(src, size, e), size, buf);
} else if (comparFn(elePtrAt(src, size, s), elePtrAt(src, size, e), param) == 1) {
doswap(elePtrAt(src, size, s), elePtrAt(src, size, e), size, buf);
}
assert(comparFn(elePtrAt(src, size, mid), elePtrAt(src, size, s), param) <= 0 && comparFn(elePtrAt(src, size, s), elePtrAt(src, size, e), param) <= 0);
#ifdef _DEBUG_VIEW
tTagsPrints(src[s], pOrderDesc->pColumnModel, &pOrderDesc->orderIdx);
tTagsPrints(src[mid], pOrderDesc->pColumnModel, &pOrderDesc->orderIdx);
tTagsPrints(src[e], pOrderDesc->pColumnModel, &pOrderDesc->orderIdx);
#endif
}
static void tInsertSort(void *src, size_t size, int32_t s, int32_t e, const void *param, __ext_compar_fn_t comparFn,
void* buf) {
for (int32_t i = s + 1; i <= e; ++i) {
for (int32_t j = i; j > s; --j) {
if (comparFn(elePtrAt(src, size, j), elePtrAt(src, size, j - 1), param) == -1) {
doswap(elePtrAt(src, size, j), elePtrAt(src, size, j - 1), size, buf);
} else {
break;
}
}
}
}
void tqsortImpl(void *src, int32_t start, int32_t end, size_t size, const void *param, __ext_compar_fn_t comparFn,
void* buf) {
// short array sort, incur another sort procedure instead of quick sort process
const int32_t THRESHOLD_SIZE = 6;
if (end - start + 1 <= THRESHOLD_SIZE) {
tInsertSort(src, size, start, end, param, comparFn, buf);
return;
}
median(src, size, start, end, param, comparFn, buf);
int32_t s = start, e = end;
int32_t endRightS = end, startLeftS = start;
while (s < e) {
while (e > s) {
int32_t ret = comparFn(elePtrAt(src, size, e), elePtrAt(src, size, s), param);
if (ret < 0) {
break;
}
//move the data that equals to pivotal value to the right end of the list
if (ret == 0 && e != endRightS) {
doswap(elePtrAt(src, size, e), elePtrAt(src, size, endRightS), size, buf);
endRightS--;
}
e--;
}
if (e != s) {
doswap(elePtrAt(src, size, e), elePtrAt(src, size, s), size, buf);
}
while (s < e) {
int32_t ret = comparFn(elePtrAt(src, size, s), elePtrAt(src, size, e), param);
if (ret > 0) {
break;
}
if (ret == 0 && s != startLeftS) {
doswap(elePtrAt(src, size, s), elePtrAt(src, size, startLeftS), size, buf);
startLeftS++;
}
s++;
}
if (e != s) {
doswap(elePtrAt(src, size, s), elePtrAt(src, size, e), size, buf);
}
}
int32_t rightPartStart = e + 1;
if (endRightS != end && e < end) {
int32_t left = rightPartStart;
int32_t right = end;
while (right > endRightS && left <= endRightS) {
doswap(elePtrAt(src, size, left), elePtrAt(src, size, right), size, buf);
left++;
right--;
}
rightPartStart += (end - endRightS);
}
int32_t leftPartEnd = e - 1;
if (startLeftS != end && s > start) {
int32_t left = start;
int32_t right = leftPartEnd;
while (left < startLeftS && right >= startLeftS) {
doswap(elePtrAt(src, size, left), elePtrAt(src, size, right), size, buf);
left++;
right--;
}
leftPartEnd -= (startLeftS - start);
}
if (leftPartEnd > start) {
tqsortImpl(src, size, start, leftPartEnd, param, comparFn, buf);
}
if (rightPartStart < end) {
tqsortImpl(src, size, rightPartStart, end, param, comparFn, buf);
}
}
void tqsort(void *src, size_t numOfElem, size_t size, const void* param, __ext_compar_fn_t comparFn) {
char *buf = calloc(1, size); // prepare the swap buffer
tqsortImpl(src, 0, numOfElem - 1, size, param, comparFn, buf);
tfree(buf);
}
void * taosbsearch(const void *key, const void *base, size_t nmemb, size_t size, int (*compar)(const void *, const void *), int flags) {
// TODO: need to check the correctness of this function
int l = 0;
int r = nmemb;
int idx = 0;
int comparison;
if (flags == TD_EQ) {
return bsearch(key, base, nmemb, size, compar);
} else if (flags == TD_GE) {
if ((*compar)(key, elePtrAt(base, size, 0)) <= 0) return elePtrAt(base, size, 0);
if ((*compar)(key, elePtrAt(base, size, nmemb - 1)) > 0) return NULL;
while (l < r) {
idx = (l + r) / 2;
comparison = (*compar)(key, elePtrAt(base, size, idx));
if (comparison < 0) {
r = idx;
} else if (comparison > 0) {
l = idx + 1;
} else {
return elePtrAt(base, size, idx);
}
}
if ((*compar)(key, elePtrAt(base, size, idx)) < 0) {
return elePtrAt(base, size, idx);
} else {
if (idx + 1 > nmemb - 1) {
return NULL;
} else {
return elePtrAt(base, size, idx + 1);
}
}
} else if (flags == TD_LE) {
if ((*compar)(key, elePtrAt(base, size, nmemb - 1)) >= 0) return elePtrAt(base, size, nmemb - 1);
if ((*compar)(key, elePtrAt(base, size, 0)) < 0) return NULL;
while (l < r) {
idx = (l + r) / 2;
comparison = (*compar)(key, elePtrAt(base, size, idx));
if (comparison < 0) {
r = idx;
} else if (comparison > 0) {
l = idx + 1;
} else {
return elePtrAt(base, size, idx);
}
}
if ((*compar)(key, elePtrAt(base, size, idx)) > 0) {
return elePtrAt(base, size, idx);
} else {
if (idx == 0) {
return NULL;
} else {
return elePtrAt(base, size, idx - 1);
}
}
} else {
assert(0);
return NULL;
}
return NULL;
}
...@@ -190,7 +190,7 @@ void *tSkipListDestroy(SSkipList *pSkipList) { ...@@ -190,7 +190,7 @@ void *tSkipListDestroy(SSkipList *pSkipList) {
return NULL; return NULL;
} }
void tSkipListRandNodeInfo(SSkipList *pSkipList, int32_t *level, int32_t *headSize) { void tSkipListNewNodeInfo(SSkipList *pSkipList, int32_t *level, int32_t *headSize) {
if (pSkipList == NULL) { if (pSkipList == NULL) {
return; return;
} }
......
...@@ -618,72 +618,3 @@ char *taosCharsetReplace(char *charsetstr) { ...@@ -618,72 +618,3 @@ char *taosCharsetReplace(char *charsetstr) {
return strdup(charsetstr); return strdup(charsetstr);
} }
#define elePtrAt(base, size, idx) (void *)((char *)(base) + (size) * (idx))
void * taosbsearch(const void *key, const void *base, size_t nmemb, size_t size, int (*compar)(const void *, const void *), int flags) {
// TODO: need to check the correctness of this function
int l = 0;
int r = nmemb;
int idx = 0;
int comparison;
if (flags == TD_EQ) {
return bsearch(key, base, nmemb, size, compar);
} else if (flags == TD_GE) {
if ((*compar)(key, elePtrAt(base, size, 0)) <= 0) return elePtrAt(base, size, 0);
if ((*compar)(key, elePtrAt(base, size, nmemb - 1)) > 0) return NULL;
while (l < r) {
idx = (l + r) / 2;
comparison = (*compar)(key, elePtrAt(base, size, idx));
if (comparison < 0) {
r = idx;
} else if (comparison > 0) {
l = idx + 1;
} else {
return elePtrAt(base, size, idx);
}
}
if ((*compar)(key, elePtrAt(base, size, idx)) < 0) {
return elePtrAt(base, size, idx);
} else {
if (idx + 1 > nmemb - 1) {
return NULL;
} else {
return elePtrAt(base, size, idx + 1);
}
}
} else if (flags == TD_LE) {
if ((*compar)(key, elePtrAt(base, size, nmemb - 1)) >= 0) return elePtrAt(base, size, nmemb - 1);
if ((*compar)(key, elePtrAt(base, size, 0)) < 0) return NULL;
while (l < r) {
idx = (l + r) / 2;
comparison = (*compar)(key, elePtrAt(base, size, idx));
if (comparison < 0) {
r = idx;
} else if (comparison > 0) {
l = idx + 1;
} else {
return elePtrAt(base, size, idx);
}
}
if ((*compar)(key, elePtrAt(base, size, idx)) > 0) {
return elePtrAt(base, size, idx);
} else {
if (idx == 0) {
return NULL;
} else {
return elePtrAt(base, size, idx - 1);
}
}
} else {
assert(0);
return NULL;
}
return NULL;
}
...@@ -28,7 +28,7 @@ void doubleSkipListTest() { ...@@ -28,7 +28,7 @@ void doubleSkipListTest() {
int32_t level = 0; int32_t level = 0;
int32_t size = 0; int32_t size = 0;
tSkipListRandNodeInfo(pSkipList, &level, &size); tSkipListNewNodeInfo(pSkipList, &level, &size);
auto d = (SSkipListNode*)calloc(1, size + sizeof(double) * 2); auto d = (SSkipListNode*)calloc(1, size + sizeof(double) * 2);
d->level = level; d->level = level;
...@@ -81,7 +81,7 @@ void randKeyTest() { ...@@ -81,7 +81,7 @@ void randKeyTest() {
int32_t level = 0; int32_t level = 0;
int32_t s = 0; int32_t s = 0;
tSkipListRandNodeInfo(pSkipList, &level, &s); tSkipListNewNodeInfo(pSkipList, &level, &s);
auto d = (SSkipListNode*)calloc(1, s + sizeof(int32_t) * 2); auto d = (SSkipListNode*)calloc(1, s + sizeof(int32_t) * 2);
d->level = level; d->level = level;
...@@ -112,7 +112,7 @@ void stringKeySkiplistTest() { ...@@ -112,7 +112,7 @@ void stringKeySkiplistTest() {
int32_t level = 0; int32_t level = 0;
int32_t headsize = 0; int32_t headsize = 0;
tSkipListRandNodeInfo(pSkipList, &level, &headsize); tSkipListNewNodeInfo(pSkipList, &level, &headsize);
auto pNode = (SSkipListNode*)calloc(1, headsize + max_key_size + sizeof(double)); auto pNode = (SSkipListNode*)calloc(1, headsize + max_key_size + sizeof(double));
pNode->level = level; pNode->level = level;
...@@ -124,7 +124,7 @@ void stringKeySkiplistTest() { ...@@ -124,7 +124,7 @@ void stringKeySkiplistTest() {
tSkipListPut(pSkipList, pNode); tSkipListPut(pSkipList, pNode);
tSkipListRandNodeInfo(pSkipList, &level, &headsize); tSkipListNewNodeInfo(pSkipList, &level, &headsize);
pNode = (SSkipListNode*)calloc(1, headsize + max_key_size + sizeof(double)); pNode = (SSkipListNode*)calloc(1, headsize + max_key_size + sizeof(double));
pNode->level = level; pNode->level = level;
...@@ -164,7 +164,7 @@ void stringKeySkiplistTest() { ...@@ -164,7 +164,7 @@ void stringKeySkiplistTest() {
int32_t total = 10000; int32_t total = 10000;
for (int32_t i = 0; i < total; ++i) { for (int32_t i = 0; i < total; ++i) {
int32_t n = sprintf(k, "abc_%d_%d", i, i); int32_t n = sprintf(k, "abc_%d_%d", i, i);
tSkipListRandNodeInfo(pSkipList, &level, &headsize); tSkipListNewNodeInfo(pSkipList, &level, &headsize);
auto pNode = (SSkipListNode*)calloc(1, headsize + 20 + sizeof(double)); auto pNode = (SSkipListNode*)calloc(1, headsize + 20 + sizeof(double));
pNode->level = level; pNode->level = level;
...@@ -222,7 +222,7 @@ void skiplistPerformanceTest() { ...@@ -222,7 +222,7 @@ void skiplistPerformanceTest() {
char* p = total; char* p = total;
for (int32_t i = 0; i < size; ++i) { for (int32_t i = 0; i < size; ++i) {
tSkipListRandNodeInfo(pSkipList, &level, &headsize); tSkipListNewNodeInfo(pSkipList, &level, &headsize);
SSkipListNode* d = (SSkipListNode*)p; SSkipListNode* d = (SSkipListNode*)p;
p += headsize + sizeof(double) * 2; p += headsize + sizeof(double) * 2;
......
...@@ -181,8 +181,10 @@ int32_t tsdbInsertData(tsdb_repo_t *pRepo, SSubmitMsg *pMsg); ...@@ -181,8 +181,10 @@ int32_t tsdbInsertData(tsdb_repo_t *pRepo, SSubmitMsg *pMsg);
typedef void* tsdb_query_handle_t; // Use void to hide implementation details typedef void* tsdb_query_handle_t; // Use void to hide implementation details
// typedef struct { typedef struct STableGroupList { // qualified table object list in group
// } SColumnFilterInfo; SArray* pGroupList;
int32_t numOfTables;
} STableGroupList;
// query condition to build vnode iterator // query condition to build vnode iterator
typedef struct STsdbQueryCond { typedef struct STsdbQueryCond {
...@@ -233,7 +235,7 @@ typedef void *tsdbpos_t; ...@@ -233,7 +235,7 @@ typedef void *tsdbpos_t;
* @param pTableList table sid list * @param pTableList table sid list
* @return * @return
*/ */
tsdb_query_handle_t *tsdbQueryByTableId(tsdb_repo_t* tsdb, STsdbQueryCond *pCond, SArray *idList, SArray *pColumnInfo); tsdb_query_handle_t *tsdbQueryTables(tsdb_repo_t* tsdb, STsdbQueryCond *pCond, SArray *idList, SArray *pColumnInfo);
/** /**
* move to next block * move to next block
...@@ -335,7 +337,10 @@ SArray *tsdbGetTableList(tsdb_query_handle_t *pQueryHandle); ...@@ -335,7 +337,10 @@ SArray *tsdbGetTableList(tsdb_query_handle_t *pQueryHandle);
* @param pTagCond. tag query condition * @param pTagCond. tag query condition
* *
*/ */
int32_t tsdbQueryTags(tsdb_repo_t* tsdb, int64_t uid, const char *pTagCond, size_t len, SArray* list); int32_t tsdbQueryTags(tsdb_repo_t* tsdb, int64_t uid, const char* pTagCond, size_t len, SArray** pGroupList,
SColIndex* pColIndex, int32_t numOfCols);
int32_t tsdbGetOneTableGroup(tsdb_repo_t* tsdb, int64_t uid, SArray** pGroupList);
/** /**
* clean up the query handle * clean up the query handle
......
...@@ -100,6 +100,7 @@ typedef struct { ...@@ -100,6 +100,7 @@ typedef struct {
STsdbMeta *tsdbInitMeta(char *rootDir, int32_t maxTables); STsdbMeta *tsdbInitMeta(char *rootDir, int32_t maxTables);
int32_t tsdbFreeMeta(STsdbMeta *pMeta); int32_t tsdbFreeMeta(STsdbMeta *pMeta);
STSchema * tsdbGetTableSchema(STsdbMeta *pMeta, STable *pTable); STSchema * tsdbGetTableSchema(STsdbMeta *pMeta, STable *pTable);
STSchema * tsdbGetTableTagSchema(STsdbMeta *pMeta, STable *pTable);
// ---- Operation on STable // ---- Operation on STable
#define TSDB_TABLE_ID(pTable) ((pTable)->tableId) #define TSDB_TABLE_ID(pTable) ((pTable)->tableId)
......
...@@ -704,7 +704,7 @@ static int32_t tdInsertRowToTable(STsdbRepo *pRepo, SDataRow row, STable *pTable ...@@ -704,7 +704,7 @@ static int32_t tdInsertRowToTable(STsdbRepo *pRepo, SDataRow row, STable *pTable
pTable->mem->keyLast = 0; pTable->mem->keyLast = 0;
} }
tSkipListRandNodeInfo(pTable->mem->pData, &level, &headSize); tSkipListNewNodeInfo(pTable->mem->pData, &level, &headSize);
TSKEY key = dataRowKey(row); TSKEY key = dataRowKey(row);
// printf("insert:%lld, size:%d\n", key, pTable->mem->numOfPoints); // printf("insert:%lld, size:%d\n", key, pTable->mem->numOfPoints);
......
...@@ -105,8 +105,9 @@ int tsdbRestoreTable(void *pHandle, void *cont, int contLen) { ...@@ -105,8 +105,9 @@ int tsdbRestoreTable(void *pHandle, void *cont, int contLen) {
if (pTable == NULL) return -1; if (pTable == NULL) return -1;
if (pTable->type == TSDB_SUPER_TABLE) { if (pTable->type == TSDB_SUPER_TABLE) {
pTable->pIndex = STColumn* pColSchema = schemaColAt(pTable->tagSchema, 0);
tSkipListCreate(TSDB_SUPER_TABLE_SL_LEVEL, TSDB_DATA_TYPE_TIMESTAMP, sizeof(int64_t), 1, 0, 0, getTagIndexKey); pTable->pIndex = tSkipListCreate(TSDB_SUPER_TABLE_SL_LEVEL, pColSchema->type, pColSchema->bytes,
1, 0, 0, getTagIndexKey);
} }
tsdbAddTableToMeta(pMeta, pTable, false); tsdbAddTableToMeta(pMeta, pTable, false);
...@@ -201,6 +202,18 @@ STSchema *tsdbGetTableSchema(STsdbMeta *pMeta, STable *pTable) { ...@@ -201,6 +202,18 @@ STSchema *tsdbGetTableSchema(STsdbMeta *pMeta, STable *pTable) {
} }
} }
STSchema * tsdbGetTableTagSchema(STsdbMeta *pMeta, STable *pTable) {
if (pTable->type == TSDB_SUPER_TABLE) {
return pTable->tagSchema;
} else if (pTable->type == TSDB_CHILD_TABLE) {
STable *pSuper = tsdbGetTableByUid(pMeta, pTable->superUid);
if (pSuper == NULL) return NULL;
return pSuper->tagSchema;
} else {
return NULL;
}
}
int32_t tsdbCreateTableImpl(STsdbMeta *pMeta, STableCfg *pCfg) { int32_t tsdbCreateTableImpl(STsdbMeta *pMeta, STableCfg *pCfg) {
if (tsdbCheckTableCfg(pCfg) < 0) return -1; if (tsdbCheckTableCfg(pCfg) < 0) return -1;
...@@ -222,8 +235,11 @@ int32_t tsdbCreateTableImpl(STsdbMeta *pMeta, STableCfg *pCfg) { ...@@ -222,8 +235,11 @@ int32_t tsdbCreateTableImpl(STsdbMeta *pMeta, STableCfg *pCfg) {
super->schema = tdDupSchema(pCfg->schema); super->schema = tdDupSchema(pCfg->schema);
super->tagSchema = tdDupSchema(pCfg->tagSchema); super->tagSchema = tdDupSchema(pCfg->tagSchema);
super->tagVal = tdDataRowDup(pCfg->tagValues); super->tagVal = tdDataRowDup(pCfg->tagValues);
super->pIndex = tSkipListCreate(TSDB_SUPER_TABLE_SL_LEVEL, TSDB_DATA_TYPE_TIMESTAMP, sizeof(int64_t), 1,
0, 0, getTagIndexKey); // Allow duplicate key, no lock // index the first tag column
STColumn* pColSchema = schemaColAt(super->tagSchema, 0);
super->pIndex = tSkipListCreate(TSDB_SUPER_TABLE_SL_LEVEL, pColSchema->type, pColSchema->bytes,
1, 0, 0, getTagIndexKey); // Allow duplicate key, no lock
if (super->pIndex == NULL) { if (super->pIndex == NULL) {
tdFreeSchema(super->schema); tdFreeSchema(super->schema);
...@@ -411,11 +427,11 @@ static int tsdbAddTableIntoIndex(STsdbMeta *pMeta, STable *pTable) { ...@@ -411,11 +427,11 @@ static int tsdbAddTableIntoIndex(STsdbMeta *pMeta, STable *pTable) {
int32_t level = 0; int32_t level = 0;
int32_t headSize = 0; int32_t headSize = 0;
// first tag column tSkipListNewNodeInfo(pSTable->pIndex, &level, &headSize);
STColumn* s = schemaColAt(pSTable->tagSchema, 0);
tSkipListRandNodeInfo(pSTable->pIndex, &level, &headSize); // NOTE: do not allocate the space for key, since in each skip list node, only keep the pointer to pTable, not the
SSkipListNode* pNode = calloc(1, headSize + s->bytes + POINTER_BYTES); // actual key value, and the key value will be retrieved during query through the pTable and getTagIndexKey function
SSkipListNode* pNode = calloc(1, headSize + POINTER_BYTES);
pNode->level = level; pNode->level = level;
SSkipList* list = pSTable->pIndex; SSkipList* list = pSTable->pIndex;
......
...@@ -15,10 +15,12 @@ ...@@ -15,10 +15,12 @@
#include "os.h" #include "os.h"
#include "talgo.h"
#include "tlog.h" #include "tlog.h"
#include "tutil.h" #include "tutil.h"
#include "../../../query/inc/qast.h" #include "../../../query/inc/qast.h"
#include "../../../query/inc/qextbuffer.h"
#include "../../../query/inc/tlosertree.h" #include "../../../query/inc/tlosertree.h"
#include "../../../query/inc/tsqlfunction.h" #include "../../../query/inc/tsqlfunction.h"
#include "tsdb.h" #include "tsdb.h"
...@@ -141,9 +143,8 @@ static void tsdbInitCompBlockLoadInfo(SLoadCompBlockInfo* pCompBlockLoadInfo) { ...@@ -141,9 +143,8 @@ static void tsdbInitCompBlockLoadInfo(SLoadCompBlockInfo* pCompBlockLoadInfo) {
pCompBlockLoadInfo->fileListIndex = -1; pCompBlockLoadInfo->fileListIndex = -1;
} }
tsdb_query_handle_t* tsdbQueryByTableId(tsdb_repo_t* tsdb, STsdbQueryCond* pCond, SArray* idList, SArray* pColumnInfo) { tsdb_query_handle_t* tsdbQueryTables(tsdb_repo_t* tsdb, STsdbQueryCond* pCond, SArray* groupList, SArray* pColumnInfo) {
// todo 1. filter not exist table // todo 1. filter not exist table
// todo 2. add the reference count for each table that is involved in query // todo 2. add the reference count for each table that is involved in query
STsdbQueryHandle* pQueryHandle = calloc(1, sizeof(STsdbQueryHandle)); STsdbQueryHandle* pQueryHandle = calloc(1, sizeof(STsdbQueryHandle));
...@@ -156,26 +157,26 @@ tsdb_query_handle_t* tsdbQueryByTableId(tsdb_repo_t* tsdb, STsdbQueryCond* pCond ...@@ -156,26 +157,26 @@ tsdb_query_handle_t* tsdbQueryByTableId(tsdb_repo_t* tsdb, STsdbQueryCond* pCond
pQueryHandle->isFirstSlot = true; pQueryHandle->isFirstSlot = true;
pQueryHandle->cur.fid = -1; pQueryHandle->cur.fid = -1;
size_t size = taosArrayGetSize(idList); size_t size = taosArrayGetSize(groupList);
assert(size >= 1); assert(size >= 1);
pQueryHandle->pTableCheckInfo = taosArrayInit(size, sizeof(STableCheckInfo)); pQueryHandle->pTableCheckInfo = taosArrayInit(size, sizeof(STableCheckInfo));
for (int32_t i = 0; i < size; ++i) { for (int32_t i = 0; i < size; ++i) {
STableId id = *(STableId*) taosArrayGet(idList, i); SArray* group = *(SArray**)taosArrayGet(groupList, i);
STable* pTable = tsdbGetTableByUid(tsdbGetMeta(tsdb), id.uid);
if (pTable == NULL) {
dError("%p failed to get table, error uid:%" PRIu64, pQueryHandle, id.uid);
continue;
}
STableCheckInfo info = {
.lastKey = pQueryHandle->window.skey,
.tableId = id,
.pTableObj = pTable,
};
taosArrayPush(pQueryHandle->pTableCheckInfo, &info); size_t gsize = taosArrayGetSize(group);
for (int32_t j = 0; j < gsize; ++j) {
STable* pTable = *(STable**)taosArrayGet(group, j);
assert(pTable != NULL);
STableCheckInfo info = {
.lastKey = pQueryHandle->window.skey,
.tableId = pTable->tableId,
.pTableObj = pTable,
};
taosArrayPush(pQueryHandle->pTableCheckInfo, &info);
}
} }
dTrace("%p total numOfTable:%d in query", pQueryHandle, taosArrayGetSize(pQueryHandle->pTableCheckInfo)); dTrace("%p total numOfTable:%d in query", pQueryHandle, taosArrayGetSize(pQueryHandle->pTableCheckInfo));
...@@ -208,7 +209,8 @@ tsdb_query_handle_t* tsdbQueryByTableId(tsdb_repo_t* tsdb, STsdbQueryCond* pCond ...@@ -208,7 +209,8 @@ tsdb_query_handle_t* tsdbQueryByTableId(tsdb_repo_t* tsdb, STsdbQueryCond* pCond
} }
static bool hasMoreDataInCache(STsdbQueryHandle* pHandle) { static bool hasMoreDataInCache(STsdbQueryHandle* pHandle) {
assert(pHandle->activeIndex == 0 && taosArrayGetSize(pHandle->pTableCheckInfo) == 1); size_t size = taosArrayGetSize(pHandle->pTableCheckInfo);
assert(pHandle->activeIndex < size && pHandle->activeIndex >= 0 && size >= 1);
pHandle->cur.fid = -1; pHandle->cur.fid = -1;
STableCheckInfo* pCheckInfo = taosArrayGet(pHandle->pTableCheckInfo, pHandle->activeIndex); STableCheckInfo* pCheckInfo = taosArrayGet(pHandle->pTableCheckInfo, pHandle->activeIndex);
...@@ -312,7 +314,7 @@ static int32_t getFileCompInfo(STsdbQueryHandle* pQueryHandle, int32_t* numOfBlo ...@@ -312,7 +314,7 @@ static int32_t getFileCompInfo(STsdbQueryHandle* pQueryHandle, int32_t* numOfBlo
SCompIdx* compIndex = &pQueryHandle->compIndex[pCheckInfo->tableId.tid]; SCompIdx* compIndex = &pQueryHandle->compIndex[pCheckInfo->tableId.tid];
if (compIndex->len == 0 || compIndex->numOfSuperBlocks == 0) { // no data block in this file, try next file if (compIndex->len == 0 || compIndex->numOfSuperBlocks == 0) { // no data block in this file, try next file
assert(0); continue;//no data blocks in the file belongs to pCheckInfo->pTable
} else { } else {
if (pCheckInfo->compSize < compIndex->len) { if (pCheckInfo->compSize < compIndex->len) {
assert(compIndex->len > 0); assert(compIndex->len > 0);
...@@ -488,61 +490,6 @@ static bool loadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock ...@@ -488,61 +490,6 @@ static bool loadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock
return pQueryHandle->realNumOfRows > 0; return pQueryHandle->realNumOfRows > 0;
} }
//bool moveToNextBlock(STsdbQueryHandle* pQueryHandle, int32_t step) {
// SQueryFilePos* cur = &pQueryHandle->cur;
//
// if (pQueryHandle->cur.fid >= 0) {
// /*
// * 1. ascending order. The last data block of data file
// * 2. descending order. The first block of file
// */
// STableCheckInfo* pCheckInfo = taosArrayGet(pQueryHandle->pTableCheckInfo, pQueryHandle->activeIndex);
// int32_t tid = pCheckInfo->tableId.tid;
//
// if ((step == QUERY_ASC_FORWARD_STEP &&
// (pQueryHandle->cur.slot == pQueryHandle->compIndex[tid].numOfSuperBlocks - 1)) ||
// (step == QUERY_DESC_FORWARD_STEP && (pQueryHandle->cur.slot == 0))) {
// // temporarily keep the position value, in case of no data qualified when move forwards(backwards)
// // SQueryFilePos save = pQueryHandle->cur;
// pQueryHandle->pFileGroup = tsdbGetFileGroupNext(&pQueryHandle->fileIter);
//
// int32_t fid = -1;
// int32_t numOfBlocks = 0;
//
// if (pQueryHandle->pFileGroup != NULL) {
// if ((fid = getFileCompInfo(pQueryHandle, &numOfBlocks, 1)) < 0) {
// } else {
// cur->slot = (step == QUERY_ASC_FORWARD_STEP) ? 0 : pQueryHandle->numOfBlocks - 1;
// cur->pos = (step == QUERY_ASC_FORWARD_STEP) ? 0 : pQueryHandle->pBlock[cur->slot].numOfPoints - 1;
//
// SCompBlock* pBlock = &pCheckInfo->pCompInfo->blocks[cur->slot];
// cur->fid = pQueryHandle->pFileGroup->fileId;
// assert(cur->pos >= 0 && cur->fid >= 0 && cur->slot >= 0);
//
// if (pBlock->keyFirst > pQueryHandle->window.ekey) { // done
// return false;
// }
//
// return loadFileDataBlock(pQueryHandle, pBlock, pCheckInfo);
// }
// } else { // check data in cache
// pQueryHandle->cur.fid = -1;
// return hasMoreDataInCache(pQueryHandle);
// }
// } else { // next block in the same file
// cur->slot += step;
//
// SCompBlock* pBlock = &pCheckInfo->pCompInfo->blocks[cur->slot];
// cur->pos = (step == QUERY_ASC_FORWARD_STEP) ? 0 : pBlock->numOfPoints - 1;
// return loadFileDataBlock(pQueryHandle, pBlock, pCheckInfo);
// }
// } else { // data in cache
// return hasMoreDataInCache(pQueryHandle);
// }
//
// return false;
//}
static int vnodeBinarySearchKey(char* pValue, int num, TSKEY key, int order) { static int vnodeBinarySearchKey(char* pValue, int num, TSKEY key, int order) {
int firstPos, lastPos, midPos = -1; int firstPos, lastPos, midPos = -1;
int numOfPoints; int numOfPoints;
...@@ -732,71 +679,6 @@ int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order) { ...@@ -732,71 +679,6 @@ int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order) {
return midPos; return midPos;
} }
//static bool getQualifiedDataBlock(STsdbQueryHandle* pQueryHandle, STableCheckInfo* pCheckInfo, int32_t type) {
// STsdbFileH* pFileHandle = tsdbGetFile(pQueryHandle->pTsdb);
// int32_t fid = getFileIdFromKey(pCheckInfo->lastKey);
//
// tsdbInitFileGroupIter(pFileHandle, &pQueryHandle->fileIter, TSDB_FGROUP_ITER_FORWARD);
// tsdbSeekFileGroupIter(&pQueryHandle->fileIter, fid);
// pQueryHandle->pFileGroup = tsdbGetFileGroupNext(&pQueryHandle->fileIter);
//
// SQueryFilePos* cur = &pQueryHandle->cur;
//
// int32_t tid = pCheckInfo->tableId.tid;
// int32_t numOfBlocks = 0;
//
// while (pQueryHandle->pFileGroup != NULL) {
// if (getFileCompInfo(pQueryHandle, &numOfBlocks, 1) != TSDB_CODE_SUCCESS) {
// break;
// }
//
// assert(pCheckInfo->numOfBlocks >= 0);
//
// // no data block in current file, try next
// if (pCheckInfo->numOfBlocks > 0) {
// cur->fid = pQueryHandle->pFileGroup->fileId;
// break;
// }
//
// dTrace("%p no data block in file, fid:%d, tid:%d, try next, %p", pQueryHandle, pQueryHandle->pFileGroup->fileId,
// tid, pQueryHandle->qinfo);
//
// pQueryHandle->pFileGroup = tsdbGetFileGroupNext(&pQueryHandle->fileIter);
// }
//
// if (pCheckInfo->numOfBlocks == 0) {
// return false;
// }
//
// cur->slot = 0; // always start from the first slot
// SCompBlock* pBlock = &pCheckInfo->pCompInfo->blocks[cur->slot];
// return loadFileDataBlock(pQueryHandle, pBlock, pCheckInfo);
//}
//static UNUSED_FUNC bool hasMoreDataForSingleTable(STsdbQueryHandle* pHandle) {
// assert(pHandle->activeIndex == 0 && taosArrayGetSize(pHandle->pTableCheckInfo) == 1);
//
// STsdbFileH* pFileHandle = tsdbGetFile(pHandle->pTsdb);
// STableCheckInfo* pCheckInfo = taosArrayGet(pHandle->pTableCheckInfo, pHandle->activeIndex);
//
// if (!pCheckInfo->checkFirstFileBlock) {
// pCheckInfo->checkFirstFileBlock = true;
//
// if (pFileHandle != NULL) {
// bool found = getQualifiedDataBlock(pHandle, pCheckInfo, 1);
// if (found) {
// return true;
// }
// }
//
// // no data in file, try cache
// pHandle->cur.fid = -1;
// return hasMoreDataInCache(pHandle);
// } else { // move to next data block in file or in cache
// return moveToNextBlock(pHandle, 1);
// }
//}
static void cleanBlockOrderSupporter(SBlockOrderSupporter* pSupporter, int32_t numOfTables) { static void cleanBlockOrderSupporter(SBlockOrderSupporter* pSupporter, int32_t numOfTables) {
tfree(pSupporter->numOfBlocksPerMeter); tfree(pSupporter->numOfBlocksPerMeter);
tfree(pSupporter->blockIndexArray); tfree(pSupporter->blockIndexArray);
...@@ -862,23 +744,26 @@ static int32_t createDataBlocksInfo(STsdbQueryHandle* pQueryHandle, int32_t numO ...@@ -862,23 +744,26 @@ static int32_t createDataBlocksInfo(STsdbQueryHandle* pQueryHandle, int32_t numO
} }
int32_t cnt = 0; int32_t cnt = 0;
int32_t numOfQualMeters = 0; int32_t numOfQualTables = 0;
for (int32_t j = 0; j < numOfTables; ++j) { for (int32_t j = 0; j < numOfTables; ++j) {
STableCheckInfo* pTableCheck = (STableCheckInfo*)taosArrayGet(pQueryHandle->pTableCheckInfo, j); STableCheckInfo* pTableCheck = (STableCheckInfo*)taosArrayGet(pQueryHandle->pTableCheckInfo, j);
if (pTableCheck->numOfBlocks <= 0) {
continue;
}
SCompBlock* pBlock = pTableCheck->pCompInfo->blocks; SCompBlock* pBlock = pTableCheck->pCompInfo->blocks;
sup.numOfBlocksPerMeter[numOfQualMeters] = pTableCheck->numOfBlocks; sup.numOfBlocksPerMeter[numOfQualTables] = pTableCheck->numOfBlocks;
char* buf = calloc(1, sizeof(STableBlockInfo) * pTableCheck->numOfBlocks); char* buf = calloc(1, sizeof(STableBlockInfo) * pTableCheck->numOfBlocks);
if (buf == NULL) { if (buf == NULL) {
cleanBlockOrderSupporter(&sup, numOfQualMeters); cleanBlockOrderSupporter(&sup, numOfQualTables);
return TSDB_CODE_SERV_OUT_OF_MEMORY; return TSDB_CODE_SERV_OUT_OF_MEMORY;
} }
sup.pDataBlockInfo[numOfQualMeters] = (STableBlockInfo*)buf; sup.pDataBlockInfo[numOfQualTables] = (STableBlockInfo*)buf;
for (int32_t k = 0; k < pTableCheck->numOfBlocks; ++k) { for (int32_t k = 0; k < pTableCheck->numOfBlocks; ++k) {
STableBlockInfo* pBlockInfoEx = &sup.pDataBlockInfo[numOfQualMeters][k]; STableBlockInfo* pBlockInfoEx = &sup.pDataBlockInfo[numOfQualTables][k];
pBlockInfoEx->pBlock.compBlock = &pBlock[k]; pBlockInfoEx->pBlock.compBlock = &pBlock[k];
pBlockInfoEx->pBlock.fields = NULL; pBlockInfoEx->pBlock.fields = NULL;
...@@ -889,13 +774,13 @@ static int32_t createDataBlocksInfo(STsdbQueryHandle* pQueryHandle, int32_t numO ...@@ -889,13 +774,13 @@ static int32_t createDataBlocksInfo(STsdbQueryHandle* pQueryHandle, int32_t numO
cnt++; cnt++;
} }
numOfQualMeters++; numOfQualTables++;
} }
dTrace("%p create data blocks info struct completed", pQueryHandle); dTrace("%p create data blocks info struct completed, %d blocks in %d tables", pQueryHandle, cnt, numOfQualTables);
assert(cnt <= numOfBlocks && numOfQualMeters <= numOfTables); // the pMeterDataInfo[j]->numOfBlocks may be 0 assert(cnt <= numOfBlocks && numOfQualTables <= numOfTables); // the pMeterDataInfo[j]->numOfBlocks may be 0
sup.numOfTables = numOfQualMeters; sup.numOfTables = numOfQualTables;
SLoserTreeInfo* pTree = NULL; SLoserTreeInfo* pTree = NULL;
uint8_t ret = tLoserTreeCreate(&pTree, sup.numOfTables, &sup, dataBlockOrderCompar); uint8_t ret = tLoserTreeCreate(&pTree, sup.numOfTables, &sup, dataBlockOrderCompar);
...@@ -1256,11 +1141,11 @@ static int32_t getAllTableIdList(STsdbRepo* tsdb, int64_t uid, SArray* list) { ...@@ -1256,11 +1141,11 @@ static int32_t getAllTableIdList(STsdbRepo* tsdb, int64_t uid, SArray* list) {
SSkipListIterator* iter = tSkipListCreateIter(pTable->pIndex); SSkipListIterator* iter = tSkipListCreateIter(pTable->pIndex);
while (tSkipListIterNext(iter)) { while (tSkipListIterNext(iter)) {
SSkipListNode* pNode = tSkipListIterGet(iter); SSkipListNode* pNode = tSkipListIterGet(iter);
STable* t = *(STable**)SL_GET_NODE_DATA(pNode);
STable* t = *(STable**)SL_GET_NODE_DATA(pNode);
taosArrayPush(list, &t->tableId); taosArrayPush(list, t);
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -1348,6 +1233,132 @@ void filterPrepare(void* expr, void* param) { ...@@ -1348,6 +1233,132 @@ void filterPrepare(void* expr, void* param) {
tVariantTypeSetType(&pInfo->q, pInfo->sch.type); tVariantTypeSetType(&pInfo->q, pInfo->sch.type);
} }
int32_t doCompare(const char* f1, const char* f2, int32_t type, size_t size) {
switch (type) {
case TSDB_DATA_TYPE_INT: DEFAULT_COMP(GET_INT32_VAL(f1), GET_INT32_VAL(f2));
case TSDB_DATA_TYPE_DOUBLE: DEFAULT_COMP(GET_DOUBLE_VAL(f1), GET_DOUBLE_VAL(f2));
case TSDB_DATA_TYPE_FLOAT: DEFAULT_COMP(GET_FLOAT_VAL(f1), GET_FLOAT_VAL(f2));
case TSDB_DATA_TYPE_BIGINT: DEFAULT_COMP(GET_INT64_VAL(f1), GET_INT64_VAL(f2));
case TSDB_DATA_TYPE_SMALLINT: DEFAULT_COMP(GET_INT16_VAL(f1), GET_INT16_VAL(f2));
case TSDB_DATA_TYPE_TINYINT:
case TSDB_DATA_TYPE_BOOL: DEFAULT_COMP(GET_INT8_VAL(f1), GET_INT8_VAL(f2));
case TSDB_DATA_TYPE_NCHAR: {
int32_t ret = wcsncmp((wchar_t*) f1, (wchar_t*) f2, size/TSDB_NCHAR_SIZE);
if (ret == 0) {
return ret;
}
return (ret < 0) ? -1 : 1;
}
default: {
int32_t ret = strncmp(f1, f2, (size_t)size);
if (ret == 0) {
return ret;
}
return (ret < 0) ? -1 : 1;
}
}
}
typedef struct STableGroupSupporter {
int32_t numOfCols;
SColIndex* pCols;
STSchema* pTagSchema;
} STableGroupSupporter;
int32_t tableGroupComparFn(const void *p1, const void *p2, const void *param) {
STableGroupSupporter* pTableGroupSupp = (STableGroupSupporter*) param;
STable *pTable1 = *(STable **) p1;
STable *pTable2 = *(STable **) p2;
for (int32_t i = 0; i < pTableGroupSupp->numOfCols; ++i) {
SColIndex* pColIndex = &pTableGroupSupp->pCols[i];
int32_t colIndex = pColIndex->colIndex;
char * f1 = NULL;
char * f2 = NULL;
int32_t type = 0;
int32_t bytes = 0;
if (colIndex == -1) { // table name, todo fix me
// f1 = s1->tags;
// f2 = s2->tags;
type = TSDB_DATA_TYPE_BINARY;
bytes = TSDB_TABLE_NAME_LEN;
} else {
f1 = dataRowTuple(pTable1->tagVal);
f2 = dataRowTuple(pTable2->tagVal);
type = schemaColAt(pTableGroupSupp->pTagSchema, colIndex)->type;
bytes = schemaColAt(pTableGroupSupp->pTagSchema, colIndex)->bytes;
}
int32_t ret = doCompare(f1, f2, type, bytes);
if (ret == 0) {
continue;
} else {
return ret;
}
}
return 0;
}
void createTableGroupImpl(SArray* pGroups, STable** pTables, size_t numOfTables, STableGroupSupporter* pSupp, __ext_compar_fn_t compareFn) {
SArray* g = taosArrayInit(16, POINTER_BYTES);
taosArrayPush(g, &pTables[0]);
for (int32_t i = 1; i < numOfTables; ++i) {
int32_t ret = compareFn(&pTables[i - 1], &pTables[i], pSupp);
assert(ret == 0 || ret == -1);
if (ret == 0) {
taosArrayPush(g, &pTables[i]);
} else {
taosArrayPush(pGroups, &g); // current group is ended, start a new group
g = taosArrayInit(16, POINTER_BYTES);
taosArrayPush(g, &pTables[i]);
}
}
}
SArray* createTableGroup(SArray* pTableList, STSchema* pTagSchema, SColIndex* pCols, int32_t numOfOrderCols) {
assert(pTableList != NULL && taosArrayGetSize(pTableList) > 0);
SArray* pTableGroup = taosArrayInit(1, POINTER_BYTES);
size_t size = taosArrayGetSize(pTableList);
if (size == 0) {
pTrace("no qualified tables");
return pTableGroup;
}
if (numOfOrderCols == 0 || size == 1) { // no group by tags clause or only one table
taosArrayPush(pTableGroup, pTableList);
pTrace("all %d tables belong to one group", size);
#ifdef _DEBUG_VIEW
tSidSetDisplay(pTableGroup);
#endif
} else {
STableGroupSupporter *pSupp = (STableGroupSupporter *) calloc(1, sizeof(STableGroupSupporter));
pSupp->numOfCols = numOfOrderCols;
pSupp->pTagSchema = pTagSchema;
pSupp->pCols = pCols;
tqsort(pTableList->pData, size, POINTER_BYTES, pSupp, tableGroupComparFn);
createTableGroupImpl(pTableGroup, pTableList->pData, size, pSupp, tableGroupComparFn);
#ifdef _DEBUG_VIEW
tSidSetDisplay(pTableGroup);
#endif
tfree(pSupp);
}
return pTableGroup;
}
bool tSkipListNodeFilterCallback(const void* pNode, void* param) { bool tSkipListNodeFilterCallback(const void* pNode, void* param) {
tQueryInfo* pInfo = (tQueryInfo*)param; tQueryInfo* pInfo = (tQueryInfo*)param;
...@@ -1419,13 +1430,29 @@ static int32_t doQueryTableList(STable* pSTable, SArray* pRes, tExprNode* pExpr) ...@@ -1419,13 +1430,29 @@ static int32_t doQueryTableList(STable* pSTable, SArray* pRes, tExprNode* pExpr)
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t tsdbQueryTags(tsdb_repo_t* tsdb, int64_t uid, const char* pTagCond, size_t len, SArray* res) { int32_t tsdbQueryTags(tsdb_repo_t* tsdb, int64_t uid, const char* pTagCond, size_t len, SArray** pGroupList,
if (pTagCond == NULL || len == 0) { // no condition, all tables created according to this stable are involved SColIndex* pColIndex, int32_t numOfCols) {
return getAllTableIdList(tsdb, uid, res);
}
STable* pSTable = tsdbGetTableByUid(tsdbGetMeta(tsdb), uid); STable* pSTable = tsdbGetTableByUid(tsdbGetMeta(tsdb), uid);
assert(pSTable != NULL); if (pSTable == NULL) {
dError("failed to get stable, uid:%" PRIu64, uid);
return TSDB_CODE_INVALID_TABLE_ID;
}
SArray* res = taosArrayInit(8, POINTER_BYTES);
STSchema* pTagSchema = tsdbGetTableTagSchema(tsdbGetMeta(tsdb), pSTable);
if (pTagCond == NULL || len == 0) { // no tags condition, all tables created according to this stable are involved
int32_t ret = getAllTableIdList(tsdb, uid, res);
if (ret != TSDB_CODE_SUCCESS) {
taosArrayDestroy(res);
return ret;
}
*pGroupList = createTableGroup(res, pTagSchema, pColIndex, numOfCols);
taosArrayDestroy(res);
return ret;
}
tExprNode* pExprNode = NULL; tExprNode* pExprNode = NULL;
int32_t ret = TSDB_CODE_SUCCESS; int32_t ret = TSDB_CODE_SUCCESS;
...@@ -1433,12 +1460,33 @@ int32_t tsdbQueryTags(tsdb_repo_t* tsdb, int64_t uid, const char* pTagCond, size ...@@ -1433,12 +1460,33 @@ int32_t tsdbQueryTags(tsdb_repo_t* tsdb, int64_t uid, const char* pTagCond, size
// failed to build expression, no result, return immediately // failed to build expression, no result, return immediately
if ((ret = exprTreeFromBinary(pTagCond, len, &pExprNode) != TSDB_CODE_SUCCESS) || (pExprNode == NULL)) { if ((ret = exprTreeFromBinary(pTagCond, len, &pExprNode) != TSDB_CODE_SUCCESS) || (pExprNode == NULL)) {
dError("stable:%" PRIu64 ", failed to deserialize expression tree, error exists", uid); dError("stable:%" PRIu64 ", failed to deserialize expression tree, error exists", uid);
taosArrayDestroy(res);
return ret; return ret;
} }
return doQueryTableList(pSTable, res, pExprNode); doQueryTableList(pSTable, res, pExprNode);
*pGroupList = createTableGroup(res, pTagSchema, pColIndex, numOfCols);
taosArrayDestroy(res);
return ret;
} }
int32_t tsdbGetOneTableGroup(tsdb_repo_t* tsdb, int64_t uid, SArray** pGroupList) {
STable* pTable = tsdbGetTableByUid(tsdbGetMeta(tsdb), uid);
if (pTable == NULL) {
return TSDB_CODE_INVALID_TABLE_ID;
}
//todo assert table type, add the table ref count
*pGroupList = taosArrayInit(1, POINTER_BYTES);
SArray* group = taosArrayInit(1, POINTER_BYTES);
taosArrayPush(group, &pTable);
taosArrayPush(*pGroupList, &group);
return TSDB_CODE_SUCCESS;
}
void tsdbCleanupQueryHandle(tsdb_query_handle_t queryHandle) { void tsdbCleanupQueryHandle(tsdb_query_handle_t queryHandle) {
STsdbQueryHandle* pQueryHandle = (STsdbQueryHandle*)queryHandle; STsdbQueryHandle* pQueryHandle = (STsdbQueryHandle*)queryHandle;
if (pQueryHandle == NULL) { if (pQueryHandle == NULL) {
......
...@@ -75,17 +75,17 @@ int main(int argc, char *argv[]) { ...@@ -75,17 +75,17 @@ int main(int argc, char *argv[]) {
doQuery(taos, "create database if not exists test"); doQuery(taos, "create database if not exists test");
doQuery(taos, "use test"); doQuery(taos, "use test");
doQuery(taos, "create table if not exists tm0 (ts timestamp, k int);"); // doQuery(taos, "create table if not exists tm0 (ts timestamp, k int);");
doQuery(taos, "insert into tm0 values('2020-1-1 1:1:1', 1);"); // doQuery(taos, "insert into tm0 values('2020-1-1 1:1:1', 1);");
doQuery(taos, "insert into tm0 values('2020-1-1 1:1:2', 2);"); // doQuery(taos, "insert into tm0 values('2020-1-1 1:1:2', 2);");
doQuery(taos, "insert into tm0 values('2020-1-1 1:1:3', 3);"); // doQuery(taos, "insert into tm0 values('2020-1-1 1:1:3', 3);");
doQuery(taos, "insert into tm0 values('2020-1-1 1:1:4', 4);"); // doQuery(taos, "insert into tm0 values('2020-1-1 1:1:4', 4);");
doQuery(taos, "insert into tm0 values('2020-1-1 1:1:5', 5);"); // doQuery(taos, "insert into tm0 values('2020-1-1 1:1:5', 5);");
doQuery(taos, "insert into tm0 values('2020-1-1 1:1:6', 6);"); // doQuery(taos, "insert into tm0 values('2020-1-1 1:1:6', 6);");
doQuery(taos, "insert into tm0 values('2020-1-1 1:1:7', 7);"); // doQuery(taos, "insert into tm0 values('2020-1-1 1:1:7', 7);");
doQuery(taos, "insert into tm0 values('2020-1-1 1:1:8', 8);"); // doQuery(taos, "insert into tm0 values('2020-1-1 1:1:8', 8);");
doQuery(taos, "insert into tm0 values('2020-1-1 1:1:9', 9);"); // doQuery(taos, "insert into tm0 values('2020-1-1 1:1:9', 9);");
doQuery(taos, "select * from tm0;"); doQuery(taos, "select sum(k),count(*) from m1 group by a");
taos_close(taos); taos_close(taos);
return 0; return 0;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册