未验证 提交 6fe3b30d 编写于 作者: H Haojun Liao 提交者: GitHub

Merge pull request #7332 from taosdata/feature/master1

Feature/master1
......@@ -31,11 +31,12 @@ extern "C" {
#define UTIL_TABLE_IS_SUPER_TABLE(metaInfo) \
(((metaInfo)->pTableMeta != NULL) && ((metaInfo)->pTableMeta->tableType == TSDB_SUPER_TABLE))
#define UTIL_TABLE_IS_CHILD_TABLE(metaInfo) \
(((metaInfo)->pTableMeta != NULL) && ((metaInfo)->pTableMeta->tableType == TSDB_CHILD_TABLE))
#define UTIL_TABLE_IS_NORMAL_TABLE(metaInfo)\
(!(UTIL_TABLE_IS_SUPER_TABLE(metaInfo) || UTIL_TABLE_IS_CHILD_TABLE(metaInfo)))
#define UTIL_TABLE_IS_NORMAL_TABLE(metaInfo) \
(!(UTIL_TABLE_IS_SUPER_TABLE(metaInfo) || UTIL_TABLE_IS_CHILD_TABLE(metaInfo) || UTIL_TABLE_IS_TMP_TABLE(metaInfo)))
#define UTIL_TABLE_IS_TMP_TABLE(metaInfo) \
(((metaInfo)->pTableMeta != NULL) && ((metaInfo)->pTableMeta->tableType == TSDB_TEMP_TABLE))
......
......@@ -5486,14 +5486,19 @@ static void setDefaultOrderInfo(SQueryInfo* pQueryInfo) {
pQueryInfo->order.order = TSDB_ORDER_ASC;
if (isTopBottomQuery(pQueryInfo)) {
pQueryInfo->order.orderColId = PRIMARYKEY_TIMESTAMP_COL_INDEX;
} else { // in case of select tbname from super_table, the defualt order column can not be the primary ts column
pQueryInfo->order.orderColId = INT32_MIN;
} else { // in case of select tbname from super_table, the default order column can not be the primary ts column
pQueryInfo->order.orderColId = INT32_MIN; // todo define a macro
}
/* for super table query, set default ascending order for group output */
if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
pQueryInfo->groupbyExpr.orderType = TSDB_ORDER_ASC;
}
if (pQueryInfo->distinct) {
pQueryInfo->order.order = TSDB_ORDER_ASC;
pQueryInfo->order.orderColId = PRIMARYKEY_TIMESTAMP_COL_INDEX;
}
}
int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSqlNode, SSchema* pSchema) {
......@@ -5505,17 +5510,12 @@ int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSq
setDefaultOrderInfo(pQueryInfo);
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
if (pQueryInfo->distinct == true) {
pQueryInfo->order.order = TSDB_ORDER_ASC;
pQueryInfo->order.orderColId = 0;
return TSDB_CODE_SUCCESS;
}
if (pSqlNode->pSortOrder == NULL) {
if (pQueryInfo->distinct || pSqlNode->pSortOrder == NULL) {
return TSDB_CODE_SUCCESS;
}
SArray* pSortorder = pSqlNode->pSortOrder;
char* pMsgBuf = tscGetErrorMsgPayload(pCmd);
SArray* pSortOrder = pSqlNode->pSortOrder;
/*
* for table query, there is only one or none order option is allowed, which is the
......@@ -5523,19 +5523,19 @@ int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSq
*
* for super table query, the order option must be less than 3.
*/
size_t size = taosArrayGetSize(pSortorder);
if (UTIL_TABLE_IS_NORMAL_TABLE(pTableMetaInfo)) {
size_t size = taosArrayGetSize(pSortOrder);
if (UTIL_TABLE_IS_NORMAL_TABLE(pTableMetaInfo) || UTIL_TABLE_IS_TMP_TABLE(pTableMetaInfo)) {
if (size > 1) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg0);
return invalidOperationMsg(pMsgBuf, msg0);
}
} else {
if (size > 2) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg3);
return invalidOperationMsg(pMsgBuf, msg3);
}
}
// handle the first part of order by
tVariant* pVar = taosArrayGet(pSortorder, 0);
tVariant* pVar = taosArrayGet(pSortOrder, 0);
// e.g., order by 1 asc, return directly with out further check.
if (pVar->nType >= TSDB_DATA_TYPE_TINYINT && pVar->nType <= TSDB_DATA_TYPE_BIGINT) {
......@@ -5547,7 +5547,7 @@ int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSq
if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) { // super table query
if (getColumnIndexByName(&columnName, pQueryInfo, &index, tscGetErrorMsgPayload(pCmd)) != TSDB_CODE_SUCCESS) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg1);
return invalidOperationMsg(pMsgBuf, msg1);
}
bool orderByTags = false;
......@@ -5559,7 +5559,7 @@ int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSq
// it is a tag column
if (pQueryInfo->groupbyExpr.columnInfo == NULL) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg2);
return invalidOperationMsg(pMsgBuf, msg2);
}
SColIndex* pColIndex = taosArrayGet(pQueryInfo->groupbyExpr.columnInfo, 0);
if (relTagIndex == pColIndex->colIndex) {
......@@ -5580,13 +5580,14 @@ int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSq
orderByGroupbyCol = true;
}
}
if (!(orderByTags || orderByTS || orderByGroupbyCol) && !isTopBottomQuery(pQueryInfo)) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg3);
return invalidOperationMsg(pMsgBuf, msg3);
} else { // order by top/bottom result value column is not supported in case of interval query.
assert(!(orderByTags && orderByTS && orderByGroupbyCol));
}
size_t s = taosArrayGetSize(pSortorder);
size_t s = taosArrayGetSize(pSortOrder);
if (s == 1) {
if (orderByTags) {
pQueryInfo->groupbyExpr.orderIndex = index.columnIndex - tscGetNumOfColumns(pTableMetaInfo->pTableMeta);
......@@ -5605,7 +5606,7 @@ int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSq
pExpr = tscExprGet(pQueryInfo, 1);
if (pExpr->base.colInfo.colIndex != index.columnIndex && index.columnIndex != PRIMARYKEY_TIMESTAMP_COL_INDEX) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg2);
return invalidOperationMsg(pMsgBuf, msg2);
}
tVariantListItem* p1 = taosArrayGet(pSqlNode->pSortOrder, 0);
......@@ -5623,9 +5624,7 @@ int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSq
addPrimaryTsColIntoResult(pQueryInfo, pCmd);
}
}
}
if (s == 2) {
} else {
tVariantListItem *pItem = taosArrayGet(pSqlNode->pSortOrder, 0);
if (orderByTags) {
pQueryInfo->groupbyExpr.orderIndex = index.columnIndex - tscGetNumOfColumns(pTableMetaInfo->pTableMeta);
......@@ -5642,22 +5641,23 @@ int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSq
tVariant* pVar2 = &pItem->pVar;
SStrToken cname = {pVar2->nLen, pVar2->nType, pVar2->pz};
if (getColumnIndexByName(&cname, pQueryInfo, &index, tscGetErrorMsgPayload(pCmd)) != TSDB_CODE_SUCCESS) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg1);
return invalidOperationMsg(pMsgBuf, msg1);
}
if (index.columnIndex != PRIMARYKEY_TIMESTAMP_COL_INDEX) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg2);
return invalidOperationMsg(pMsgBuf, msg2);
} else {
tVariantListItem* p1 = taosArrayGet(pSortorder, 1);
tVariantListItem* p1 = taosArrayGet(pSortOrder, 1);
pQueryInfo->order.order = p1->sortOrder;
pQueryInfo->order.orderColId = PRIMARYKEY_TIMESTAMP_COL_INDEX;
}
}
} else { // meter query
if (getColumnIndexByName(&columnName, pQueryInfo, &index, tscGetErrorMsgPayload(pCmd)) != TSDB_CODE_SUCCESS) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg1);
} else if (UTIL_TABLE_IS_NORMAL_TABLE(pTableMetaInfo) || UTIL_TABLE_IS_CHILD_TABLE(pTableMetaInfo)) { // check order by clause for normal table & temp table
if (getColumnIndexByName(&columnName, pQueryInfo, &index, pMsgBuf) != TSDB_CODE_SUCCESS) {
return invalidOperationMsg(pMsgBuf, msg1);
}
if (index.columnIndex != PRIMARYKEY_TIMESTAMP_COL_INDEX && !isTopBottomQuery(pQueryInfo)) {
bool validOrder = false;
SArray *columnInfo = pQueryInfo->groupbyExpr.columnInfo;
......@@ -5665,13 +5665,14 @@ int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSq
SColIndex* pColIndex = taosArrayGet(columnInfo, 0);
validOrder = (pColIndex->colIndex == index.columnIndex);
}
if (!validOrder) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg2);
return invalidOperationMsg(pMsgBuf, msg2);
}
tVariantListItem* p1 = taosArrayGet(pSqlNode->pSortOrder, 0);
pQueryInfo->groupbyExpr.orderIndex = pSchema[index.columnIndex].colId;
pQueryInfo->groupbyExpr.orderType = p1->sortOrder;
}
if (isTopBottomQuery(pQueryInfo)) {
......@@ -5687,13 +5688,14 @@ int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSq
pExpr = tscExprGet(pQueryInfo, 1);
if (pExpr->base.colInfo.colIndex != index.columnIndex && index.columnIndex != PRIMARYKEY_TIMESTAMP_COL_INDEX) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg2);
return invalidOperationMsg(pMsgBuf, msg2);
}
validOrder = true;
}
if (!validOrder) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg2);
return invalidOperationMsg(pMsgBuf, msg2);
}
tVariantListItem* pItem = taosArrayGet(pSqlNode->pSortOrder, 0);
......@@ -5703,6 +5705,18 @@ int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSq
return TSDB_CODE_SUCCESS;
}
tVariantListItem* pItem = taosArrayGet(pSqlNode->pSortOrder, 0);
pQueryInfo->order.order = pItem->sortOrder;
pQueryInfo->order.orderColId = pSchema[index.columnIndex].colId;
} else {
// handle the temp table order by clause. You can order by any single column in case of the temp table, created by
// inner subquery.
assert(UTIL_TABLE_IS_TMP_TABLE(pTableMetaInfo) && taosArrayGetSize(pSqlNode->pSortOrder) == 1);
if (getColumnIndexByName(&columnName, pQueryInfo, &index, pMsgBuf) != TSDB_CODE_SUCCESS) {
return invalidOperationMsg(pMsgBuf, msg1);
}
tVariantListItem* pItem = taosArrayGet(pSqlNode->pSortOrder, 0);
pQueryInfo->order.order = pItem->sortOrder;
pQueryInfo->order.orderColId = pSchema[index.columnIndex].colId;
......@@ -8463,7 +8477,6 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, SQueryInfo* pQueryInf
const char* msg9 = "not support 3 level select";
int32_t code = TSDB_CODE_SUCCESS;
SSqlCmd* pCmd = &pSql->cmd;
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
......@@ -8758,8 +8771,6 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, SQueryInfo* pQueryInf
pQueryInfo->simpleAgg = isSimpleAggregateRv(pQueryInfo);
pQueryInfo->onlyTagQuery = onlyTagPrjFunction(pQueryInfo);
pQueryInfo->groupbyColumn = tscGroupbyColumn(pQueryInfo);
//pQueryInfo->globalMerge = tscIsTwoStageSTableQuery(pQueryInfo, 0);
pQueryInfo->arithmeticOnAgg = tsIsArithmeticQueryOnAggResult(pQueryInfo);
pQueryInfo->orderProjectQuery = tscOrderedProjectionQueryOnSTable(pQueryInfo, 0);
......
......@@ -880,16 +880,16 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
}
SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd);
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
STableMeta * pTableMeta = pTableMetaInfo->pTableMeta;
SQueryAttr query = {{0}};
tscCreateQueryFromQueryInfo(pQueryInfo, &query, pSql);
query.vgId = pTableMeta->vgId;
SArray* tableScanOperator = createTableScanPlan(&query);
SArray* queryOperator = createExecOperatorPlan(&query);
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
STableMeta * pTableMeta = pTableMetaInfo->pTableMeta;
SQueryTableMsg *pQueryMsg = (SQueryTableMsg *)pCmd->payload;
tstrncpy(pQueryMsg->version, version, tListLen(pQueryMsg->version));
......
......@@ -1208,7 +1208,6 @@ void handleDownstreamOperator(SSqlObj** pSqlObjList, int32_t numOfUpstream, SQue
createInputDataFilterInfo(px, numOfCol1, &numOfFilterCols, &pFilterInfo);
SOperatorInfo* pSourceOperator = createDummyInputOperator(pSqlObjList[0], pSchema, numOfCol1, pFilterInfo, numOfFilterCols);
pOutput->precision = pSqlObjList[0]->res.precision;
SSchema* schema = NULL;
......
......@@ -79,7 +79,7 @@ int32_t tsCompressMsgSize = -1;
// client
int32_t tsMaxSQLStringLen = TSDB_MAX_ALLOWED_SQL_LEN;
int32_t tsMaxWildCardsLen = TSDB_PATTERN_STRING_MAX_LEN;
int32_t tsMaxWildCardsLen = TSDB_PATTERN_STRING_DEFAULT_LEN;
int8_t tsTscEnableRecordSql = 0;
// the maximum number of results for projection query on super table that are returned from
......
Subproject commit b8f76da4a708d158ec3cc4b844571dc4414e36b4
Subproject commit 050667e5b4d0eafa5387e4283e713559b421203f
Subproject commit a44ec1ca493ad01b2bf825b6418f69e11f548206
Subproject commit 32e2c97a4cf7bedaa99f5d6dd8cb036e7f4470df
Subproject commit ce5201014136503d34fecbd56494b67b4961056c
Subproject commit b62a26ecc164a310104df57691691b237e091c89
......@@ -333,6 +333,7 @@ enum OPERATOR_TYPE_E {
OP_StateWindow = 22,
OP_AllTimeWindow = 23,
OP_AllMultiTableTimeInterval = 24,
OP_Order = 25,
};
typedef struct SOperatorInfo {
......@@ -417,7 +418,6 @@ typedef struct STableScanInfo {
int32_t *rowCellInfoOffset;
SExprInfo *pExpr;
SSDataBlock block;
bool loadExternalRows; // load external rows (prev & next rows)
int32_t numOfOutput;
int64_t elapsedTime;
......@@ -541,6 +541,13 @@ typedef struct SMultiwayMergeInfo {
SArray *udfInfo;
} SMultiwayMergeInfo;
// todo support the disk-based sort
typedef struct SOrderOperatorInfo {
int32_t colIndex;
int32_t order;
SSDataBlock *pDataBlock;
} SOrderOperatorInfo;
void appendUpstream(SOperatorInfo* p, SOperatorInfo* pUpstream);
SOperatorInfo* createDataBlocksOptScanInfo(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv, int32_t repeatTime, int32_t reverseTime);
......@@ -570,6 +577,7 @@ SOperatorInfo* createFilterOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperator
int32_t numOfOutput, SColumnInfo* pCols, int32_t numOfFilter);
SOperatorInfo* createJoinOperatorInfo(SOperatorInfo** pUpstream, int32_t numOfUpstream, SSchema* pSchema, int32_t numOfOutput);
SOperatorInfo* createOrderOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, SOrderVal* pOrderVal);
SSDataBlock* doGlobalAggregate(void* param, bool* newgroup);
SSDataBlock* doMultiwayMergeSort(void* param, bool* newgroup);
......
......@@ -220,6 +220,8 @@ tOrderDescriptor *tOrderDesCreate(const int32_t *orderColIdx, int32_t numOfOrder
void tOrderDescDestroy(tOrderDescriptor *pDesc);
void taoscQSort(void** pCols, SSchema* pSchema, int32_t numOfCols, int32_t numOfRows, int32_t index, __compar_fn_t compareFn);
void tColModelAppend(SColumnModel *dstModel, tFilePage *dstPage, void *srcData, int32_t srcStartRows,
int32_t numOfRowsToWrite, int32_t srcCapacity);
......
......@@ -224,6 +224,7 @@ static void destroySFillOperatorInfo(void* param, int32_t numOfOutput);
static void destroyGroupbyOperatorInfo(void* param, int32_t numOfOutput);
static void destroyProjectOperatorInfo(void* param, int32_t numOfOutput);
static void destroyTagScanOperatorInfo(void* param, int32_t numOfOutput);
static void destroyOrderOperatorInfo(void* param, int32_t numOfOutput);
static void destroySWindowOperatorInfo(void* param, int32_t numOfOutput);
static void destroyStateWindowOperatorInfo(void* param, int32_t numOfOutput);
static void destroyAggOperatorInfo(void* param, int32_t numOfOutput);
......@@ -2215,6 +2216,7 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
}
break;
}
case OP_StateWindow: {
pRuntimeEnv->proot = createStatewindowOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput);
int32_t opType = pRuntimeEnv->proot->upstream[0]->operatorType;
......@@ -2231,10 +2233,6 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
case OP_Filter: { // todo refactor
int32_t numOfFilterCols = 0;
// if (pQueryAttr->numOfFilterCols > 0) {
// pRuntimeEnv->proot = createFilterOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1,
// pQueryAttr->numOfOutput, pQueryAttr->tableCols, pQueryAttr->numOfFilterCols);
// } else {
if (pQueryAttr->stableQuery) {
SColumnInfo* pColInfo =
extractColumnFilterInfo(pQueryAttr->pExpr3, pQueryAttr->numOfExpr3, &numOfFilterCols);
......@@ -2248,7 +2246,7 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
pQueryAttr->numOfOutput, pColInfo, numOfFilterCols);
freeColumnInfo(pColInfo, pQueryAttr->numOfOutput);
}
// }
break;
}
......@@ -2260,9 +2258,10 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
case OP_MultiwayMergeSort: {
bool groupMix = true;
if(pQueryAttr->slimit.offset != 0 || pQueryAttr->slimit.limit != -1) {
if (pQueryAttr->slimit.offset != 0 || pQueryAttr->slimit.limit != -1) {
groupMix = false;
}
pRuntimeEnv->proot = createMultiwaySortOperatorInfo(pRuntimeEnv, pQueryAttr->pExpr1, pQueryAttr->numOfOutput,
4096, merger, groupMix); // TODO hack it
break;
......@@ -2285,6 +2284,11 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
break;
}
case OP_Order: {
pRuntimeEnv->proot = createOrderOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput, &pQueryAttr->order);
break;
}
default: {
assert(0);
}
......@@ -5409,6 +5413,114 @@ SOperatorInfo *createMultiwaySortOperatorInfo(SQueryRuntimeEnv *pRuntimeEnv, SEx
return pOperator;
}
static int32_t doMergeSDatablock(SSDataBlock* pDest, SSDataBlock* pSrc) {
assert(pSrc != NULL && pDest != NULL && pDest->info.numOfCols == pSrc->info.numOfCols);
int32_t numOfCols = pSrc->info.numOfCols;
for(int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* pCol2 = taosArrayGet(pDest->pDataBlock, i);
SColumnInfoData* pCol1 = taosArrayGet(pSrc->pDataBlock, i);
int32_t newSize = (pDest->info.rows + pSrc->info.rows) * pCol2->info.bytes;
char* tmp = realloc(pCol2->pData, newSize);
if (tmp != NULL) {
pCol2->pData = tmp;
int32_t offset = pCol2->info.bytes * pDest->info.rows;
memcpy(pCol2->pData + offset, pCol1->pData, pSrc->info.rows * pCol2->info.bytes);
} else {
return TSDB_CODE_VND_OUT_OF_MEMORY;
}
}
pDest->info.rows += pSrc->info.rows;
return TSDB_CODE_SUCCESS;
}
static SSDataBlock* doSort(void* param, bool* newgroup) {
SOperatorInfo* pOperator = (SOperatorInfo*) param;
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
}
SOrderOperatorInfo* pInfo = pOperator->info;
SSDataBlock* pBlock = NULL;
while(1) {
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC);
pBlock = pOperator->upstream[0]->exec(pOperator->upstream[0], newgroup);
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC);
// start to flush data into disk and try do multiway merge sort
if (pBlock == NULL) {
setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED);
pOperator->status = OP_EXEC_DONE;
break;
}
int32_t code = doMergeSDatablock(pInfo->pDataBlock, pBlock);
if (code != TSDB_CODE_SUCCESS) {
// todo handle error
}
}
int32_t numOfCols = pInfo->pDataBlock->info.numOfCols;
void** pCols = calloc(numOfCols, POINTER_BYTES);
SSchema* pSchema = calloc(numOfCols, sizeof(SSchema));
for(int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* p1 = taosArrayGet(pInfo->pDataBlock->pDataBlock, i);
pCols[i] = p1->pData;
pSchema[i].colId = p1->info.colId;
pSchema[i].bytes = p1->info.bytes;
pSchema[i].type = (uint8_t) p1->info.type;
}
__compar_fn_t comp = getKeyComparFunc(pSchema[pInfo->colIndex].type, pInfo->order);
taoscQSort(pCols, pSchema, numOfCols, pInfo->pDataBlock->info.rows, pInfo->colIndex, comp);
tfree(pCols);
tfree(pSchema);
return (pInfo->pDataBlock->info.rows > 0)? pInfo->pDataBlock:NULL;
}
SOperatorInfo *createOrderOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, SOrderVal* pOrderVal) {
SOrderOperatorInfo* pInfo = calloc(1, sizeof(SOrderOperatorInfo));
{
SSDataBlock* pDataBlock = calloc(1, sizeof(SSDataBlock));
pDataBlock->pDataBlock = taosArrayInit(numOfOutput, sizeof(SColumnInfoData));
for(int32_t i = 0; i < numOfOutput; ++i) {
SColumnInfoData col = {{0}};
col.info.colId = pExpr[i].base.colInfo.colId;
col.info.bytes = pExpr[i].base.colBytes;
col.info.type = pExpr[i].base.colType;
taosArrayPush(pDataBlock->pDataBlock, &col);
if (col.info.colId == pOrderVal->orderColId) {
pInfo->colIndex = i;
}
}
pDataBlock->info.numOfCols = numOfOutput;
pInfo->order = pOrderVal->order;
pInfo->pDataBlock = pDataBlock;
}
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
pOperator->name = "InMemoryOrder";
pOperator->operatorType = OP_Order;
pOperator->blockingOptr = true;
pOperator->status = OP_IN_EXECUTING;
pOperator->info = pInfo;
pOperator->exec = doSort;
pOperator->cleanup = destroyOrderOperatorInfo;
pOperator->pRuntimeEnv = pRuntimeEnv;
appendUpstream(pOperator, upstream);
return pOperator;
}
static int32_t getTableScanOrder(STableScanInfo* pTableScanInfo) {
return pTableScanInfo->order;
}
......@@ -6409,6 +6521,11 @@ static void destroyTagScanOperatorInfo(void* param, int32_t numOfOutput) {
pInfo->pRes = destroyOutputBuf(pInfo->pRes);
}
static void destroyOrderOperatorInfo(void* param, int32_t numOfOutput) {
SOrderOperatorInfo* pInfo = (SOrderOperatorInfo*) param;
pInfo->pDataBlock = destroyOutputBuf(pInfo->pDataBlock);
}
static void destroyConditionOperatorInfo(void* param, int32_t numOfOutput) {
SFilterOperatorInfo* pInfo = (SFilterOperatorInfo*) param;
doDestroyFilterInfo(pInfo->pFilterInfo, pInfo->numOfFilterCols);
......@@ -6757,7 +6874,6 @@ SOperatorInfo* createFillOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorIn
pOperator->numOfOutput = numOfOutput;
pOperator->info = pInfo;
pOperator->pRuntimeEnv = pRuntimeEnv;
pOperator->exec = doFill;
pOperator->cleanup = destroySFillOperatorInfo;
......
......@@ -1102,3 +1102,57 @@ void tOrderDescDestroy(tOrderDescriptor *pDesc) {
destroyColumnModel(pDesc->pColumnModel);
tfree(pDesc);
}
void taoscQSort(void** pCols, SSchema* pSchema, int32_t numOfCols, int32_t numOfRows, int32_t index, __compar_fn_t compareFn) {
assert(numOfRows > 0 && numOfCols > 0 && index >= 0 && index < numOfCols);
int32_t bytes = pSchema[index].bytes;
int32_t size = bytes + sizeof(int32_t);
char* buf = calloc(1, size * numOfRows);
for(int32_t i = 0; i < numOfRows; ++i) {
char* dest = buf + size * i;
memcpy(dest, ((char*) pCols[index]) + bytes * i, bytes);
*(int32_t*)(dest+bytes) = i;
}
qsort(buf, numOfRows, size, compareFn);
int32_t prevLength = 0;
char* p = NULL;
for(int32_t i = 0; i < numOfCols; ++i) {
int32_t bytes1 = pSchema[i].bytes;
if (i == index) {
for(int32_t j = 0; j < numOfRows; ++j){
char* src = buf + (j * size);
char* dest = ((char*)pCols[i]) + (j * bytes1);
memcpy(dest, src, bytes1);
}
} else {
// make sure memory buffer is enough
if (prevLength < bytes1) {
char *tmp = realloc(p, bytes1 * numOfRows);
assert(tmp);
p = tmp;
prevLength = bytes1;
}
memcpy(p, pCols[i], bytes1 * numOfRows);
for(int32_t j = 0; j < numOfRows; ++j){
char* dest = ((char*)pCols[i]) + bytes1 * j;
int32_t newPos = *(int32_t*)(buf + (j * size) + bytes);
char* src = p + (newPos * bytes1);
memcpy(dest, src, bytes1);
}
}
}
tfree(buf);
tfree(p);
}
\ No newline at end of file
......@@ -237,7 +237,7 @@ tMemBucket *tMemBucketCreate(int16_t nElemSize, int16_t dataType, double minval,
}
pBucket->elemPerPage = (pBucket->bufPageSize - sizeof(tFilePage))/pBucket->bytes;
pBucket->comparFn = getKeyComparFunc(pBucket->type);
pBucket->comparFn = getKeyComparFunc(pBucket->type, TSDB_ORDER_ASC);
pBucket->hashFunc = getHashFunc(pBucket->type);
if (pBucket->hashFunc == NULL) {
......
......@@ -557,10 +557,9 @@ SArray* createExecOperatorPlan(SQueryAttr* pQueryAttr) {
int32_t op = 0;
if (onlyQueryTags(pQueryAttr)) { // do nothing for tags query
if (onlyQueryTags(pQueryAttr)) {
op = OP_TagScan;
taosArrayPush(plan, &op);
}
if (pQueryAttr->distinct) {
op = OP_Distinct;
taosArrayPush(plan, &op);
......@@ -651,8 +650,14 @@ SArray* createExecOperatorPlan(SQueryAttr* pQueryAttr) {
taosArrayPush(plan, &op);
}
}
}
// outer query order by support
int32_t orderColId = pQueryAttr->order.orderColId;
if (pQueryAttr->vgId == 0 && orderColId != PRIMARYKEY_TIMESTAMP_COL_INDEX && orderColId != INT32_MIN) {
op = OP_Order;
taosArrayPush(plan, &op);
}
}
if (pQueryAttr->limit.limit > 0 || pQueryAttr->limit.offset > 0) {
op = OP_Limit;
......
......@@ -25,7 +25,7 @@ extern "C" {
#define TSDB_PATTERN_MATCH 0
#define TSDB_PATTERN_NOMATCH 1
#define TSDB_PATTERN_NOWILDCARDMATCH 2
#define TSDB_PATTERN_STRING_MAX_LEN 100
#define TSDB_PATTERN_STRING_DEFAULT_LEN 100
#define FLT_COMPAR_TOL_FACTOR 4
#define FLT_EQUAL(_x, _y) (fabs((_x) - (_y)) <= (FLT_COMPAR_TOL_FACTOR * FLT_EPSILON))
......@@ -47,7 +47,7 @@ int WCSPatternMatch(const wchar_t *pattern, const wchar_t *str, size_t size, con
int32_t doCompare(const char* a, const char* b, int32_t type, size_t size);
__compar_fn_t getKeyComparFunc(int32_t keyType);
__compar_fn_t getKeyComparFunc(int32_t keyType, int32_t order);
__compar_fn_t getComparFunc(int32_t type, int32_t optr);
......
......@@ -16,58 +16,101 @@
#include "os.h"
#include "ttype.h"
#include "tcompare.h"
#include "tarray.h"
#include "hash.h"
int32_t compareInt32Val(const void *pLeft, const void *pRight) {
int32_t left = GET_INT32_VAL(pLeft), right = GET_INT32_VAL(pRight);
int32_t setCompareBytes1(const void *pLeft, const void *pRight) {
return NULL != taosHashGet((SHashObj *)pRight, pLeft, 1) ? 1 : 0;
}
int32_t setCompareBytes2(const void *pLeft, const void *pRight) {
return NULL != taosHashGet((SHashObj *)pRight, pLeft, 2) ? 1 : 0;
}
int32_t setCompareBytes4(const void *pLeft, const void *pRight) {
return NULL != taosHashGet((SHashObj *)pRight, pLeft, 4) ? 1 : 0;
}
int32_t setCompareBytes8(const void *pLeft, const void *pRight) {
return NULL != taosHashGet((SHashObj *)pRight, pLeft, 8) ? 1 : 0;
}
int32_t compareInt8Val(const void *pLeft, const void *pRight) {
int8_t left = GET_INT8_VAL(pLeft), right = GET_INT8_VAL(pRight);
if (left > right) return 1;
if (left < right) return -1;
return 0;
}
int32_t compareInt64Val(const void *pLeft, const void *pRight) {
int64_t left = GET_INT64_VAL(pLeft), right = GET_INT64_VAL(pRight);
int32_t compareInt8ValDesc(const void *pLeft, const void *pRight) {
return compareInt8Val(pRight, pLeft);
}
int32_t compareInt16Val(const void *pLeft, const void *pRight) {
int16_t left = GET_INT16_VAL(pLeft), right = GET_INT16_VAL(pRight);
if (left > right) return 1;
if (left < right) return -1;
return 0;
}
int32_t compareInt16Val(const void *pLeft, const void *pRight) {
int16_t left = GET_INT16_VAL(pLeft), right = GET_INT16_VAL(pRight);
int32_t compareInt16ValDesc(const void* pLeft, const void* pRight) {
return compareInt16Val(pRight, pLeft);
}
int32_t compareInt32Val(const void *pLeft, const void *pRight) {
int32_t left = GET_INT32_VAL(pLeft), right = GET_INT32_VAL(pRight);
if (left > right) return 1;
if (left < right) return -1;
return 0;
}
int32_t compareInt8Val(const void *pLeft, const void *pRight) {
int8_t left = GET_INT8_VAL(pLeft), right = GET_INT8_VAL(pRight);
int32_t compareInt32ValDesc(const void* pLeft, const void* pRight) {
return compareInt32Val(pRight, pLeft);
}
int32_t compareInt64Val(const void *pLeft, const void *pRight) {
int64_t left = GET_INT64_VAL(pLeft), right = GET_INT64_VAL(pRight);
if (left > right) return 1;
if (left < right) return -1;
return 0;
}
int32_t compareInt64ValDesc(const void* pLeft, const void* pRight) {
return compareInt64Val(pRight, pLeft);
}
int32_t compareUint32Val(const void *pLeft, const void *pRight) {
int32_t left = GET_UINT32_VAL(pLeft), right = GET_UINT32_VAL(pRight);
uint32_t left = GET_UINT32_VAL(pLeft), right = GET_UINT32_VAL(pRight);
if (left > right) return 1;
if (left < right) return -1;
return 0;
}
int32_t compareUint32ValDesc(const void* pLeft, const void* pRight) {
return compareUint32Val(pRight, pLeft);
}
int32_t compareUint64Val(const void *pLeft, const void *pRight) {
int64_t left = GET_UINT64_VAL(pLeft), right = GET_UINT64_VAL(pRight);
uint64_t left = GET_UINT64_VAL(pLeft), right = GET_UINT64_VAL(pRight);
if (left > right) return 1;
if (left < right) return -1;
return 0;
}
int32_t compareUint64ValDesc(const void* pLeft, const void* pRight) {
return compareUint64Val(pRight, pLeft);
}
int32_t compareUint16Val(const void *pLeft, const void *pRight) {
int16_t left = GET_UINT16_VAL(pLeft), right = GET_UINT16_VAL(pRight);
uint16_t left = GET_UINT16_VAL(pLeft), right = GET_UINT16_VAL(pRight);
if (left > right) return 1;
if (left < right) return -1;
return 0;
}
int32_t compareUint16ValDesc(const void* pLeft, const void* pRight) {
return compareUint16Val(pRight, pLeft);
}
int32_t compareUint8Val(const void* pLeft, const void* pRight) {
uint8_t left = GET_UINT8_VAL(pLeft), right = GET_UINT8_VAL(pRight);
if (left > right) return 1;
......@@ -75,6 +118,10 @@ int32_t compareUint8Val(const void* pLeft, const void* pRight) {
return 0;
}
int32_t compareUint8ValDesc(const void* pLeft, const void* pRight) {
return compareUint8Val(pRight, pLeft);
}
int32_t compareFloatVal(const void *pLeft, const void *pRight) {
float p1 = GET_FLOAT_VAL(pLeft);
float p2 = GET_FLOAT_VAL(pRight);
......@@ -96,6 +143,10 @@ int32_t compareFloatVal(const void *pLeft, const void *pRight) {
return FLT_GREATER(p1, p2) ? 1: -1;
}
int32_t compareFloatValDesc(const void* pLeft, const void* pRight) {
return compareFloatVal(pRight, pLeft);
}
int32_t compareDoubleVal(const void *pLeft, const void *pRight) {
double p1 = GET_DOUBLE_VAL(pLeft);
double p2 = GET_DOUBLE_VAL(pRight);
......@@ -117,6 +168,10 @@ int32_t compareDoubleVal(const void *pLeft, const void *pRight) {
return FLT_GREATER(p1, p2) ? 1: -1;
}
int32_t compareDoubleValDesc(const void* pLeft, const void* pRight) {
return compareDoubleVal(pRight, pLeft);
}
int32_t compareLenPrefixedStr(const void *pLeft, const void *pRight) {
int32_t len1 = varDataLen(pLeft);
int32_t len2 = varDataLen(pRight);
......@@ -133,6 +188,10 @@ int32_t compareLenPrefixedStr(const void *pLeft, const void *pRight) {
}
}
int32_t compareLenPrefixedStrDesc(const void* pLeft, const void* pRight) {
return compareLenPrefixedStr(pRight, pLeft);
}
int32_t compareLenPrefixedWStr(const void *pLeft, const void *pRight) {
int32_t len1 = varDataLen(pLeft);
int32_t len2 = varDataLen(pRight);
......@@ -149,6 +208,10 @@ int32_t compareLenPrefixedWStr(const void *pLeft, const void *pRight) {
}
}
int32_t compareLenPrefixedWStrDesc(const void* pLeft, const void* pRight) {
return compareLenPrefixedWStr(pRight, pLeft);
}
/*
* Compare two strings
* TSDB_MATCH: Match
......@@ -262,7 +325,7 @@ int WCSPatternMatch(const wchar_t *patterStr, const wchar_t *str, size_t size, c
return (str[j] == 0 || j >= size) ? TSDB_PATTERN_MATCH : TSDB_PATTERN_NOMATCH;
}
static int32_t compareStrPatternComp(const void* pLeft, const void* pRight) {
int32_t compareStrPatternComp(const void* pLeft, const void* pRight) {
SPatternCompareInfo pInfo = {'%', '_'};
assert(varDataLen(pRight) <= TSDB_MAX_FIELD_LEN);
......@@ -287,30 +350,50 @@ int32_t taosArrayCompareString(const void* a, const void* b) {
return compareLenPrefixedStr(x, y);
}
//static int32_t compareFindStrInArray(const void* pLeft, const void* pRight) {
// const SArray* arr = (const SArray*) pRight;
// return taosArraySearchString(arr, pLeft, taosArrayCompareString, TD_EQ) == NULL ? 0 : 1;
//}
static int32_t compareFindItemInSet(const void *pLeft, const void* pRight) {
int32_t compareFindItemInSet(const void *pLeft, const void* pRight) {
return NULL != taosHashGet((SHashObj *)pRight, varDataVal(pLeft), varDataLen(pLeft)) ? 1 : 0;
}
static int32_t compareWStrPatternComp(const void* pLeft, const void* pRight) {
int32_t compareWStrPatternComp(const void* pLeft, const void* pRight) {
SPatternCompareInfo pInfo = {'%', '_'};
assert(varDataLen(pRight) <= TSDB_MAX_FIELD_LEN * TSDB_NCHAR_SIZE);
wchar_t *pattern = calloc(varDataLen(pRight) + 1, sizeof(wchar_t));
wchar_t *pattern = calloc(varDataLen(pRight) + 1, sizeof(wchar_t));
memcpy(pattern, varDataVal(pRight), varDataLen(pRight));
int32_t ret = WCSPatternMatch(pattern, varDataVal(pLeft), varDataLen(pLeft)/TSDB_NCHAR_SIZE, &pInfo);
free(pattern);
return (ret == TSDB_PATTERN_MATCH) ? 0 : 1;
}
__compar_fn_t getComparFunc(int32_t type, int32_t optr) {
__compar_fn_t comparFn = NULL;
if (optr == TSDB_RELATION_IN && (type != TSDB_DATA_TYPE_BINARY && type != TSDB_DATA_TYPE_NCHAR)) {
switch (type) {
case TSDB_DATA_TYPE_BOOL:
case TSDB_DATA_TYPE_TINYINT:
case TSDB_DATA_TYPE_UTINYINT:
return setCompareBytes1;
case TSDB_DATA_TYPE_SMALLINT:
case TSDB_DATA_TYPE_USMALLINT:
return setCompareBytes2;
case TSDB_DATA_TYPE_INT:
case TSDB_DATA_TYPE_UINT:
case TSDB_DATA_TYPE_FLOAT:
return setCompareBytes4;
case TSDB_DATA_TYPE_BIGINT:
case TSDB_DATA_TYPE_UBIGINT:
case TSDB_DATA_TYPE_DOUBLE:
case TSDB_DATA_TYPE_TIMESTAMP:
return setCompareBytes8;
default:
assert(0);
}
}
switch (type) {
case TSDB_DATA_TYPE_BOOL:
case TSDB_DATA_TYPE_TINYINT: comparFn = compareInt8Val; break;
......@@ -335,6 +418,8 @@ __compar_fn_t getComparFunc(int32_t type, int32_t optr) {
case TSDB_DATA_TYPE_NCHAR: {
if (optr == TSDB_RELATION_LIKE) {
comparFn = compareWStrPatternComp;
} else if (optr == TSDB_RELATION_IN) {
comparFn = compareFindItemInSet;
} else {
comparFn = compareLenPrefixedWStr;
}
......@@ -354,50 +439,50 @@ __compar_fn_t getComparFunc(int32_t type, int32_t optr) {
return comparFn;
}
__compar_fn_t getKeyComparFunc(int32_t keyType) {
__compar_fn_t getKeyComparFunc(int32_t keyType, int32_t order) {
__compar_fn_t comparFn = NULL;
switch (keyType) {
case TSDB_DATA_TYPE_TINYINT:
case TSDB_DATA_TYPE_BOOL:
comparFn = compareInt8Val;
comparFn = (order == TSDB_ORDER_ASC)? compareInt8Val:compareInt8ValDesc;
break;
case TSDB_DATA_TYPE_SMALLINT:
comparFn = compareInt16Val;
comparFn = (order == TSDB_ORDER_ASC)? compareInt16Val:compareInt16ValDesc;
break;
case TSDB_DATA_TYPE_INT:
comparFn = compareInt32Val;
comparFn = (order == TSDB_ORDER_ASC)? compareInt32Val:compareInt32ValDesc;
break;
case TSDB_DATA_TYPE_BIGINT:
case TSDB_DATA_TYPE_TIMESTAMP:
comparFn = compareInt64Val;
comparFn = (order == TSDB_ORDER_ASC)? compareInt64Val:compareInt64ValDesc;
break;
case TSDB_DATA_TYPE_FLOAT:
comparFn = compareFloatVal;
comparFn = (order == TSDB_ORDER_ASC)? compareFloatVal:compareFloatValDesc;
break;
case TSDB_DATA_TYPE_DOUBLE:
comparFn = compareDoubleVal;
comparFn = (order == TSDB_ORDER_ASC)? compareDoubleVal:compareDoubleValDesc;
break;
case TSDB_DATA_TYPE_UTINYINT:
comparFn = compareUint8Val;
comparFn = (order == TSDB_ORDER_ASC)? compareUint8Val:compareUint8ValDesc;
break;
case TSDB_DATA_TYPE_USMALLINT:
comparFn = compareUint16Val;
comparFn = (order == TSDB_ORDER_ASC)? compareUint16Val:compareUint16ValDesc;
break;
case TSDB_DATA_TYPE_UINT:
comparFn = compareUint32Val;
comparFn = (order == TSDB_ORDER_ASC)? compareUint32Val:compareUint32ValDesc;
break;
case TSDB_DATA_TYPE_UBIGINT:
comparFn = compareUint64Val;
comparFn = (order == TSDB_ORDER_ASC)? compareUint64Val:compareUint64ValDesc;
break;
case TSDB_DATA_TYPE_BINARY:
comparFn = compareLenPrefixedStr;
comparFn = (order == TSDB_ORDER_ASC)? compareLenPrefixedStr:compareLenPrefixedStrDesc;
break;
case TSDB_DATA_TYPE_NCHAR:
comparFn = compareLenPrefixedWStr;
comparFn = (order == TSDB_ORDER_ASC)? compareLenPrefixedWStr:compareLenPrefixedWStrDesc;
break;
default:
comparFn = compareInt32Val;
comparFn = (order == TSDB_ORDER_ASC)? compareInt32Val:compareInt32ValDesc;
break;
}
......
......@@ -54,7 +54,7 @@ SSkipList *tSkipListCreate(uint8_t maxLevel, uint8_t keyType, uint16_t keyLen, _
pSkipList->keyFn = fn;
pSkipList->seed = rand();
if (comparFn == NULL) {
pSkipList->comparFn = getKeyComparFunc(keyType);
pSkipList->comparFn = getKeyComparFunc(keyType, TSDB_ORDER_ASC);
} else {
pSkipList->comparFn = comparFn;
}
......
......@@ -70,7 +70,7 @@ void doubleSkipListTest() {
}
void randKeyTest() {
SSkipList* pSkipList = tSkipListCreate(10, TSDB_DATA_TYPE_INT, sizeof(int32_t), getKeyComparFunc(TSDB_DATA_TYPE_INT),
SSkipList* pSkipList = tSkipListCreate(10, TSDB_DATA_TYPE_INT, sizeof(int32_t), getKeyComparFunc(TSDB_DATA_TYPE_INT, TSDB_ORDER_ASC),
false, getkey);
int32_t size = 200000;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册