提交 e774e8b3 编写于 作者: H Haojun Liao

[TD-2070]<enhance>: improve project query performance.

上级 1a8cbfe6
......@@ -39,7 +39,7 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql);
int32_t tscHandleInsertRetry(SSqlObj* pSql);
void tscBuildResFromSubqueries(SSqlObj *pSql);
TAOS_ROW doSetResultRowData(SSqlObj *pSql, bool finalResult);
TAOS_ROW doSetResultRowData(SSqlObj *pSql);
char *getArithemicInputSrc(void *param, const char *name, int32_t colId);
......
......@@ -313,6 +313,7 @@ typedef struct {
SResRec * pGroupRec;
char * data;
TAOS_ROW tsrow;
TAOS_ROW urow;
int32_t* length; // length for each field for current row
char ** buffer; // Buffer used to put multibytes encoded using unicode (wchar_t)
SColumnIndex * pColumnIndex;
......@@ -425,6 +426,7 @@ int32_t tscTansformSQLFuncForSTableQuery(SQueryInfo *pQueryInfo);
void tscRestoreSQLFuncForSTableQuery(SQueryInfo *pQueryInfo);
int32_t tscCreateResPointerInfo(SSqlRes *pRes, SQueryInfo *pQueryInfo);
void tscSetResRawPtr(SSqlRes* pRes, SQueryInfo* pQueryInfo);
void tscResetSqlCmdObj(SSqlCmd *pCmd, bool removeFromCache);
......@@ -471,8 +473,9 @@ static FORCE_INLINE void tscGetResultColumnChr(SSqlRes* pRes, SFieldInfo* pField
int32_t bytes = pInfo->field.bytes;
char* pData = pRes->data + (int32_t)(offset * pRes->numOfRows + bytes * pRes->row);
UNUSED(pData);
// user defined constant value output columns
// user defined constant value output columns
if (pInfo->pSqlExpr != NULL && TSDB_COL_IS_UD_COL(pInfo->pSqlExpr->colInfo.flag)) {
if (type == TSDB_DATA_TYPE_NCHAR || type == TSDB_DATA_TYPE_BINARY) {
pData = pInfo->pSqlExpr->param[1].pz;
......
......@@ -341,7 +341,7 @@ TAOS_ROW tscFetchRow(void *param) {
return NULL;
}
void* data = doSetResultRowData(pSql, true);
void* data = doSetResultRowData(pSql);
tscClearSqlOwner(pSql);
return data;
......
......@@ -6412,7 +6412,7 @@ int32_t doCheckForQuery(SSqlObj* pSql, SQuerySQL* pQuerySql, int32_t index) {
return code;
}
tVariantListItem* p1 = taosArrayGet(pQuerySql->from, i);
tVariantListItem* p1 = taosArrayGet(pQuerySql->from, i + 1);
if (p1->pVar.nType != TSDB_DATA_TYPE_BINARY) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg11);
}
......
......@@ -1437,19 +1437,6 @@ int tscBuildRetrieveFromMgmtMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
return TSDB_CODE_SUCCESS;
}
static int tscSetResultPointer(SQueryInfo *pQueryInfo, SSqlRes *pRes) {
if (tscCreateResPointerInfo(pRes, pQueryInfo) != TSDB_CODE_SUCCESS) {
return pRes->code;
}
for (int i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) {
int16_t offset = tscFieldInfoGetOffset(pQueryInfo, i);
pRes->tsrow[i] = (unsigned char*)((char*) pRes->data + offset * pRes->numOfRows);
}
return 0;
}
/*
* this function can only be called once.
* by using pRes->rspType to denote its status
......@@ -1460,15 +1447,18 @@ static int tscLocalResultCommonBuilder(SSqlObj *pSql, int32_t numOfRes) {
SSqlRes *pRes = &pSql->res;
SSqlCmd *pCmd = &pSql->cmd;
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
pRes->code = TSDB_CODE_SUCCESS;
if (pRes->rspType == 0) {
pRes->numOfRows = numOfRes;
pRes->row = 0;
pRes->rspType = 1;
tscSetResultPointer(pQueryInfo, pRes);
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
if (tscCreateResPointerInfo(pRes, pQueryInfo) != TSDB_CODE_SUCCESS) {
return pRes->code;
}
tscSetResRawPtr(pRes, pQueryInfo);
} else {
tscResetForNextRetrieve(pRes);
}
......@@ -1512,10 +1502,11 @@ int tscProcessRetrieveLocalMergeRsp(SSqlObj *pSql) {
}
pRes->code = tscDoLocalMerge(pSql);
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
if (pRes->code == TSDB_CODE_SUCCESS && pRes->numOfRows > 0) {
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
tscCreateResPointerInfo(pRes, pQueryInfo);
tscSetResRawPtr(pRes, pQueryInfo);
}
pRes->row = 0;
......@@ -2195,7 +2186,12 @@ int tscProcessRetrieveRspFromNode(SSqlObj *pSql) {
if (tscCreateResPointerInfo(pRes, pQueryInfo) != TSDB_CODE_SUCCESS) {
return pRes->code;
}
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
if (!UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo) || tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0) || pCmd->command == TSDB_SQL_RETRIEVE) {
tscSetResRawPtr(pRes, pQueryInfo);
}
if (pSql->pSubscription != NULL) {
int32_t numOfCols = pQueryInfo->fieldsInfo.numOfOutput;
......
......@@ -489,6 +489,27 @@ int taos_fetch_block_impl(TAOS_RES *res, TAOS_ROW *rows) {
return (int)((pQueryInfo->order.order == TSDB_ORDER_DESC) ? pRes->numOfRows : -pRes->numOfRows);
}
static bool needToFetchNewBlock(SSqlObj* pSql) {
SSqlRes *pRes = &pSql->res;
SSqlCmd *pCmd = &pSql->cmd;
return (pRes->completed != true || hasMoreVnodesToTry(pSql) || hasMoreClauseToTry(pSql)) &&
(pCmd->command == TSDB_SQL_RETRIEVE ||
pCmd->command == TSDB_SQL_RETRIEVE_LOCALMERGE ||
pCmd->command == TSDB_SQL_TABLE_JOIN_RETRIEVE ||
pCmd->command == TSDB_SQL_FETCH ||
pCmd->command == TSDB_SQL_SHOW ||
pCmd->command == TSDB_SQL_SHOW_CREATE_TABLE ||
pCmd->command == TSDB_SQL_SHOW_CREATE_DATABASE ||
pCmd->command == TSDB_SQL_SELECT ||
pCmd->command == TSDB_SQL_DESCRIBE_TABLE ||
pCmd->command == TSDB_SQL_SERV_STATUS ||
pCmd->command == TSDB_SQL_CURRENT_DB ||
pCmd->command == TSDB_SQL_SERV_VERSION ||
pCmd->command == TSDB_SQL_CLI_VERSION ||
pCmd->command == TSDB_SQL_CURRENT_USER);
}
TAOS_ROW taos_fetch_row(TAOS_RES *res) {
SSqlObj *pSql = (SSqlObj *)res;
if (pSql == NULL || pSql->signature != pSql) {
......@@ -509,77 +530,48 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) {
// set the sql object owner
tscSetSqlOwner(pSql);
// current data set are exhausted, fetch more data from node
if (pRes->row >= pRes->numOfRows && (pRes->completed != true || hasMoreVnodesToTry(pSql) || hasMoreClauseToTry(pSql)) &&
(pCmd->command == TSDB_SQL_RETRIEVE ||
pCmd->command == TSDB_SQL_RETRIEVE_LOCALMERGE ||
pCmd->command == TSDB_SQL_TABLE_JOIN_RETRIEVE ||
pCmd->command == TSDB_SQL_FETCH ||
pCmd->command == TSDB_SQL_SHOW ||
pCmd->command == TSDB_SQL_SHOW_CREATE_TABLE ||
pCmd->command == TSDB_SQL_SHOW_CREATE_DATABASE ||
pCmd->command == TSDB_SQL_SELECT ||
pCmd->command == TSDB_SQL_DESCRIBE_TABLE ||
pCmd->command == TSDB_SQL_SERV_STATUS ||
pCmd->command == TSDB_SQL_CURRENT_DB ||
pCmd->command == TSDB_SQL_SERV_VERSION ||
pCmd->command == TSDB_SQL_CLI_VERSION ||
pCmd->command == TSDB_SQL_CURRENT_USER )) {
// current data set are exhausted, fetch more result from node
if (pRes->row >= pRes->numOfRows && needToFetchNewBlock(pSql)) {
taos_fetch_rows_a(res, waitForRetrieveRsp, pSql->pTscObj);
tsem_wait(&pSql->rspSem);
}
void* data = doSetResultRowData(pSql, true);
void* data = doSetResultRowData(pSql);
tscClearSqlOwner(pSql);
return data;
}
int taos_fetch_block(TAOS_RES *res, TAOS_ROW *rows) {
#if 0
SSqlObj *pSql = (SSqlObj *)res;
SSqlCmd *pCmd = &pSql->cmd;
SSqlRes *pRes = &pSql->res;
int nRows = 0;
if (pSql == NULL || pSql->signature != pSql) {
terrno = TSDB_CODE_TSC_DISCONNECTED;
*rows = NULL;
return 0;
}
// projection query on metric, pipeline retrieve data from vnode list,
// instead of two-stage mergednodeProcessMsgFromShell free qhandle
nRows = taos_fetch_block_impl(res, rows);
// current subclause is completed, try the next subclause
while (rows == NULL && pCmd->clauseIndex < pCmd->numOfClause - 1) {
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
pSql->cmd.command = pQueryInfo->command;
pCmd->clauseIndex++;
pRes->numOfTotal += pRes->numOfClauseTotal;
pRes->numOfClauseTotal = 0;
pRes->rspType = 0;
pSql->subState.numOfSub = 0;
tfree(pSql->pSubs);
SSqlCmd *pCmd = &pSql->cmd;
SSqlRes *pRes = &pSql->res;
assert(pSql->fp == NULL);
if (pRes->qhandle == 0 ||
pRes->code == TSDB_CODE_TSC_QUERY_CANCELLED ||
pCmd->command == TSDB_SQL_RETRIEVE_EMPTY_RESULT ||
pCmd->command == TSDB_SQL_INSERT) {
return 0;
}
tscDebug("%p try data in the next subclause:%d, total subclause:%d", pSql, pCmd->clauseIndex, pCmd->numOfClause);
tscProcessSql(pSql);
// set the sql object owner
tscSetSqlOwner(pSql);
nRows = taos_fetch_block_impl(res, rows);
// current data set are exhausted, fetch more data from node
if (needToFetchNewBlock(pSql)) {
taos_fetch_rows_a(res, waitForRetrieveRsp, pSql->pTscObj);
tsem_wait(&pSql->rspSem);
}
return nRows;
#endif
*rows = pRes->urow;
(*rows) = taos_fetch_row(res);
return ((*rows) != NULL)? 1:0;
tscClearSqlOwner(pSql);
return pRes->numOfRows;
}
int taos_select_db(TAOS *taos, const char *db) {
......@@ -600,7 +592,7 @@ int taos_select_db(TAOS *taos, const char *db) {
}
// send free message to vnode to free qhandle and corresponding resources in vnode
static UNUSED_FUNC bool tscKillQueryInDnode(SSqlObj* pSql) {
static bool tscKillQueryInDnode(SSqlObj* pSql) {
SSqlCmd* pCmd = &pSql->cmd;
SSqlRes* pRes = &pSql->res;
......
......@@ -2390,7 +2390,7 @@ void tscBuildResFromSubqueries(SSqlObj *pSql) {
}
}
static void transferNcharData(SSqlObj *pSql, int32_t columnIndex, TAOS_FIELD *pField) {
static UNUSED_FUNC void transferNcharData(SSqlObj *pSql, int32_t columnIndex, TAOS_FIELD *pField) {
SSqlRes *pRes = &pSql->res;
if (pRes->tsrow[columnIndex] != NULL && pField->type == TSDB_DATA_TYPE_NCHAR) {
......@@ -2432,7 +2432,7 @@ char *getArithemicInputSrc(void *param, const char *name, int32_t colId) {
return pSupport->data[index] + pSupport->offset * pExpr->resBytes;
}
TAOS_ROW doSetResultRowData(SSqlObj *pSql, bool finalResult) {
TAOS_ROW doSetResultRowData(SSqlObj *pSql) {
SSqlCmd *pCmd = &pSql->cmd;
SSqlRes *pRes = &pSql->res;
......@@ -2445,22 +2445,20 @@ TAOS_ROW doSetResultRowData(SSqlObj *pSql, bool finalResult) {
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
size_t size = tscNumOfFields(pQueryInfo);
int32_t offset = 0;
for (int i = 0; i < size; ++i) {
tscGetResultColumnChr(pRes, &pQueryInfo->fieldsInfo, i, offset);
TAOS_FIELD *pField = TARRAY_GET_ELEM(pQueryInfo->fieldsInfo.internalField, i);
SInternalField* pInfo = (SInternalField*)TARRAY_GET_ELEM(pQueryInfo->fieldsInfo.internalField, i);
offset += pField->bytes;
int32_t type = pInfo->field.type;
int32_t bytes = pInfo->field.bytes;
// primary key column cannot be null in interval query, no need to check
if (i == 0 && pQueryInfo->interval.interval > 0) {
continue;
if (type != TSDB_DATA_TYPE_BINARY && type != TSDB_DATA_TYPE_NCHAR) {
pRes->tsrow[i] = isNull(pRes->urow[i], type) ? NULL : pRes->urow[i];
} else {
pRes->tsrow[i] = isNull(pRes->urow[i], type) ? NULL : varDataVal(pRes->urow[i]);
pRes->length[i] = varDataLen(pRes->urow[i]);
}
if (pRes->tsrow[i] != NULL && pField->type == TSDB_DATA_TYPE_NCHAR) {
transferNcharData(pSql, i, pField);
}
pRes->urow[i] += bytes;
}
pRes->row++; // index increase one-step
......
......@@ -265,16 +265,20 @@ void tscClearInterpInfo(SQueryInfo* pQueryInfo) {
int32_t tscCreateResPointerInfo(SSqlRes* pRes, SQueryInfo* pQueryInfo) {
if (pRes->tsrow == NULL) {
int32_t numOfOutput = pQueryInfo->fieldsInfo.numOfOutput;
pRes->numOfCols = numOfOutput;
pRes->numOfCols = pQueryInfo->fieldsInfo.numOfOutput;
pRes->tsrow = calloc(numOfOutput, POINTER_BYTES);
pRes->length = calloc(numOfOutput, sizeof(int32_t));
pRes->buffer = calloc(numOfOutput, POINTER_BYTES);
pRes->tsrow = calloc(pRes->numOfCols, POINTER_BYTES);
pRes->urow = calloc(pRes->numOfCols, POINTER_BYTES);
pRes->length = calloc(pRes->numOfCols, sizeof(int32_t));
pRes->buffer = calloc(pRes->numOfCols, POINTER_BYTES);
// not enough memory
if (pRes->tsrow == NULL || (pRes->buffer == NULL && pRes->numOfCols > 0)) {
if (pRes->tsrow == NULL || pRes->urow == NULL || pRes->length == NULL || (pRes->buffer == NULL && pRes->numOfCols > 0)) {
tfree(pRes->tsrow);
tfree(pRes->urow);
tfree(pRes->length);
tfree(pRes->buffer);
pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
return pRes->code;
}
......@@ -283,6 +287,71 @@ int32_t tscCreateResPointerInfo(SSqlRes* pRes, SQueryInfo* pQueryInfo) {
return TSDB_CODE_SUCCESS;
}
void tscSetResRawPtr(SSqlRes* pRes, SQueryInfo* pQueryInfo) {
assert(pRes->numOfCols > 0);
int32_t offset = 0;
for (int32_t i = 0; i < pRes->numOfCols; ++i) {
SInternalField* pInfo = (SInternalField*)TARRAY_GET_ELEM(pQueryInfo->fieldsInfo.internalField, i);
pRes->urow[i] = pRes->data + offset * pRes->numOfRows;
pRes->length[i] = pInfo->field.bytes;
offset += pInfo->field.bytes;
// generated the user-defined column result
if (pInfo->pSqlExpr != NULL && TSDB_COL_IS_UD_COL(pInfo->pSqlExpr->colInfo.flag)) {
if (pInfo->pSqlExpr->param[1].nType == TSDB_DATA_TYPE_NULL) {
setNullN(pRes->urow[i], pInfo->field.type, pInfo->field.bytes, pRes->numOfRows);
} else {
if (pInfo->field.type == TSDB_DATA_TYPE_NCHAR || pInfo->field.type == TSDB_DATA_TYPE_BINARY) {
assert(pInfo->pSqlExpr->param[1].nLen <= pInfo->field.bytes);
for (int32_t k = 0; k < pRes->numOfRows; ++k) {
char* p = pRes->urow[i] + k * pInfo->field.bytes;
memcpy(varDataVal(p), pInfo->pSqlExpr->param[1].pz, pInfo->pSqlExpr->param[1].nLen);
varDataSetLen(p, pInfo->pSqlExpr->param[1].nLen);
}
} else {
for (int32_t k = 0; k < pRes->numOfRows; ++k) {
char* p = pRes->urow[i] + k * pInfo->field.bytes;
memcpy(p, &pInfo->pSqlExpr->param[1].i64Key, pInfo->field.bytes);
}
}
}
} else if (pInfo->field.type == TSDB_DATA_TYPE_NCHAR) {
// convert unicode to native code in a temporary buffer extra one byte for terminated symbol
pRes->buffer[i] = realloc(pRes->buffer[i], pInfo->field.bytes * pRes->numOfRows);
// string terminated char for binary data
memset(pRes->buffer[i], 0, pInfo->field.bytes * pRes->numOfRows);
char* p = pRes->urow[i];
for (int32_t k = 0; k < pRes->numOfRows; ++k) {
char* dst = pRes->buffer[i] + k * pInfo->field.bytes;
if (isNull(p, TSDB_DATA_TYPE_NCHAR)) {
memcpy(dst, p, varDataTLen(p));
} else {
int32_t length = taosUcs4ToMbs(varDataVal(p), varDataLen(p), varDataVal(dst));
varDataSetLen(dst, length);
if (length == 0) {
tscError("charset:%s to %s. val:%s convert failed.", DEFAULT_UNICODE_ENCODEC, tsCharset, (char*)p);
}
}
p += pInfo->field.bytes;
}
memcpy(pRes->urow[i], pRes->buffer[i], pInfo->field.bytes * pRes->numOfRows);
}
}
}
static void tscDestroyResPointerInfo(SSqlRes* pRes) {
if (pRes->buffer != NULL) { // free all buffers containing the multibyte string
for (int i = 0; i < pRes->numOfCols; i++) {
......@@ -297,6 +366,7 @@ static void tscDestroyResPointerInfo(SSqlRes* pRes) {
tfree(pRes->tsrow);
tfree(pRes->length);
tfree(pRes->buffer);
tfree(pRes->urow);
tfree(pRes->pGroupRec);
tfree(pRes->pColumnIndex);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册