提交 c8159c56 编写于 作者: H Haojun Liao

[td-805] opt query perf

上级 81cc5f4e
...@@ -422,7 +422,6 @@ static FORCE_INLINE void tscGetResultColumnChr(SSqlRes* pRes, SFieldInfo* pField ...@@ -422,7 +422,6 @@ static FORCE_INLINE void tscGetResultColumnChr(SSqlRes* pRes, SFieldInfo* pField
int32_t bytes = pInfo->pSqlExpr->resBytes; int32_t bytes = pInfo->pSqlExpr->resBytes;
char* pData = pRes->data + pInfo->pSqlExpr->offset * pRes->numOfRows + bytes * pRes->row; char* pData = pRes->data + pInfo->pSqlExpr->offset * pRes->numOfRows + bytes * pRes->row;
if (type == TSDB_DATA_TYPE_NCHAR || type == TSDB_DATA_TYPE_BINARY) { if (type == TSDB_DATA_TYPE_NCHAR || type == TSDB_DATA_TYPE_BINARY) {
int32_t realLen = varDataLen(pData); int32_t realLen = varDataLen(pData);
assert(realLen <= bytes - VARSTR_HEADER_SIZE); assert(realLen <= bytes - VARSTR_HEADER_SIZE);
......
...@@ -1385,6 +1385,11 @@ static int32_t doAddProjectionExprAndResultFields(SQueryInfo* pQueryInfo, SColum ...@@ -1385,6 +1385,11 @@ static int32_t doAddProjectionExprAndResultFields(SQueryInfo* pQueryInfo, SColum
return numOfTotalColumns; return numOfTotalColumns;
} }
static void tscInsertPrimaryTSSourceColumn(SQueryInfo* pQueryInfo, SColumnIndex* pIndex) {
SColumnIndex tsCol = {.tableIndex = pIndex->tableIndex, .columnIndex = PRIMARYKEY_TIMESTAMP_COL_INDEX};
tscColumnListInsert(pQueryInfo->colList, &tsCol);
}
int32_t addProjectionExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, tSQLExprItem* pItem) { int32_t addProjectionExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, tSQLExprItem* pItem) {
const char* msg0 = "invalid column name"; const char* msg0 = "invalid column name";
const char* msg1 = "tag for normal table query is not allowed"; const char* msg1 = "tag for normal table query is not allowed";
...@@ -1427,6 +1432,8 @@ int32_t addProjectionExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, t ...@@ -1427,6 +1432,8 @@ int32_t addProjectionExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, t
addProjectQueryCol(pQueryInfo, startPos, &index, pItem); addProjectQueryCol(pQueryInfo, startPos, &index, pItem);
} }
tscInsertPrimaryTSSourceColumn(pQueryInfo, &index);
} else { } else {
return TSDB_CODE_TSC_INVALID_SQL; return TSDB_CODE_TSC_INVALID_SQL;
} }
...@@ -1499,8 +1506,8 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col ...@@ -1499,8 +1506,8 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
switch (optr) { switch (optr) {
case TK_COUNT: { case TK_COUNT: {
if (pItem->pNode->pParam != NULL && pItem->pNode->pParam->nExpr != 1) {
/* more than one parameter for count() function */ /* more than one parameter for count() function */
if (pItem->pNode->pParam != NULL && pItem->pNode->pParam->nExpr != 1) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2); return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2);
} }
...@@ -1551,11 +1558,12 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col ...@@ -1551,11 +1558,12 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
} }
} else { // count(*) is equalled to count(primary_timestamp_key) } else { // count(*) is equalled to count(primary_timestamp_key)
index = (SColumnIndex){0, PRIMARYKEY_TIMESTAMP_COL_INDEX}; index = (SColumnIndex){0, PRIMARYKEY_TIMESTAMP_COL_INDEX};
int32_t size = tDataTypeDesc[TSDB_DATA_TYPE_BIGINT].nSize; int32_t size = tDataTypeDesc[TSDB_DATA_TYPE_BIGINT].nSize;
pExpr = tscSqlExprAppend(pQueryInfo, functionID, &index, TSDB_DATA_TYPE_BIGINT, size, size, false); pExpr = tscSqlExprAppend(pQueryInfo, functionID, &index, TSDB_DATA_TYPE_BIGINT, size, size, false);
} }
pTableMetaInfo = tscGetMetaInfo(pQueryInfo, index.tableIndex);
memset(pExpr->aliasName, 0, tListLen(pExpr->aliasName)); memset(pExpr->aliasName, 0, tListLen(pExpr->aliasName));
getColumnName(pItem, pExpr->aliasName, sizeof(pExpr->aliasName) - 1); getColumnName(pItem, pExpr->aliasName, sizeof(pExpr->aliasName) - 1);
...@@ -1570,9 +1578,8 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col ...@@ -1570,9 +1578,8 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
} }
// the time stamp may be always needed // the time stamp may be always needed
if (index.tableIndex > 0 && index.tableIndex < tscGetNumOfColumns(pTableMetaInfo->pTableMeta)) { if (index.tableIndex < tscGetNumOfColumns(pTableMetaInfo->pTableMeta)) {
SColumnIndex tsCol = {.tableIndex = index.tableIndex, .columnIndex = PRIMARYKEY_TIMESTAMP_COL_INDEX}; tscInsertPrimaryTSSourceColumn(pQueryInfo, &index);
tscColumnListInsert(pQueryInfo->colList, &tsCol);
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -1682,10 +1689,8 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col ...@@ -1682,10 +1689,8 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
tscColumnListInsert(pQueryInfo->colList, &(ids.ids[i])); tscColumnListInsert(pQueryInfo->colList, &(ids.ids[i]));
} }
} }
SColumnIndex tsCol = {.tableIndex = index.tableIndex, .columnIndex = PRIMARYKEY_TIMESTAMP_COL_INDEX}; tscInsertPrimaryTSSourceColumn(pQueryInfo, &index);
tscColumnListInsert(pQueryInfo->colList, &tsCol);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
case TK_FIRST: case TK_FIRST:
......
...@@ -4564,7 +4564,6 @@ static void sequentialTableProcess(SQInfo *pQInfo) { ...@@ -4564,7 +4564,6 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
// TODO handle the limit offset problem // TODO handle the limit offset problem
if (pQuery->numOfFilterCols == 0 && pQuery->limit.offset > 0) { if (pQuery->numOfFilterCols == 0 && pQuery->limit.offset > 0) {
// skipBlocks(pRuntimeEnv);
if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) { if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) {
pQInfo->tableIndex++; pQInfo->tableIndex++;
continue; continue;
...@@ -4799,7 +4798,7 @@ static void tableFixedOutputProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) ...@@ -4799,7 +4798,7 @@ static void tableFixedOutputProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo)
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
SQuery *pQuery = pRuntimeEnv->pQuery; SQuery *pQuery = pRuntimeEnv->pQuery;
if (!isTopBottomQuery(pQuery) && pQuery->limit.offset > 0) { // no need to execute, since the output will be ignore. if (!pRuntimeEnv->topBotQuery && pQuery->limit.offset > 0) { // no need to execute, since the output will be ignore.
return; return;
} }
...@@ -5639,6 +5638,20 @@ static int compareTableIdInfo(const void* a, const void* b) { ...@@ -5639,6 +5638,20 @@ static int compareTableIdInfo(const void* a, const void* b) {
static void freeQInfo(SQInfo *pQInfo); static void freeQInfo(SQInfo *pQInfo);
static void calResultBufSize(SQuery* pQuery) {
const int32_t RESULT_MSG_MIN_SIZE = 1024 * (1024 + 512); // bytes
const int32_t RESULT_MSG_MIN_ROWS = 8192;
const float RESULT_THRESHOLD_RATIO = 0.85;
int32_t numOfRes = RESULT_MSG_MIN_SIZE / pQuery->rowSize;
if (numOfRes < RESULT_MSG_MIN_ROWS) {
numOfRes = RESULT_MSG_MIN_ROWS;
}
pQuery->rec.capacity = numOfRes;
pQuery->rec.threshold = numOfRes * RESULT_THRESHOLD_RATIO;
}
static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SArray* pTableIdList, SSqlGroupbyExpr *pGroupbyExpr, SExprInfo *pExprs, static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SArray* pTableIdList, SSqlGroupbyExpr *pGroupbyExpr, SExprInfo *pExprs,
STableGroupInfo *pTableGroupInfo, SColumnInfo* pTagCols) { STableGroupInfo *pTableGroupInfo, SColumnInfo* pTagCols) {
int16_t numOfCols = pQueryMsg->numOfCols; int16_t numOfCols = pQueryMsg->numOfCols;
...@@ -5672,8 +5685,7 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SArray* pTableIdList, ...@@ -5672,8 +5685,7 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SArray* pTableIdList,
pQuery->fillType = pQueryMsg->fillType; pQuery->fillType = pQueryMsg->fillType;
pQuery->numOfTags = pQueryMsg->numOfTags; pQuery->numOfTags = pQueryMsg->numOfTags;
pQuery->tagColList = pTagCols; pQuery->tagColList = pTagCols;
// todo do not allocate ??
pQuery->colList = calloc(numOfCols, sizeof(SSingleColumnFilterInfo)); pQuery->colList = calloc(numOfCols, sizeof(SSingleColumnFilterInfo));
if (pQuery->colList == NULL) { if (pQuery->colList == NULL) {
goto _cleanup; goto _cleanup;
...@@ -5703,9 +5715,7 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SArray* pTableIdList, ...@@ -5703,9 +5715,7 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SArray* pTableIdList,
goto _cleanup; goto _cleanup;
} }
// set the output buffer capacity calResultBufSize(pQuery);
pQuery->rec.capacity = 4096;
pQuery->rec.threshold = 4000;
for (int32_t col = 0; col < pQuery->numOfOutput; ++col) { for (int32_t col = 0; col < pQuery->numOfOutput; ++col) {
assert(pExprs[col].interBytes >= pExprs[col].bytes); assert(pExprs[col].interBytes >= pExprs[col].bytes);
...@@ -5967,7 +5977,6 @@ static void freeQInfo(SQInfo *pQInfo) { ...@@ -5967,7 +5977,6 @@ static void freeQInfo(SQInfo *pQInfo) {
} }
tfree(pQuery->sdata); tfree(pQuery->sdata);
tfree(pQuery); tfree(pQuery);
qDebug("QInfo:%p QInfo is freed", pQInfo); qDebug("QInfo:%p QInfo is freed", pQInfo);
...@@ -6503,7 +6512,8 @@ void* qOpenQueryMgmt(int32_t vgId) { ...@@ -6503,7 +6512,8 @@ void* qOpenQueryMgmt(int32_t vgId) {
} }
static void queryMgmtKillQueryFn(void* handle) { static void queryMgmtKillQueryFn(void* handle) {
qKillQuery(handle); void** h = (void**) handle;
qKillQuery(*h);
} }
void qQueryMgmtNotifyClosed(void* pQMgmt) { void qQueryMgmtNotifyClosed(void* pQMgmt) {
......
...@@ -374,7 +374,7 @@ int taosSendTcpData(uint32_t ip, uint16_t port, void *data, int len, void *chand ...@@ -374,7 +374,7 @@ int taosSendTcpData(uint32_t ip, uint16_t port, void *data, int len, void *chand
if (chandle == NULL) return -1; if (chandle == NULL) return -1;
return (int)send(pFdObj->fd, data, (size_t)len, 0); return taosWriteMsg(pFdObj->fd, data, len);
} }
static void taosReportBrokenLink(SFdObj *pFdObj) { static void taosReportBrokenLink(SFdObj *pFdObj) {
......
...@@ -604,7 +604,6 @@ static bool doLoadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlo ...@@ -604,7 +604,6 @@ static bool doLoadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlo
int16_t* colIds = pQueryHandle->defaultLoadColumn->pData; int16_t* colIds = pQueryHandle->defaultLoadColumn->pData;
int32_t ret = tsdbLoadBlockDataCols(&(pQueryHandle->rhelper), pBlock, pCheckInfo->pCompInfo, colIds, QH_GET_NUM_OF_COLS(pQueryHandle)); int32_t ret = tsdbLoadBlockDataCols(&(pQueryHandle->rhelper), pBlock, pCheckInfo->pCompInfo, colIds, QH_GET_NUM_OF_COLS(pQueryHandle));
// int32_t ret = tsdbLoadBlockData(&(pQueryHandle->rhelper), pBlock, pCheckInfo->pCompInfo);
if (ret == TSDB_CODE_SUCCESS) { if (ret == TSDB_CODE_SUCCESS) {
SDataBlockLoadInfo* pBlockLoadInfo = &pQueryHandle->dataBlockLoadInfo; SDataBlockLoadInfo* pBlockLoadInfo = &pQueryHandle->dataBlockLoadInfo;
...@@ -1534,7 +1533,7 @@ bool tsdbNextDataBlock(TsdbQueryHandleT* pHandle) { ...@@ -1534,7 +1533,7 @@ bool tsdbNextDataBlock(TsdbQueryHandleT* pHandle) {
pSecQueryHandle->statis = calloc(numOfCols, sizeof(SDataStatis)); pSecQueryHandle->statis = calloc(numOfCols, sizeof(SDataStatis));
pSecQueryHandle->pColumns = taosArrayInit(numOfCols, sizeof(SColumnInfoData)); pSecQueryHandle->pColumns = taosArrayInit(numOfCols, sizeof(SColumnInfoData));
for (int32_t i = 0; i < numOfCols; ++i) { for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData colInfo = {{0}, 0}; SColumnInfoData colInfo = {{0}, 0};
SColumnInfoData* pCol = taosArrayGet(pQueryHandle->pColumns, i); SColumnInfoData* pCol = taosArrayGet(pQueryHandle->pColumns, i);
...@@ -1542,7 +1541,6 @@ bool tsdbNextDataBlock(TsdbQueryHandleT* pHandle) { ...@@ -1542,7 +1541,6 @@ bool tsdbNextDataBlock(TsdbQueryHandleT* pHandle) {
colInfo.info = pCol->info; colInfo.info = pCol->info;
colInfo.pData = calloc(1, EXTRA_BYTES + pQueryHandle->outputCapacity * pCol->info.bytes); colInfo.pData = calloc(1, EXTRA_BYTES + pQueryHandle->outputCapacity * pCol->info.bytes);
taosArrayPush(pSecQueryHandle->pColumns, &colInfo); taosArrayPush(pSecQueryHandle->pColumns, &colInfo);
pSecQueryHandle->statis[i].colId = colInfo.info.colId;
} }
size_t si = taosArrayGetSize(pQueryHandle->pTableCheckInfo); size_t si = taosArrayGetSize(pQueryHandle->pTableCheckInfo);
...@@ -1564,7 +1562,8 @@ bool tsdbNextDataBlock(TsdbQueryHandleT* pHandle) { ...@@ -1564,7 +1562,8 @@ bool tsdbNextDataBlock(TsdbQueryHandleT* pHandle) {
tsdbInitDataBlockLoadInfo(&pSecQueryHandle->dataBlockLoadInfo); tsdbInitDataBlockLoadInfo(&pSecQueryHandle->dataBlockLoadInfo);
tsdbInitCompBlockLoadInfo(&pSecQueryHandle->compBlockLoadInfo); tsdbInitCompBlockLoadInfo(&pSecQueryHandle->compBlockLoadInfo);
pSecQueryHandle->defaultLoadColumn = taosArrayClone(pQueryHandle->defaultLoadColumn);
bool ret = tsdbNextDataBlock((void*) pSecQueryHandle); bool ret = tsdbNextDataBlock((void*) pSecQueryHandle);
assert(ret); assert(ret);
...@@ -1811,22 +1810,26 @@ int32_t tsdbRetrieveDataBlockStatisInfo(TsdbQueryHandleT* pQueryHandle, SDataSta ...@@ -1811,22 +1810,26 @@ int32_t tsdbRetrieveDataBlockStatisInfo(TsdbQueryHandleT* pQueryHandle, SDataSta
int64_t stime = taosGetTimestampUs(); int64_t stime = taosGetTimestampUs();
tsdbLoadCompData(&pHandle->rhelper, pBlockInfo->compBlock, NULL); tsdbLoadCompData(&pHandle->rhelper, pBlockInfo->compBlock, NULL);
// todo opt perf int16_t* colIds = pHandle->defaultLoadColumn->pData;
size_t numOfCols = QH_GET_NUM_OF_COLS(pHandle); size_t numOfCols = QH_GET_NUM_OF_COLS(pHandle);
memset(pHandle->statis, 0, numOfCols * sizeof(SDataStatis));
for(int32_t i = 0; i < numOfCols; ++i) { for(int32_t i = 0; i < numOfCols; ++i) {
SDataStatis* st = &pHandle->statis[i]; pHandle->statis[i].colId = colIds[i];
int32_t colId = st->colId;
memset(st, 0, sizeof(SDataStatis));
st->colId = colId;
} }
tsdbGetDataStatis(&pHandle->rhelper, pHandle->statis, numOfCols); tsdbGetDataStatis(&pHandle->rhelper, pHandle->statis, numOfCols);
*pBlockStatis = pHandle->statis; // always load the first primary timestamp column data
SDataStatis* pPrimaryColStatis = &pHandle->statis[0];
assert(pPrimaryColStatis->colId == PRIMARYKEY_TIMESTAMP_COL_INDEX);
pPrimaryColStatis->numOfNull = 0;
pPrimaryColStatis->min = pBlockInfo->compBlock->keyFirst;
pPrimaryColStatis->max = pBlockInfo->compBlock->keyLast;
//update the number of NULL data rows //update the number of NULL data rows
for(int32_t i = 0; i < numOfCols; ++i) { for(int32_t i = 1; i < numOfCols; ++i) {
if (pHandle->statis[i].numOfNull == -1) { // set the column data are all NULL if (pHandle->statis[i].numOfNull == -1) { // set the column data are all NULL
pHandle->statis[i].numOfNull = pBlockInfo->compBlock->numOfRows; pHandle->statis[i].numOfNull = pBlockInfo->compBlock->numOfRows;
} }
...@@ -1842,6 +1845,7 @@ int32_t tsdbRetrieveDataBlockStatisInfo(TsdbQueryHandleT* pQueryHandle, SDataSta ...@@ -1842,6 +1845,7 @@ int32_t tsdbRetrieveDataBlockStatisInfo(TsdbQueryHandleT* pQueryHandle, SDataSta
int64_t elapsed = taosGetTimestampUs() - stime; int64_t elapsed = taosGetTimestampUs() - stime;
pHandle->cost.statisInfoLoadTime += elapsed; pHandle->cost.statisInfoLoadTime += elapsed;
*pBlockStatis = pHandle->statis;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册