提交 84c8fc9e 编写于 作者: H Haojun Liao

[td-4865]<enhance>: support group by in the outer query.

上级 71367b78
此差异已折叠。
...@@ -2398,11 +2398,12 @@ int tscProcessRetrieveRspFromNode(SSqlObj *pSql) { ...@@ -2398,11 +2398,12 @@ int tscProcessRetrieveRspFromNode(SSqlObj *pSql) {
} }
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
if (pCmd->command == TSDB_SQL_RETRIEVE) { if ((pCmd->command == TSDB_SQL_RETRIEVE) ||
tscSetResRawPtr(pRes, pQueryInfo); ((UTIL_TABLE_IS_CHILD_TABLE(pTableMetaInfo) || UTIL_TABLE_IS_NORMAL_TABLE(pTableMetaInfo)) &&
} else if ((UTIL_TABLE_IS_CHILD_TABLE(pTableMetaInfo) || UTIL_TABLE_IS_NORMAL_TABLE(pTableMetaInfo)) && !TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_SUBQUERY)) { !TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_SUBQUERY)) ||
tscSetResRawPtr(pRes, pQueryInfo); (tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0) &&
} else if (tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0) && !TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_JOIN_QUERY) && !TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_JOIN_SEC_STAGE)) { !TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_JOIN_QUERY) &&
!TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_JOIN_SEC_STAGE))) {
tscSetResRawPtr(pRes, pQueryInfo); tscSetResRawPtr(pRes, pQueryInfo);
} }
......
...@@ -578,70 +578,72 @@ int32_t tscCreateResPointerInfo(SSqlRes* pRes, SQueryInfo* pQueryInfo) { ...@@ -578,70 +578,72 @@ int32_t tscCreateResPointerInfo(SSqlRes* pRes, SQueryInfo* pQueryInfo) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
void tscSetResRawPtr(SSqlRes* pRes, SQueryInfo* pQueryInfo) { static void setResRawPtrImpl(SSqlRes* pRes, SInternalField* pInfo, int32_t i, bool convertNchar) {
assert(pRes->numOfCols > 0); // generated the user-defined column result
if (pInfo->pExpr->pExpr == NULL && TSDB_COL_IS_UD_COL(pInfo->pExpr->base.colInfo.flag)) {
if (pInfo->pExpr->base.param[1].nType == TSDB_DATA_TYPE_NULL) {
setNullN(pRes->urow[i], pInfo->field.type, pInfo->field.bytes, (int32_t) pRes->numOfRows);
} else {
if (pInfo->field.type == TSDB_DATA_TYPE_NCHAR || pInfo->field.type == TSDB_DATA_TYPE_BINARY) {
assert(pInfo->pExpr->base.param[1].nLen <= pInfo->field.bytes);
int32_t offset = 0; for (int32_t k = 0; k < pRes->numOfRows; ++k) {
char* p = ((char**)pRes->urow)[i] + k * pInfo->field.bytes;
for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) { memcpy(varDataVal(p), pInfo->pExpr->base.param[1].pz, pInfo->pExpr->base.param[1].nLen);
SInternalField* pInfo = (SInternalField*)TARRAY_GET_ELEM(pQueryInfo->fieldsInfo.internalField, i); varDataSetLen(p, pInfo->pExpr->base.param[1].nLen);
}
} else {
for (int32_t k = 0; k < pRes->numOfRows; ++k) {
char* p = ((char**)pRes->urow)[i] + k * pInfo->field.bytes;
memcpy(p, &pInfo->pExpr->base.param[1].i64, pInfo->field.bytes);
}
}
}
pRes->urow[i] = pRes->data + offset * pRes->numOfRows; } else if (convertNchar && pInfo->field.type == TSDB_DATA_TYPE_NCHAR) {
pRes->length[i] = pInfo->field.bytes; // convert unicode to native code in a temporary buffer extra one byte for terminated symbol
pRes->buffer[i] = realloc(pRes->buffer[i], pInfo->field.bytes * pRes->numOfRows);
offset += pInfo->field.bytes; // string terminated char for binary data
memset(pRes->buffer[i], 0, pInfo->field.bytes * pRes->numOfRows);
// generated the user-defined column result char* p = pRes->urow[i];
if (pInfo->pExpr->pExpr == NULL && TSDB_COL_IS_UD_COL(pInfo->pExpr->base.colInfo.flag)) { for (int32_t k = 0; k < pRes->numOfRows; ++k) {
if (pInfo->pExpr->base.param[1].nType == TSDB_DATA_TYPE_NULL) { char* dst = pRes->buffer[i] + k * pInfo->field.bytes;
setNullN(pRes->urow[i], pInfo->field.type, pInfo->field.bytes, (int32_t) pRes->numOfRows);
} else {
if (pInfo->field.type == TSDB_DATA_TYPE_NCHAR || pInfo->field.type == TSDB_DATA_TYPE_BINARY) {
assert(pInfo->pExpr->base.param[1].nLen <= pInfo->field.bytes);
for (int32_t k = 0; k < pRes->numOfRows; ++k) { if (isNull(p, TSDB_DATA_TYPE_NCHAR)) {
char* p = ((char**)pRes->urow)[i] + k * pInfo->field.bytes; memcpy(dst, p, varDataTLen(p));
} else if (varDataLen(p) > 0) {
int32_t length = taosUcs4ToMbs(varDataVal(p), varDataLen(p), varDataVal(dst));
varDataSetLen(dst, length);
memcpy(varDataVal(p), pInfo->pExpr->base.param[1].pz, pInfo->pExpr->base.param[1].nLen); if (length == 0) {
varDataSetLen(p, pInfo->pExpr->base.param[1].nLen); tscError("charset:%s to %s. val:%s convert failed.", DEFAULT_UNICODE_ENCODEC, tsCharset, (char*)p);
}
} else {
for (int32_t k = 0; k < pRes->numOfRows; ++k) {
char* p = ((char**)pRes->urow)[i] + k * pInfo->field.bytes;
memcpy(p, &pInfo->pExpr->base.param[1].i64, pInfo->field.bytes);
}
} }
} else {
varDataSetLen(dst, 0);
} }
} else if (pInfo->field.type == TSDB_DATA_TYPE_NCHAR) { p += pInfo->field.bytes;
// convert unicode to native code in a temporary buffer extra one byte for terminated symbol }
pRes->buffer[i] = realloc(pRes->buffer[i], pInfo->field.bytes * pRes->numOfRows);
// string terminated char for binary data
memset(pRes->buffer[i], 0, pInfo->field.bytes * pRes->numOfRows);
char* p = pRes->urow[i]; memcpy(pRes->urow[i], pRes->buffer[i], pInfo->field.bytes * pRes->numOfRows);
for (int32_t k = 0; k < pRes->numOfRows; ++k) { }
char* dst = pRes->buffer[i] + k * pInfo->field.bytes; }
if (isNull(p, TSDB_DATA_TYPE_NCHAR)) { void tscSetResRawPtr(SSqlRes* pRes, SQueryInfo* pQueryInfo) {
memcpy(dst, p, varDataTLen(p)); assert(pRes->numOfCols > 0);
} else if (varDataLen(p) > 0) {
int32_t length = taosUcs4ToMbs(varDataVal(p), varDataLen(p), varDataVal(dst));
varDataSetLen(dst, length);
if (length == 0) { int32_t offset = 0;
tscError("charset:%s to %s. val:%s convert failed.", DEFAULT_UNICODE_ENCODEC, tsCharset, (char*)p); for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) {
} SInternalField* pInfo = (SInternalField*)TARRAY_GET_ELEM(pQueryInfo->fieldsInfo.internalField, i);
} else {
varDataSetLen(dst, 0);
}
p += pInfo->field.bytes; pRes->urow[i] = pRes->data + offset * pRes->numOfRows;
} pRes->length[i] = pInfo->field.bytes;
memcpy(pRes->urow[i], pRes->buffer[i], pInfo->field.bytes * pRes->numOfRows); offset += pInfo->field.bytes;
} setResRawPtrImpl(pRes, pInfo, i, true);
} }
} }
...@@ -656,6 +658,8 @@ void tscSetResRawPtrRv(SSqlRes* pRes, SQueryInfo* pQueryInfo, SSDataBlock* pBloc ...@@ -656,6 +658,8 @@ void tscSetResRawPtrRv(SSqlRes* pRes, SQueryInfo* pQueryInfo, SSDataBlock* pBloc
pRes->urow[i] = pColData->pData; pRes->urow[i] = pColData->pData;
pRes->length[i] = pInfo->field.bytes; pRes->length[i] = pInfo->field.bytes;
setResRawPtrImpl(pRes, pInfo, i, convertNchar);
/*
// generated the user-defined column result // generated the user-defined column result
if (pInfo->pExpr->pExpr == NULL && TSDB_COL_IS_UD_COL(pInfo->pExpr->base.colInfo.flag)) { if (pInfo->pExpr->pExpr == NULL && TSDB_COL_IS_UD_COL(pInfo->pExpr->base.colInfo.flag)) {
if (pInfo->pExpr->base.param[1].nType == TSDB_DATA_TYPE_NULL) { if (pInfo->pExpr->base.param[1].nType == TSDB_DATA_TYPE_NULL) {
...@@ -706,7 +710,7 @@ void tscSetResRawPtrRv(SSqlRes* pRes, SQueryInfo* pQueryInfo, SSDataBlock* pBloc ...@@ -706,7 +710,7 @@ void tscSetResRawPtrRv(SSqlRes* pRes, SQueryInfo* pQueryInfo, SSDataBlock* pBloc
} }
memcpy(pRes->urow[i], pRes->buffer[i], pInfo->field.bytes * pRes->numOfRows); memcpy(pRes->urow[i], pRes->buffer[i], pInfo->field.bytes * pRes->numOfRows);
} }*/
} }
} }
...@@ -989,6 +993,8 @@ static void destroyDummyInputOperator(void* param, int32_t numOfOutput) { ...@@ -989,6 +993,8 @@ static void destroyDummyInputOperator(void* param, int32_t numOfOutput) {
pInfo->block = destroyOutputBuf(pInfo->block); pInfo->block = destroyOutputBuf(pInfo->block);
pInfo->pSql = NULL; pInfo->pSql = NULL;
doDestroyFilterInfo(pInfo->pFilterInfo, pInfo->numOfFilterCols);
cleanupResultRowInfo(&pInfo->pTableQueryInfo->resInfo); cleanupResultRowInfo(&pInfo->pTableQueryInfo->resInfo);
tfree(pInfo->pTableQueryInfo); tfree(pInfo->pTableQueryInfo);
} }
......
...@@ -571,6 +571,7 @@ void doCompactSDataBlock(SSDataBlock* pBlock, int32_t numOfRows, int8_t* p); ...@@ -571,6 +571,7 @@ void doCompactSDataBlock(SSDataBlock* pBlock, int32_t numOfRows, int8_t* p);
SSDataBlock* createOutputBuf(SExprInfo* pExpr, int32_t numOfOutput, int32_t numOfRows); SSDataBlock* createOutputBuf(SExprInfo* pExpr, int32_t numOfOutput, int32_t numOfRows);
void* destroyOutputBuf(SSDataBlock* pBlock); void* destroyOutputBuf(SSDataBlock* pBlock);
void* doDestroyFilterInfo(SSingleColumnFilterInfo* pFilterInfo, int32_t numOfFilterCols);
void setInputDataBlock(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order); void setInputDataBlock(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order);
int32_t getNumOfResult(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx* pCtx, int32_t numOfOutput); int32_t getNumOfResult(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx* pCtx, int32_t numOfOutput);
......
...@@ -179,7 +179,6 @@ static STsdbQueryCond createTsdbQueryCond(SQueryAttr* pQueryAttr, STimeWindow* w ...@@ -179,7 +179,6 @@ static STsdbQueryCond createTsdbQueryCond(SQueryAttr* pQueryAttr, STimeWindow* w
static STableIdInfo createTableIdInfo(STableQueryInfo* pTableQueryInfo); static STableIdInfo createTableIdInfo(STableQueryInfo* pTableQueryInfo);
static void setTableScanFilterOperatorInfo(STableScanInfo* pTableScanInfo, SOperatorInfo* pDownstream); static void setTableScanFilterOperatorInfo(STableScanInfo* pTableScanInfo, SOperatorInfo* pDownstream);
static void* doDestroyFilterInfo(SSingleColumnFilterInfo* pFilterInfo, int32_t numOfFilterCols);
static int32_t getNumOfScanTimes(SQueryAttr* pQueryAttr); static int32_t getNumOfScanTimes(SQueryAttr* pQueryAttr);
...@@ -237,37 +236,41 @@ static int compareRowData(const void *a, const void *b, const void *userData) { ...@@ -237,37 +236,41 @@ static int compareRowData(const void *a, const void *b, const void *userData) {
static void sortGroupResByOrderList(SGroupResInfo *pGroupResInfo, SQueryRuntimeEnv *pRuntimeEnv, SSDataBlock* pDataBlock) { static void sortGroupResByOrderList(SGroupResInfo *pGroupResInfo, SQueryRuntimeEnv *pRuntimeEnv, SSDataBlock* pDataBlock) {
SArray *columnOrderList = getOrderCheckColumns(pRuntimeEnv->pQueryAttr); SArray *columnOrderList = getOrderCheckColumns(pRuntimeEnv->pQueryAttr);
if (taosArrayGetSize(columnOrderList) <= 0) { size_t size = taosArrayGetSize(columnOrderList);
taosArrayDestroy(columnOrderList);
if (size <= 0) {
return; return;
} }
int32_t orderId = pRuntimeEnv->pQueryAttr->order.orderColId; int32_t orderId = pRuntimeEnv->pQueryAttr->order.orderColId;
if (orderId <= 0) { if (orderId <= 0) {
return; return;
} }
bool found = false; bool found = false;
int16_t dataOffset = 0; int16_t dataOffset = 0;
//SColIndex *index = taosArrayGet(columnOrderList, 0);
for (int32_t j = 0; j < pDataBlock->info.numOfCols; ++j) { for (int32_t j = 0; j < pDataBlock->info.numOfCols; ++j) {
SColumnInfoData* pColInfoData = (SColumnInfoData *)taosArrayGet(pDataBlock->pDataBlock, j); SColumnInfoData* pColInfoData = (SColumnInfoData *)taosArrayGet(pDataBlock->pDataBlock, j);
if (orderId == j) { if (orderId == j) {
found = true; found = true;
break; break;
} }
dataOffset += pColInfoData->info.bytes; dataOffset += pColInfoData->info.bytes;
} }
if (found == false) { if (found == false) {
return; return;
} }
int16_t type = pRuntimeEnv->pQueryAttr->pExpr1[orderId].base.resType; int16_t type = pRuntimeEnv->pQueryAttr->pExpr1[orderId].base.resType;
SRowCompSupporter support = {.pRuntimeEnv = pRuntimeEnv, .dataOffset = dataOffset, .comFunc = getComparFunc(type, 0)}; SRowCompSupporter support = {.pRuntimeEnv = pRuntimeEnv, .dataOffset = dataOffset, .comFunc = getComparFunc(type, 0)};
taosArraySortPWithExt(pGroupResInfo->pRows, compareRowData, &support); taosArraySortPWithExt(pGroupResInfo->pRows, compareRowData, &support);
return;
} }
//setup the output buffer for each operator //setup the output buffer for each operator
SSDataBlock* createOutputBuf(SExprInfo* pExpr, int32_t numOfOutput, int32_t numOfRows) { SSDataBlock* createOutputBuf(SExprInfo* pExpr, int32_t numOfOutput, int32_t numOfRows) {
const static int32_t minSize = 8; const static int32_t minSize = 8;
...@@ -1798,7 +1801,9 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf ...@@ -1798,7 +1801,9 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
pRuntimeEnv->prevRow[i] = pRuntimeEnv->prevRow[i - 1] + pQueryAttr->tableCols[i-1].bytes; pRuntimeEnv->prevRow[i] = pRuntimeEnv->prevRow[i - 1] + pQueryAttr->tableCols[i-1].bytes;
} }
*(int64_t*) pRuntimeEnv->prevRow[0] = INT64_MIN; if (pQueryAttr->tableCols[0].type == TSDB_DATA_TYPE_TIMESTAMP) {
*(int64_t*) pRuntimeEnv->prevRow[0] = INT64_MIN;
}
} }
qDebug("QInfo:0x%"PRIx64" init runtime environment completed", GET_QID(pRuntimeEnv)); qDebug("QInfo:0x%"PRIx64" init runtime environment completed", GET_QID(pRuntimeEnv));
...@@ -1832,7 +1837,11 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf ...@@ -1832,7 +1837,11 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
case OP_Groupby: { case OP_Groupby: {
pRuntimeEnv->proot = pRuntimeEnv->proot =
createGroupbyOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput); createGroupbyOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput);
setTableScanFilterOperatorInfo(pRuntimeEnv->proot->upstream[0]->info, pRuntimeEnv->proot);
int32_t opType = pRuntimeEnv->proot->upstream[0]->operatorType;
if (opType != OP_DummyInput) {
setTableScanFilterOperatorInfo(pRuntimeEnv->proot->upstream[0]->info, pRuntimeEnv->proot);
}
break; break;
} }
case OP_SessionWindow: { case OP_SessionWindow: {
...@@ -2683,10 +2692,6 @@ static uint32_t doFilterByBlockTimeWindow(STableScanInfo* pTableScanInfo, SSData ...@@ -2683,10 +2692,6 @@ static uint32_t doFilterByBlockTimeWindow(STableScanInfo* pTableScanInfo, SSData
} }
void doSetFilterColumnInfo(SSingleColumnFilterInfo* pFilterInfo, int32_t numOfFilterCols, SSDataBlock* pBlock) { void doSetFilterColumnInfo(SSingleColumnFilterInfo* pFilterInfo, int32_t numOfFilterCols, SSDataBlock* pBlock) {
if (numOfFilterCols > 0 && pFilterInfo[0].pData != NULL) {
return;
}
// set the initial static data value filter expression // set the initial static data value filter expression
for (int32_t i = 0; i < numOfFilterCols; ++i) { for (int32_t i = 0; i < numOfFilterCols; ++i) {
for (int32_t j = 0; j < pBlock->info.numOfCols; ++j) { for (int32_t j = 0; j < pBlock->info.numOfCols; ++j) {
......
...@@ -364,7 +364,7 @@ static int64_t getEarliestValidTimestamp(STsdbRepo* pTsdb) { ...@@ -364,7 +364,7 @@ static int64_t getEarliestValidTimestamp(STsdbRepo* pTsdb) {
STsdbCfg* pCfg = &pTsdb->config; STsdbCfg* pCfg = &pTsdb->config;
int64_t now = taosGetTimestamp(pCfg->precision); int64_t now = taosGetTimestamp(pCfg->precision);
return now - (tsTickPerDay[pCfg->precision] * pCfg->keep); return now - (tsTickPerDay[pCfg->precision] * pCfg->keep) + 1; // needs to add one tick
} }
static void setQueryTimewindow(STsdbQueryHandle* pQueryHandle, STsdbQueryCond* pCond) { static void setQueryTimewindow(STsdbQueryHandle* pQueryHandle, STsdbQueryCond* pCond) {
......
...@@ -426,4 +426,9 @@ endi ...@@ -426,4 +426,9 @@ endi
sql_error select last_row(*) from (select * from nest_tb0) having c1 > 0 sql_error select last_row(*) from (select * from nest_tb0) having c1 > 0
print ===========>td-4805
sql_error select tbname, i from (select * from nest_tb0) group by i;
sql select count(*),c1 from (select * from nest_tb0) where c1 < 2 group by c1;
system sh/exec.sh -n dnode1 -s stop -x SIGINT system sh/exec.sh -n dnode1 -s stop -x SIGINT
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册