提交 006d59bf 编写于 作者: H Haojun Liao

[td-3299]

上级 ac9ae238
......@@ -200,6 +200,7 @@ SColumn* tscColumnClone(const SColumn* src);
bool tscColumnExists(SArray* pColumnList, int32_t columnIndex, uint64_t uid);
SColumn* tscColumnListInsert(SArray* pColumnList, int32_t columnIndex, uint64_t uid, SSchema* pSchema);
void tscColumnListDestroy(SArray* pColList);
void tscColumnListCopy(SArray* dst, const SArray* src, uint64_t tableUid);
void convertQueryResult(SSqlRes* pRes, SQueryInfo* pQueryInfo);
......
......@@ -238,6 +238,12 @@ typedef struct SQueryInfo {
SArray *pUpstream; // SArray<struct SQueryInfo>
struct SQueryInfo *pDownstream;
int32_t havingFieldNum;
bool stableQuery;
bool groupbyColumn;
bool simpleAgg;
bool arithmeticOnAgg;
bool projectionQuery;
bool hasFilter;
} SQueryInfo;
typedef struct {
......
......@@ -14,7 +14,7 @@
*/
#include "tscLocalMerge.h"
//#include "tscSubquery.h"
#include "tscSubquery.h"
#include "os.h"
#include "texpr.h"
#include "tlosertree.h"
......@@ -96,9 +96,7 @@ int32_t tscCreateLocalMerger(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tO
return TSDB_CODE_TSC_APP_ERROR;
}
size_t size = sizeof(SLocalMerger) + POINTER_BYTES * numOfFlush;
*pMerger = (SLocalMerger *) calloc(1, size);
*pMerger = (SLocalMerger *) calloc(1, sizeof(SLocalMerger));
if ((*pMerger) == NULL) {
tscError("0x%"PRIx64" failed to create local merge structure, out of memory", id);
......@@ -107,7 +105,7 @@ int32_t tscCreateLocalMerger(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tO
}
(*pMerger)->pExtMemBuffer = pMemBuffer;
(*pMerger)->pLocalDataSrc = (SLocalDataSource **)&pMerger[1];
(*pMerger)->pLocalDataSrc = calloc(numOfFlush, POINTER_BYTES);
assert((*pMerger)->pLocalDataSrc != NULL);
(*pMerger)->numOfBuffer = numOfFlush;
......@@ -304,19 +302,22 @@ void tscDestroyLocalMerger(SLocalMerger* pLocalMerger) {
return;
}
if (pLocalMerger->pLoserTree) {
tfree(pLocalMerger->pLoserTree->param);
tfree(pLocalMerger->pLoserTree);
}
tscLocalReducerEnvDestroy(pLocalMerger->pExtMemBuffer, pLocalMerger->pDesc, pLocalMerger->numOfVnode);
for (int32_t i = 0; i < pLocalMerger->numOfBuffer; ++i) {
tfree(pLocalMerger->pLocalDataSrc[i]);
}
pLocalMerger->numOfBuffer = 0;
tscLocalReducerEnvDestroy(pLocalMerger->pExtMemBuffer, pLocalMerger->pDesc, pLocalMerger->numOfVnode);
pLocalMerger->numOfCompleted = 0;
if (pLocalMerger->pLoserTree) {
tfree(pLocalMerger->pLoserTree->param);
tfree(pLocalMerger->pLoserTree);
}
tfree(pLocalMerger->buf);
tfree(pLocalMerger->pLocalDataSrc);
free(pLocalMerger);
}
......@@ -786,11 +787,10 @@ SSDataBlock* doMultiwayMergeSort(void* param, bool* newgroup) {
SColIndex * pIndex = taosArrayGet(pInfo->orderColumnList, i);
SColumnInfoData *pColInfo = taosArrayGet(pInfo->binfo.pRes->pDataBlock, pIndex->colIndex);
char *newRow =
COLMODEL_GET_VAL(pOneDataSrc->filePage.data, pOneDataSrc->pMemBuffer->pColumnModel,
pOneDataSrc->rowIdx, pIndex->colIndex);
char *newRow = COLMODEL_GET_VAL(pOneDataSrc->filePage.data, pOneDataSrc->pMemBuffer->pColumnModel,
pOneDataSrc->rowIdx, pIndex->colIndex);
char * data = pInfo->prevRow[i];
char *data = pInfo->prevRow[i];
int32_t ret = columnValueAscendingComparator(data, newRow, pColInfo->info.type, pColInfo->info.bytes);
if (ret == 0) {
continue;
......@@ -809,9 +809,8 @@ SSDataBlock* doMultiwayMergeSort(void* param, bool* newgroup) {
SColIndex * pIndex = taosArrayGet(pInfo->orderColumnList, i);
SColumnInfoData *pColInfo = taosArrayGet(pInfo->binfo.pRes->pDataBlock, pIndex->colIndex);
char *curCol =
COLMODEL_GET_VAL(pOneDataSrc->filePage.data, pOneDataSrc->pMemBuffer->pColumnModel,
pOneDataSrc->rowIdx, pIndex->colIndex);
char *curCol = COLMODEL_GET_VAL(pOneDataSrc->filePage.data, pOneDataSrc->pMemBuffer->pColumnModel,
pOneDataSrc->rowIdx, pIndex->colIndex);
memcpy(pInfo->prevRow[i], curCol, pColInfo->info.bytes);
}
......@@ -956,7 +955,6 @@ SSDataBlock* doGlobalAggregate(void* param, bool* newgroup) {
if (pInfoData->info.type == TSDB_DATA_TYPE_TIMESTAMP && pRes->info.rows > 0) {
STimeWindow* w = &pRes->info.window;
// TODO in case of desc order, swap it
w->skey = *(int64_t*)pInfoData->pData;
w->ekey = *(int64_t*)(((char*)pInfoData->pData) + TSDB_KEYSIZE * (pRes->info.rows - 1));
......
......@@ -65,15 +65,11 @@ static void skipRemainValue(STSBuf* pTSBuf, tVariant* tag1) {
}
static void subquerySetState(SSqlObj *pSql, SSubqueryState *subState, int idx, int8_t state) {
assert(idx < subState->numOfSub);
assert(subState->states);
assert(idx < subState->numOfSub && subState->states != NULL);
tscDebug("subquery:0x%"PRIx64",%d state set to %d", pSql->self, idx, state);
pthread_mutex_lock(&subState->mutex);
tscDebug("subquery:0x%"PRIx64",%d state set to %d", pSql->self, idx, state);
subState->states[idx] = state;
pthread_mutex_unlock(&subState->mutex);
}
......@@ -110,13 +106,9 @@ static bool subAndCheckDone(SSqlObj *pSql, SSqlObj *pParentSql, int idx) {
pthread_mutex_lock(&subState->mutex);
bool done = allSubqueryDone(pParentSql);
if (done) {
tscDebug("0x%"PRIx64" subquery:0x%"PRIx64",%d all subs already done", pParentSql->self,
pSql->self, idx);
tscDebug("0x%"PRIx64" subquery:0x%"PRIx64",%d all subs already done", pParentSql->self, pSql->self, idx);
pthread_mutex_unlock(&subState->mutex);
return false;
}
......@@ -125,9 +117,7 @@ static bool subAndCheckDone(SSqlObj *pSql, SSqlObj *pParentSql, int idx) {
subState->states[idx] = 1;
done = allSubqueryDone(pParentSql);
pthread_mutex_unlock(&subState->mutex);
return done;
}
......@@ -701,8 +691,6 @@ void freeJoinSubqueryObj(SSqlObj* pSql) {
}
tfree(pSql->subState.states);
pSql->subState.numOfSub = 0;
}
......
......@@ -829,7 +829,9 @@ void tscResetSqlCmd(SSqlCmd* pCmd, bool removeMeta) {
void tscFreeSqlResult(SSqlObj* pSql) {
SSqlRes* pRes = &pSql->res;
tscDestroyLocalMerger(pRes->pLocalMerger);
pRes->pLocalMerger = NULL;
tscDestroyResPointerInfo(pRes);
memset(&pSql->res, 0, sizeof(SSqlRes));
......@@ -853,7 +855,6 @@ void tscFreeSubobj(SSqlObj* pSql) {
}
tfree(pSql->subState.states);
pSql->subState.numOfSub = 0;
}
......
#include <tscUtil.h>
#include "os.h"
#include "tsclient.h"
#include "qUtil.h"
#include "texpr.h"
#include "tsclient.h"
#define QNODE_PROJECT 1
#define QNODE_FILTER 2
#define QNODE_RELATION 3
#define QNODE_AGGREGATE 4
#define QNODE_GROUPBY 5
#define QNODE_LIMIT 6
#define QNODE_JOIN 7
#define QNODE_DIST 8
#define QNODE_SORT 9
#define QNODE_UNIONALL 10
#define QNODE_TIMEWINDOW 11
#define QNODE_PROJECT 1
#define QNODE_FILTER 2
#define QNODE_TABLESCAN 3
#define QNODE_AGGREGATE 4
#define QNODE_GROUPBY 5
#define QNODE_LIMIT 6
#define QNODE_JOIN 7
#define QNODE_DISTINCT 8
#define QNODE_SORT 9
#define QNODE_UNIONALL 10
#define QNODE_TIMEWINDOW 11
#define QNODE_SESSIONWINDOW 12
#define QNODE_FILL 13
typedef struct SQueryNodeBasicInfo {
int32_t type;
char *name;
} SQueryNodeBasicInfo;
typedef struct SQueryNode {
int32_t type; // the type of logic node
char *name; // the name of logic node
SQueryNodeBasicInfo info;
// char *name; // the name of logic node
// int32_t type; // the type of logic node
SSchema *pSchema; // the schema of the input SSDatablock
int32_t numOfCols; // number of input columns
SExprInfo *pExpr; // the query functions or sql aggregations
int32_t numOfOutput; // number of result columns, which is also the number of pExprs
SSchema *pSchema; // the schema of the input SSDatablock
int32_t numOfCols; // number of input columns
SExprInfo *pExpr; // the query functions or sql aggregations
int32_t numOfOutput; // number of result columns, which is also the number of pExprs
// previous operator to generated result for current node to process
// in case of join, multiple prev nodes exist.
struct SQueryNode* prevNode;
struct SQueryNode* nextNode;
SArray *pPrevNodes;// upstream nodes
struct SQueryNode *nextNode;
} SQueryNode;
// TODO create the query plan
SQueryNode* qCreateQueryPlan(SQueryInfo* pQueryInfo) {
static SQueryNode* createQueryNode(int32_t type, const char* name, SQueryNode** prev, int32_t numOfPrev) {
SQueryNode* pNode = calloc(1, sizeof(SQueryNode));
pNode->info.type = type;
pNode->info.name = strdup(name);
pNode->pPrevNodes = taosArrayInit(4, POINTER_BYTES);
for(int32_t i = 0; i < numOfPrev; ++i) {
taosArrayPush(pNode->pPrevNodes, &prev[i]);
}
return pNode;
}
static SQueryNode* doCreateQueryPlanForOneTable(SQueryInfo* pQueryInfo) {
SQueryNode* pNode = createQueryNode(QNODE_TABLESCAN, "", NULL, 0);
// check for filter
if (pQueryInfo->hasFilter) {
pNode = createQueryNode(QNODE_FILTER, "", &pNode, 1);
}
if (pQueryInfo->distinctTag) {
pNode = createQueryNode(QNODE_DISTINCT, "", &pNode, 0);
} else if (pQueryInfo->projectionQuery) {
pNode = createQueryNode(QNODE_PROJECT, "", &pNode, 1);
} else { // check for aggregation
if (pQueryInfo->interval.interval > 0) {
pNode = createQueryNode(QNODE_TIMEWINDOW, "", &pNode, 1);
} else if (pQueryInfo->groupbyColumn) {
pNode = createQueryNode(QNODE_GROUPBY, "", &pNode, 1);
} else if (pQueryInfo->sessionWindow.gap > 0) {
pNode = createQueryNode(QNODE_SESSIONWINDOW, "", &pNode, 1);
} else if (pQueryInfo->simpleAgg) {
pNode = createQueryNode(QNODE_AGGREGATE, "", &pNode, 1);
}
if (pQueryInfo->havingFieldNum > 0) {
pNode = createQueryNode(QNODE_FILTER, "", &pNode, 1);
}
if (pQueryInfo->arithmeticOnAgg) {
pNode = createQueryNode(QNODE_PROJECT, "", &pNode, 1);
}
if (pQueryInfo->fillType != TSDB_FILL_NONE) {
pNode = createQueryNode(QNODE_FILL, "", &pNode, 1);
}
}
if (pQueryInfo->limit.limit != -1 || pQueryInfo->limit.offset != 0) {
pNode = createQueryNode(QNODE_LIMIT, "", &pNode, 1);
}
return pNode;
}
SArray* qCreateQueryPlan(SQueryInfo* pQueryInfo) {
// join and subquery
SArray* upstream = NULL;
if (pQueryInfo->pUpstream != NULL) { // subquery in the from clause
upstream = taosArrayInit(4, POINTER_BYTES);
size_t size = taosArrayGetSize(pQueryInfo->pUpstream);
for(int32_t i = 0; i < size; ++i) {
SQueryInfo* pq = taosArrayGet(pQueryInfo->pUpstream, i);
SArray* p = qCreateQueryPlan(pq);
taosArrayPushBatch(upstream, p->pData, (int32_t) taosArrayGetSize(p));
}
}
if (pQueryInfo->numOfTables > 1) { // it is a join query
// 1. separate the select clause according to table
int32_t tableIndex = 0;
STableMetaInfo* pTableMetaInfo = pQueryInfo->pTableMetaInfo[tableIndex];
uint64_t uid = pTableMetaInfo->pTableMeta->id.uid;
SArray* exprList = taosArrayInit(4, POINTER_BYTES);
if (tscSqlExprCopy(exprList, pQueryInfo->exprList, uid, true) != 0) {
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
exit(-1);
}
SArray* tableColumnList = taosArrayInit(4, sizeof(SColumn));
tscColumnListCopy(tableColumnList, pQueryInfo->colList, uid);
// 2.
SQueryNode* pNode = doCreateQueryPlanForOneTable(pQueryInfo);
UNUSED(pNode);
} else { // only one table, normal query process
SQueryNode* pNode = doCreateQueryPlanForOneTable(pQueryInfo);
UNUSED(pNode);
}
return NULL;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册