提交 59abdffc 编写于 作者: H Haojun Liao

[td-1720]

上级 ab3133f8
......@@ -109,7 +109,6 @@ SParamInfo* tscAddParamToDataBlock(STableDataBlocks* pDataBlock, char type, uint
void* tscDestroyBlockArrayList(SArray* pDataBlockList);
int32_t tscCopyDataBlockToPayload(SSqlObj* pSql, STableDataBlocks* pDataBlock);
void tscFreeUnusedDataBlocks(SArray* pDataBlockList);
int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SArray* pDataList);
int32_t tscGetDataBlockFromList(void* pHashList, SArray* pDataBlockList, int64_t id, int32_t size,
int32_t startOffset, int32_t rowSize, const char* tableId, STableMeta* pTableMeta,
......
......@@ -554,27 +554,40 @@ static int32_t createOrderDescriptor(tOrderDescriptor **pOrderDesc, SSqlCmd *pCm
numOfGroupByCols++;
}
int32_t *orderIdx = (int32_t *)calloc(numOfGroupByCols, sizeof(int32_t));
if (orderIdx == NULL) {
int32_t *orderColIndexList = (int32_t *)calloc(numOfGroupByCols, sizeof(int32_t));
if (orderColIndexList == NULL) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
if (numOfGroupByCols > 0) {
int32_t startCols = pQueryInfo->fieldsInfo.numOfOutput - pQueryInfo->groupbyExpr.numOfGroupCols;
// tags value locate at the last columns
for (int32_t i = 0; i < pQueryInfo->groupbyExpr.numOfGroupCols; ++i) {
orderIdx[i] = startCols++;
}
if (pQueryInfo->groupbyExpr.numOfGroupCols > 0) {
int32_t startCols = pQueryInfo->fieldsInfo.numOfOutput - pQueryInfo->groupbyExpr.numOfGroupCols;
// the last "pQueryInfo->groupbyExpr.numOfGroupCols" columns are order-by columns
for (int32_t i = 0; i < pQueryInfo->groupbyExpr.numOfGroupCols; ++i) {
orderColIndexList[i] = startCols++;
}
if (pQueryInfo->interval.interval != 0) {
// the first column is the timestamp, handles queries like "interval(10m) group by tags"
orderColIndexList[numOfGroupByCols - 1] = PRIMARYKEY_TIMESTAMP_COL_INDEX; //TODO ???
}
} else { // it is the orderby ts asc/desc projection query for super table
size_t size = tscSqlExprNumOfExprs(pQueryInfo);
for (int32_t i = 0; i < size; ++i) {
SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, i);
if (pExpr->functionId == TSDB_FUNC_PRJ && pExpr->colInfo.colId == PRIMARYKEY_TIMESTAMP_COL_INDEX) {
orderColIndexList[0] = i;
}
}
if (pQueryInfo->interval.interval != 0) {
// the first column is the timestamp, handles queries like "interval(10m) group by tags"
orderIdx[numOfGroupByCols - 1] = PRIMARYKEY_TIMESTAMP_COL_INDEX;
assert(pQueryInfo->order.orderColId == PRIMARYKEY_TIMESTAMP_COL_INDEX);
}
}
*pOrderDesc = tOrderDesCreate(orderIdx, numOfGroupByCols, pModel, pQueryInfo->order.order);
taosTFree(orderIdx);
*pOrderDesc = tOrderDesCreate(orderColIndexList, numOfGroupByCols, pModel, pQueryInfo->order.order);
taosTFree(orderColIndexList);
if (*pOrderDesc == NULL) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
......@@ -606,7 +619,7 @@ bool isSameGroup(SSqlCmd *pCmd, SLocalReducer *pReducer, char *pPrev, tFilePage
return true;
}
if (orderInfo->pData[numOfCols - 1] == PRIMARYKEY_TIMESTAMP_COL_INDEX) {
if (orderInfo->colIndex[numOfCols - 1] == PRIMARYKEY_TIMESTAMP_COL_INDEX) {
/*
* super table interval query
* if the order columns is the primary timestamp, all result data belongs to one group
......@@ -620,7 +633,7 @@ bool isSameGroup(SSqlCmd *pCmd, SLocalReducer *pReducer, char *pPrev, tFilePage
}
// only one row exists
int32_t index = orderInfo->pData[0];
int32_t index = orderInfo->colIndex[0];
int32_t offset = (pOrderDesc->pColumnModel)->pFields[index].offset;
int32_t ret = memcmp(pPrev + offset, tmpBuffer->data + offset, pOrderDesc->pColumnModel->rowSize - offset);
......@@ -661,7 +674,6 @@ int32_t tscLocalReducerEnvCreate(SSqlObj *pSql, tExtMemBuffer ***pMemBuffer, tOr
pSchema[i].bytes = pExpr->resBytes;
pSchema[i].type = (int8_t)pExpr->resType;
rlen += pExpr->resBytes;
}
......
......@@ -1347,6 +1347,32 @@ static void addProjectQueryCol(SQueryInfo* pQueryInfo, int32_t startPos, SColumn
insertResultField(pQueryInfo, startPos, &ids, pExpr->resBytes, (int8_t)pExpr->resType, pExpr->aliasName, pExpr);
}
static void addPrimaryTsColIntoResult(SQueryInfo* pQueryInfo) {
// primary timestamp column has been added already
size_t size = tscSqlExprNumOfExprs(pQueryInfo);
for (int32_t i = 0; i < size; ++i) {
SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, i);
if (pExpr->functionId == TSDB_FUNC_PRJ && pExpr->colInfo.colId == PRIMARYKEY_TIMESTAMP_COL_INDEX) {
return;
}
}
SColumnIndex index = {0};
// set the constant column value always attached to first table.
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
SSchema* pSchema = tscGetTableColumnSchema(pTableMetaInfo->pTableMeta, PRIMARYKEY_TIMESTAMP_COL_INDEX);
// add the timestamp column into the output columns
int32_t numOfCols = (int32_t)tscSqlExprNumOfExprs(pQueryInfo);
tscAddSpecialColumnForSelect(pQueryInfo, numOfCols, TSDB_FUNC_PRJ, &index, pSchema, TSDB_COL_NORMAL);
SFieldSupInfo* pSupInfo = tscFieldInfoGetSupp(&pQueryInfo->fieldsInfo, numOfCols);
pSupInfo->visible = false;
pQueryInfo->type |= TSDB_QUERY_TYPE_PROJECTION_QUERY;
}
int32_t parseSelectClause(SSqlCmd* pCmd, int32_t clauseIndex, tSQLExprList* pSelection, bool isSTable, bool joinQuery) {
assert(pSelection != NULL && pCmd != NULL);
......@@ -1400,20 +1426,7 @@ int32_t parseSelectClause(SSqlCmd* pCmd, int32_t clauseIndex, tSQLExprList* pSel
// there is only one user-defined column in the final result field, add the timestamp column.
size_t numOfSrcCols = taosArrayGetSize(pQueryInfo->colList);
if (numOfSrcCols <= 0 && !tscQueryTags(pQueryInfo)) {
SColumnIndex index = {0};
// set the constant column value always attached to first table.
STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, clauseIndex, 0);
SSchema* pSchema = tscGetTableColumnSchema(pTableMetaInfo->pTableMeta, PRIMARYKEY_TIMESTAMP_COL_INDEX);
// add the timestamp column into the output columns
int32_t numOfCols = (int32_t)tscSqlExprNumOfExprs(pQueryInfo);
tscAddSpecialColumnForSelect(pQueryInfo, numOfCols, TSDB_FUNC_PRJ, &index, pSchema, TSDB_COL_NORMAL);
SFieldSupInfo* pSupInfo = tscFieldInfoGetSupp(&pQueryInfo->fieldsInfo, numOfCols);
pSupInfo->visible = false;
pQueryInfo->type |= TSDB_QUERY_TYPE_PROJECTION_QUERY;
addPrimaryTsColIntoResult(pQueryInfo);
}
if (!functionCompatibleCheck(pQueryInfo, joinQuery)) {
......@@ -4482,6 +4495,11 @@ int32_t parseOrderbyClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SQuerySQL* pQu
} else {
pQueryInfo->order.order = pSortorder->a[0].sortOrder;
pQueryInfo->order.orderColId = PRIMARYKEY_TIMESTAMP_COL_INDEX;
// orderby ts query on super table
if (tscOrderedProjectionQueryOnSTable(pQueryInfo, 0)) {
addPrimaryTsColIntoResult(pQueryInfo);
}
}
}
......
......@@ -1393,9 +1393,10 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
pSql->subState.numOfSub = pTableMetaInfo->vgroupList->numOfVgroups;
assert(pSql->subState.numOfSub > 0);
SSubqueryState *pState = &pSql->subState;
pState->numOfSub = pTableMetaInfo->vgroupList->numOfVgroups;
assert(pState->numOfSub > 0);
int32_t ret = tscLocalReducerEnvCreate(pSql, &pMemoryBuf, &pDesc, &pModel, nBufferSize);
if (ret != 0) {
......@@ -1405,24 +1406,24 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
return ret;
}
pSql->pSubs = calloc(pSql->subState.numOfSub, POINTER_BYTES);
pSql->pSubs = calloc(pState->numOfSub, POINTER_BYTES);
tscDebug("%p retrieved query data from %d vnode(s)", pSql, pSql->subState.numOfSub);
tscDebug("%p retrieved query data from %d vnode(s)", pSql, pState->numOfSub);
if (pSql->pSubs == NULL) {
taosTFree(pSql->pSubs);
pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
tscLocalReducerEnvDestroy(pMemoryBuf, pDesc, pModel, pSql->subState.numOfSub);
tscLocalReducerEnvDestroy(pMemoryBuf, pDesc, pModel, pState->numOfSub);
tscQueueAsyncRes(pSql);
return ret;
}
pSql->subState.numOfRemain = pSql->subState.numOfSub;
pState->numOfRemain = pState->numOfSub;
pRes->code = TSDB_CODE_SUCCESS;
int32_t i = 0;
for (; i < pSql->subState.numOfSub; ++i) {
for (; i < pState->numOfSub; ++i) {
SRetrieveSupport *trs = (SRetrieveSupport *)calloc(1, sizeof(SRetrieveSupport));
if (trs == NULL) {
tscError("%p failed to malloc buffer for SRetrieveSupport, orderOfSub:%d, reason:%s", pSql, i, strerror(errno));
......@@ -1461,22 +1462,22 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
tscDebug("%p sub:%p create subquery success. orderOfSub:%d", pSql, pNew, trs->subqueryIndex);
}
if (i < pSql->subState.numOfSub) {
if (i < pState->numOfSub) {
tscError("%p failed to prepare subquery structure and launch subqueries", pSql);
pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
tscLocalReducerEnvDestroy(pMemoryBuf, pDesc, pModel, pSql->subState.numOfSub);
tscLocalReducerEnvDestroy(pMemoryBuf, pDesc, pModel, pState->numOfSub);
doCleanupSubqueries(pSql, i);
return pRes->code; // free all allocated resource
}
if (pRes->code == TSDB_CODE_TSC_QUERY_CANCELLED) {
tscLocalReducerEnvDestroy(pMemoryBuf, pDesc, pModel, pSql->subState.numOfSub);
tscLocalReducerEnvDestroy(pMemoryBuf, pDesc, pModel, pState->numOfSub);
doCleanupSubqueries(pSql, i);
return pRes->code;
}
for(int32_t j = 0; j < pSql->subState.numOfSub; ++j) {
for(int32_t j = 0; j < pState->numOfSub; ++j) {
SSqlObj* pSub = pSql->pSubs[j];
SRetrieveSupport* pSupport = pSub->param;
......
......@@ -89,7 +89,7 @@ typedef struct SColumnModel {
typedef struct SColumnOrderInfo {
int32_t numOfCols;
int16_t pData[];
int16_t colIndex[];
} SColumnOrderInfo;
typedef struct tOrderDescriptor {
......
......@@ -343,8 +343,10 @@ static FORCE_INLINE int32_t primaryKeyComparator(int64_t f1, int64_t f2, int32_t
if (f1 == f2) {
return 0;
}
if (colIdx == 0 && tsOrder == TSDB_ORDER_DESC) { // primary column desc order
assert(colIdx == 0);
if (tsOrder == TSDB_ORDER_DESC) { // primary column desc order
return (f1 < f2) ? 1 : -1;
} else { // asc
return (f1 < f2) ? -1 : 1;
......@@ -435,7 +437,7 @@ int32_t compare_a(tOrderDescriptor *pDescriptor, int32_t numOfRows1, int32_t s1,
int32_t cmpCnt = pDescriptor->orderInfo.numOfCols;
for (int32_t i = 0; i < cmpCnt; ++i) {
int32_t colIdx = pDescriptor->orderInfo.pData[i];
int32_t colIdx = pDescriptor->orderInfo.colIndex[i];
char *f1 = COLMODEL_GET_VAL(data1, pDescriptor->pColumnModel, numOfRows1, s1, colIdx);
char *f2 = COLMODEL_GET_VAL(data2, pDescriptor->pColumnModel, numOfRows2, s2, colIdx);
......@@ -467,7 +469,7 @@ int32_t compare_d(tOrderDescriptor *pDescriptor, int32_t numOfRows1, int32_t s1,
int32_t cmpCnt = pDescriptor->orderInfo.numOfCols;
for (int32_t i = 0; i < cmpCnt; ++i) {
int32_t colIdx = pDescriptor->orderInfo.pData[i];
int32_t colIdx = pDescriptor->orderInfo.colIndex[i];
char *f1 = COLMODEL_GET_VAL(data1, pDescriptor->pColumnModel, numOfRows1, s1, colIdx);
char *f2 = COLMODEL_GET_VAL(data2, pDescriptor->pColumnModel, numOfRows2, s2, colIdx);
......@@ -557,13 +559,13 @@ static void median(tOrderDescriptor *pDescriptor, int32_t numOfRows, int32_t sta
int32_t midIdx = ((end - start) >> 1) + start;
#if defined(_DEBUG_VIEW)
int32_t f = pDescriptor->orderInfo.pData[0];
int32_t f = pDescriptor->orderInfo.colIndex[0];
char *midx = COLMODEL_GET_VAL(data, pDescriptor->pColumnModel, numOfRows, midIdx, f);
char *startx = COLMODEL_GET_VAL(data, pDescriptor->pColumnModel, numOfRows, start, f);
char *endx = COLMODEL_GET_VAL(data, pDescriptor->pColumnModel, numOfRows, end, f);
int32_t colIdx = pDescriptor->orderInfo.pData[0];
int32_t colIdx = pDescriptor->orderInfo.colIndex[0];
tSortDataPrint(pDescriptor->pColumnModel->pFields[colIdx].field.type, "before", startx, midx, endx);
#endif
......@@ -591,7 +593,7 @@ static void median(tOrderDescriptor *pDescriptor, int32_t numOfRows, int32_t sta
}
static UNUSED_FUNC void tRowModelDisplay(tOrderDescriptor *pDescriptor, int32_t numOfRows, char *d, int32_t len) {
int32_t colIdx = pDescriptor->orderInfo.pData[0];
int32_t colIdx = pDescriptor->orderInfo.colIndex[0];
for (int32_t i = 0; i < len; ++i) {
char *startx = COLMODEL_GET_VAL(d, pDescriptor->pColumnModel, numOfRows, i, colIdx);
......@@ -1075,7 +1077,7 @@ tOrderDescriptor *tOrderDesCreate(const int32_t *orderColIdx, int32_t numOfOrder
desc->orderInfo.numOfCols = numOfOrderCols;
for (int32_t i = 0; i < numOfOrderCols; ++i) {
desc->orderInfo.pData[i] = orderColIdx[i];
desc->orderInfo.colIndex[i] = orderColIdx[i];
}
return desc;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册