提交 ae453f80 编写于 作者: H hjxilinx

[tbase-849]

上级 39fe4bc5
......@@ -86,15 +86,16 @@ enum _sql_cmd {
TSDB_SQL_MAX //48
};
#define MAX_TOKEN_LEN 30
// token type
enum {
TSQL_NODE_TYPE_EXPR = 0x1,
TSQL_NODE_TYPE_ID = 0x2,
TSQL_NODE_TYPE_VALUE = 0x4,
};
#define NON_ARITHMEIC_EXPR 0
#define NORMAL_ARITHMETIC 1
#define AGG_ARIGHTMEIC 2
extern char tTokenTypeSwitcher[13];
#define toTSDBType(x) \
......
......@@ -128,6 +128,8 @@ void tscFieldInfoSetValFromSchema(SFieldInfo* pFieldInfo, int32_t index, SSchema
void tscFieldInfoSetValFromField(SFieldInfo* pFieldInfo, int32_t index, TAOS_FIELD* pField);
void tscFieldInfoSetValue(SFieldInfo* pFieldInfo, int32_t index, int8_t type, const char* name, int16_t bytes);
void tscFieldInfoUpdateVisible(SFieldInfo* pFieldInfo, int32_t index, bool visible);
void tscFieldInfoSetExpr(SFieldInfo* pFieldInfo, int32_t index, SSqlExpr* pExpr);
void tscFieldInfoSetBinExpr(SFieldInfo* pFieldInfo, int32_t index, SSqlFunctionExpr* pExpr);
void tscFieldInfoCalOffset(SQueryInfo* pQueryInfo);
void tscFieldInfoUpdateOffsetForInterResult(SQueryInfo* pQueryInfo);
......@@ -149,6 +151,7 @@ SSqlExpr* tscSqlExprInsertEmpty(SQueryInfo* pQueryInfo, int32_t index, int16_t f
SSqlExpr* tscSqlExprUpdate(SQueryInfo* pQueryInfo, int32_t index, int16_t functionId, int16_t srcColumnIndex, int16_t type,
int16_t size);
int32_t tscSqlExprNumOfExprs(SQueryInfo* pQueryInfo);
SSqlExpr* tscSqlExprGet(SQueryInfo* pQueryInfo, int32_t index);
void tscSqlExprCopy(SSqlExprInfo* dst, const SSqlExprInfo* src, uint64_t uid);
......
......@@ -31,9 +31,8 @@ extern "C" {
#include "tsqlfunction.h"
#include "tutil.h"
#define TSC_GET_RESPTR_BASE(res, _queryinfo, col, ord) \
(res->data + tscFieldInfoGetOffset(_queryinfo, col) * res->numOfRows)
#define TSC_GET_RESPTR_BASE(res, _queryinfo, col) (res->data + ((_queryinfo)->fieldsInfo.pSqlExpr[col]->offset) * res->numOfRows)
// forward declaration
struct SSqlInfo;
......@@ -70,13 +69,19 @@ typedef struct SSqlExpr {
int16_t interResBytes; // inter result buffer size
int16_t numOfParams; // argument value of each function
tVariant param[3]; // parameters are not more than 3
int32_t offset; // sub result column value of arithmetic expression.
} SSqlExpr;
typedef struct SColumnIndex {
int16_t tableIndex;
int16_t columnIndex;
} SColumnIndex;
typedef struct SFieldInfo {
int16_t numOfOutputCols; // number of column in result
int16_t numOfAlloc; // allocated size
TAOS_FIELD *pFields;
short * pOffset;
// short * pOffset;
/*
* define if this column is belong to the queried result, it may be add by parser to faciliate
......@@ -85,7 +90,9 @@ typedef struct SFieldInfo {
* NOTE: these hidden columns always locate at the end of the output columns
*/
bool * pVisibleCols;
int32_t numOfHiddenCols; // the number of column not belongs to the queried result columns
int32_t numOfHiddenCols; // the number of column not belongs to the queried result columns
SSqlFunctionExpr** pExpr; // used for aggregation arithmetic express,such as count(*)+count(*)
SSqlExpr** pSqlExpr;
} SFieldInfo;
typedef struct SSqlExprInfo {
......@@ -94,11 +101,6 @@ typedef struct SSqlExprInfo {
SSqlExpr *pExprs;
} SSqlExprInfo;
typedef struct SColumnIndex {
int16_t tableIndex;
int16_t columnIndex;
} SColumnIndex;
typedef struct SColumnBase {
SColumnIndex colIndex;
int32_t numOfFilters;
......@@ -163,7 +165,7 @@ typedef struct STableDataBlocks {
int32_t rowSize; // row size for current table
uint32_t nAllocSize;
uint32_t headerSize; // header for metadata (submit metadata)
uint32_t headerSize; // header for metadata (submit metadata)
uint32_t size;
/*
......@@ -199,8 +201,8 @@ typedef struct SQueryInfo {
int64_t etime, stime;
int64_t intervalTime; // aggregation time interval
int64_t nSlidingTime; // sliding window in mseconds
SSqlGroupbyExpr groupbyExpr; // group by tags info
int64_t slidingTime; // sliding window in mseconds
SSqlGroupbyExpr groupbyExpr; // group by tags info
SColumnBaseInfo colList;
SFieldInfo fieldsInfo;
......@@ -216,9 +218,9 @@ typedef struct SQueryInfo {
int64_t * defaultVal; // default value for interpolation
char * msg; // pointer to the pCmd->payload to keep error message temporarily
int64_t clauseLimit; // limit for current sub clause
// offset value in the original sql expression, NOT sent to virtual node, only applied at client side
int64_t prjOffset;
int64_t prjOffset;
} SQueryInfo;
// data source from sql string or from file
......@@ -269,29 +271,29 @@ typedef struct SResRec {
struct STSBuf;
typedef struct {
uint8_t code;
int64_t numOfRows; // num of results in current retrieved
int64_t numOfTotal; // num of total results
int64_t numOfTotalInCurrentClause; // num of total result in current subclause
char * pRsp;
int rspType;
int rspLen;
uint64_t qhandle;
int64_t uid;
int64_t useconds;
int64_t offset; // offset value from vnode during projection query of stable
int row;
int16_t numOfnchar;
int16_t precision;
int32_t numOfGroups;
SResRec * pGroupRec;
char * data;
short * bytes;
void ** tsrow;
char ** buffer; // Buffer used to put multibytes encoded using unicode (wchar_t)
uint8_t code;
int64_t numOfRows; // num of results in current retrieved
int64_t numOfTotal; // num of total results
int64_t numOfTotalInCurrentClause; // num of total result in current subclause
char * pRsp;
int rspType;
int rspLen;
uint64_t qhandle;
int64_t uid;
int64_t useconds;
int64_t offset; // offset value from vnode during projection query of stable
int row;
int16_t numOfnchar;
int16_t precision;
int32_t numOfGroups;
SResRec * pGroupRec;
char * data;
short * bytes;
void ** tsrow;
char ** buffer; // Buffer used to put multibytes encoded using unicode (wchar_t)
SColumnIndex *pColumnIndex;
struct SLocalReducer *pLocalReducer;
SColumnIndex * pColumnIndex;
} SSqlRes;
typedef struct _tsc_obj {
......@@ -410,13 +412,13 @@ int32_t tscCreateResPointerInfo(SSqlRes *pRes, SQueryInfo *pQueryInfo);
void tscDestroyResPointerInfo(SSqlRes *pRes);
void tscFreeSqlCmdData(SSqlCmd *pCmd);
void tscFreeResData(SSqlObj* pSql);
void tscFreeResData(SSqlObj *pSql);
/**
* free query result of the sql object
* @param pObj
*/
void tscFreeSqlResult(SSqlObj* pSql);
void tscFreeSqlResult(SSqlObj *pSql);
/**
* only free part of resources allocated during query.
......
......@@ -158,7 +158,7 @@ static tSQLSyntaxNode *tSQLSyntaxNodeCreate(SSchema *pSchema, int32_t numOfCols,
return pNode;
}
static uint8_t getBinaryExprOptr(SSQLToken *pToken) {
uint8_t getBinaryExprOptr(SSQLToken *pToken) {
switch (pToken->type) {
case TK_LT:
return TSDB_RELATION_LESS;
......@@ -183,6 +183,7 @@ static uint8_t getBinaryExprOptr(SSQLToken *pToken) {
case TK_STAR:
return TSDB_BINARY_OP_MULTIPLY;
case TK_SLASH:
case TK_DIVIDE:
return TSDB_BINARY_OP_DIVIDE;
case TK_REM:
return TSDB_BINARY_OP_REMAINDER;
......
......@@ -284,7 +284,7 @@ void tscAsyncFetchSingleRowProxy(void *param, TAOS_RES *tres, int numOfRows) {
}
for (int i = 0; i < pCmd->numOfCols; ++i)
pRes->tsrow[i] = TSC_GET_RESPTR_BASE(pRes, pQueryInfo, i, pQueryInfo->order) + pRes->bytes[i] * pRes->row;
pRes->tsrow[i] = TSC_GET_RESPTR_BASE(pRes, pQueryInfo, i) + pRes->bytes[i] * pRes->row;
pRes->row++;
(*pSql->fetchFp)(pSql->param, pSql, pSql->res.tsrow);
......@@ -298,7 +298,7 @@ void tscProcessFetchRow(SSchedMsg *pMsg) {
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
for (int i = 0; i < pCmd->numOfCols; ++i) {
pRes->tsrow[i] = TSC_GET_RESPTR_BASE(pRes, pQueryInfo, i, pQueryInfo->order) + pRes->bytes[i] * pRes->row;
pRes->tsrow[i] = TSC_GET_RESPTR_BASE(pRes, pQueryInfo, i) + pRes->bytes[i] * pRes->row;
}
pRes->row++;
......
......@@ -3297,7 +3297,7 @@ static void arithmetic_function(SQLFunctionCtx *pCtx) {
tSQLBinaryExprCalcTraverse(sas->pExpr->pBinExprInfo.pBinExpr, pCtx->size, pCtx->aOutputBuf, sas, pCtx->order,
arithmetic_callback_function);
pCtx->aOutputBuf += pCtx->outputBytes * pCtx->size/* * GET_FORWARD_DIRECTION_FACTOR(pCtx->order)*/;
pCtx->aOutputBuf += pCtx->outputBytes * pCtx->size;
pCtx->param[1].pz = NULL;
}
......
此差异已折叠。
......@@ -19,6 +19,7 @@
#include "tscUtil.h"
#include "tsclient.h"
#include "tutil.h"
#include "tschemautil.h"
typedef struct SCompareParam {
SLocalDataSource **pLocalData;
......@@ -59,12 +60,13 @@ static void tscInitSqlContext(SSqlCmd *pCmd, SSqlRes *pRes, SLocalReducer *pRedu
* merge requirement. So, the final result in pRes structure is formatted in accordance with the pCmd object.
*/
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutputCols; ++i) {
for (int32_t i = 0; i < pQueryInfo->exprsInfo.numOfExprs; ++i) {
SQLFunctionCtx *pCtx = &pReducer->pCtx[i];
SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, i);
pCtx->aOutputBuf = pReducer->pResultBuf->data + tscFieldInfoGetOffset(pQueryInfo, i) * pReducer->resColModel->capacity;
pCtx->order = pQueryInfo->order.order;
pCtx->functionId = pQueryInfo->exprsInfo.pExprs[i].functionId;
pCtx->functionId = pExpr->functionId;
// input buffer hold only one point data
int16_t offset = getColumnModelOffset(pDesc->pColumnModel, i);
......@@ -76,19 +78,16 @@ static void tscInitSqlContext(SSqlCmd *pCmd, SSqlRes *pRes, SLocalReducer *pRedu
pCtx->inputType = pSchema->type;
pCtx->inputBytes = pSchema->bytes;
TAOS_FIELD *pField = tscFieldInfoGetField(pQueryInfo, i);
// output data format yet comes from pCmd.
pCtx->outputBytes = pField->bytes;
pCtx->outputType = pField->type;
pCtx->outputBytes = pExpr->resBytes;
pCtx->outputType = pExpr->resType;
pCtx->startOffset = 0;
pCtx->size = 1;
pCtx->hasNull = true;
pCtx->currentStage = SECONDARY_STAGE_MERGE;
pRes->bytes[i] = pField->bytes;
SSqlExpr *pExpr = tscSqlExprGet(pQueryInfo, i);
pRes->bytes[i] = pExpr->resBytes;
// for top/bottom function, the output of timestamp is the first column
int32_t functionId = pExpr->functionId;
......@@ -255,7 +254,7 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd
// the input data format follows the old format, but output in a new format.
// so, all the input must be parsed as old format
pReducer->pCtx = (SQLFunctionCtx *)calloc(pQueryInfo->fieldsInfo.numOfOutputCols, sizeof(SQLFunctionCtx));
pReducer->pCtx = (SQLFunctionCtx *)calloc(pQueryInfo->exprsInfo.numOfExprs, sizeof(SQLFunctionCtx));
pReducer->rowSize = pMemBuffer[0]->nElemSize;
......@@ -302,7 +301,7 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd
}
pReducer->pTempBuffer->numOfElems = 0;
pReducer->pResInfo = calloc((size_t)pQueryInfo->fieldsInfo.numOfOutputCols, sizeof(SResultInfo));
pReducer->pResInfo = calloc((size_t)pQueryInfo->exprsInfo.numOfExprs, sizeof(SResultInfo));
tscCreateResPointerInfo(pRes, pQueryInfo);
tscInitSqlContext(pCmd, pRes, pReducer, pDesc);
......@@ -613,7 +612,7 @@ int32_t tscLocalReducerEnvCreate(SSqlObj *pSql, tExtMemBuffer ***pMemBuffer, tOr
return pRes->code;
}
pSchema = (SSchema *)calloc(1, sizeof(SSchema) * pQueryInfo->fieldsInfo.numOfOutputCols);
pSchema = (SSchema *)calloc(1, sizeof(SSchema) * pQueryInfo->exprsInfo.numOfExprs);
if (pSchema == NULL) {
tscError("%p failed to allocate memory", pSql);
pRes->code = TSDB_CODE_CLI_OUT_OF_MEMORY;
......@@ -621,7 +620,7 @@ int32_t tscLocalReducerEnvCreate(SSqlObj *pSql, tExtMemBuffer ***pMemBuffer, tOr
}
int32_t rlen = 0;
for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutputCols; ++i) {
for (int32_t i = 0; i < pQueryInfo->exprsInfo.numOfExprs; ++i) {
SSqlExpr *pExpr = tscSqlExprGet(pQueryInfo, i);
pSchema[i].bytes = pExpr->resBytes;
......@@ -635,7 +634,7 @@ int32_t tscLocalReducerEnvCreate(SSqlObj *pSql, tExtMemBuffer ***pMemBuffer, tOr
capacity = nBufferSizes / rlen;
}
pModel = createColumnModel(pSchema, pQueryInfo->fieldsInfo.numOfOutputCols, capacity);
pModel = createColumnModel(pSchema, pQueryInfo->exprsInfo.numOfExprs, capacity);
for (int32_t i = 0; i < pMeterMetaInfo->pMetricMeta->numOfVnodes; ++i) {
(*pMemBuffer)[i] = createExtMemBuffer(nBufferSizes, rlen, pModel);
......@@ -647,16 +646,33 @@ int32_t tscLocalReducerEnvCreate(SSqlObj *pSql, tExtMemBuffer ***pMemBuffer, tOr
return pRes->code;
}
memset(pSchema, 0, sizeof(SSchema) * pQueryInfo->fieldsInfo.numOfOutputCols);
for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutputCols; ++i) {
TAOS_FIELD *pField = tscFieldInfoGetField(pQueryInfo, i);
// final result depends on the fields number
memset(pSchema, 0, sizeof(SSchema) * pQueryInfo->exprsInfo.numOfExprs);
for (int32_t i = 0; i < pQueryInfo->exprsInfo.numOfExprs; ++i) {
SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, i);
SSchema* p1 = tsGetColumnSchema(pMeterMetaInfo->pMeterMeta, pExpr->colInfo.colIdx);
int16_t inter = 0;
int16_t type = -1;
int16_t bytes = 0;
if ((pExpr->functionId >= TSDB_FUNC_FIRST_DST && pExpr->functionId <= TSDB_FUNC_LAST_DST) ||
(pExpr->functionId >= TSDB_FUNC_SUM && pExpr->functionId <= TSDB_FUNC_MAX)) {
// the final result size and type in the same as query on single table.
// so here, set the flag to be false;
getResultDataInfo(p1->type, p1->bytes, pExpr->functionId, 0, &type, &bytes, &inter, 0, false);
} else {
type = pModel->pFields[i].field.type;
bytes = pModel->pFields[i].field.bytes;
}
pSchema[i].type = pField->type;
pSchema[i].bytes = pField->bytes;
strcpy(pSchema[i].name, pField->name);
pSchema[i].type = type;
pSchema[i].bytes = bytes;
strcpy(pSchema[i].name, pModel->pFields[i].field.name);
}
*pFinalModel = createColumnModel(pSchema, pQueryInfo->fieldsInfo.numOfOutputCols, capacity);
*pFinalModel = createColumnModel(pSchema, pQueryInfo->exprsInfo.numOfExprs, capacity);
tfree(pSchema);
return TSDB_CODE_SUCCESS;
......@@ -862,12 +878,7 @@ static void doInterpolateResult(SSqlObj *pSql, SLocalReducer *pLocalReducer, boo
}
int32_t rowSize = tscGetResRowLength(pQueryInfo);
// handle the descend order output
// if (pQueryInfo->order.order == TSQL_SO_ASC) {
memcpy(pRes->data, pFinalDataPage->data, pRes->numOfRows * rowSize);
// } else {
// reversedCopyResultToDstBuf(pQueryInfo, pRes, pFinalDataPage);
// }
memcpy(pRes->data, pFinalDataPage->data, pRes->numOfRows * rowSize);
pFinalDataPage->numOfElems = 0;
return;
......@@ -999,15 +1010,15 @@ static void doExecuteSecondaryMerge(SSqlCmd* pCmd, SLocalReducer *pLocalReducer,
// the tag columns need to be set before all functions execution
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
for(int32_t j = 0; j < pQueryInfo->fieldsInfo.numOfOutputCols; ++j) {
for(int32_t j = 0; j < pQueryInfo->exprsInfo.numOfExprs; ++j) {
SSqlExpr * pExpr = tscSqlExprGet(pQueryInfo, j);
SQLFunctionCtx *pCtx = &pLocalReducer->pCtx[j];
tVariantAssign(&pCtx->param[0], &pExpr->param[0]);
// tags/tags_dummy function, the tag field of SQLFunctionCtx is from the input buffer
if (pExpr->functionId == TSDB_FUNC_TAG_DUMMY || pExpr->functionId == TSDB_FUNC_TAG ||
pExpr->functionId == TSDB_FUNC_TS_DUMMY) {
int32_t functionId = pExpr->functionId;
if (functionId == TSDB_FUNC_TAG_DUMMY || functionId == TSDB_FUNC_TAG || functionId == TSDB_FUNC_TS_DUMMY) {
tVariantDestroy(&pCtx->tag);
tVariantCreateFromBinary(&pCtx->tag, pCtx->aInputElemBuf, pCtx->inputBytes, pCtx->inputType);
}
......@@ -1019,7 +1030,7 @@ static void doExecuteSecondaryMerge(SSqlCmd* pCmd, SLocalReducer *pLocalReducer,
}
}
for (int32_t j = 0; j < pQueryInfo->fieldsInfo.numOfOutputCols; ++j) {
for (int32_t j = 0; j < pQueryInfo->exprsInfo.numOfExprs; ++j) {
int32_t functionId = tscSqlExprGet(pQueryInfo, j)->functionId;
if (functionId == TSDB_FUNC_TAG_DUMMY || functionId == TSDB_FUNC_TS_DUMMY) {
continue;
......@@ -1041,20 +1052,29 @@ static int64_t getNumOfResultLocal(SQueryInfo *pQueryInfo, SQLFunctionCtx *pCtx)
int64_t maxOutput = 0;
for (int32_t j = 0; j < pQueryInfo->exprsInfo.numOfExprs; ++j) {
int32_t functionId = tscSqlExprGet(pQueryInfo, j)->functionId;
// SSqlExpr* pExpr = pQueryInfo->fieldsInfo.pSqlExpr[j];
// if (pExpr == NULL) {
// assert(pQueryInfo->fieldsInfo.pExpr[j] != NULL);
//
// maxOutput = 1;
// continue;
// }
/*
* ts, tag, tagprj function can not decide the output number of current query
* the number of output result is decided by main output
*/
SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, j);
int32_t functionId = pExpr->functionId;
if (functionId == TSDB_FUNC_TS || functionId == TSDB_FUNC_TAG || functionId == TSDB_FUNC_TAGPRJ) {
continue;
}
if (maxOutput < GET_RES_INFO(&pCtx[j])->numOfRes) {
maxOutput = GET_RES_INFO(&pCtx[j])->numOfRes;
}
}
return maxOutput;
}
......@@ -1066,7 +1086,7 @@ static int64_t getNumOfResultLocal(SQueryInfo *pQueryInfo, SQLFunctionCtx *pCtx)
*/
static void fillMultiRowsOfTagsVal(SQueryInfo* pQueryInfo, int32_t numOfRes, SLocalReducer *pLocalReducer) {
int32_t maxBufSize = 0; // find the max tags column length to prepare the buffer
for (int32_t k = 0; k < pQueryInfo->fieldsInfo.numOfOutputCols; ++k) {
for (int32_t k = 0; k < pQueryInfo->exprsInfo.numOfExprs; ++k) {
SSqlExpr *pExpr = tscSqlExprGet(pQueryInfo, k);
if (maxBufSize < pExpr->resBytes && pExpr->functionId == TSDB_FUNC_TAG) {
maxBufSize = pExpr->resBytes;
......@@ -1076,7 +1096,7 @@ static void fillMultiRowsOfTagsVal(SQueryInfo* pQueryInfo, int32_t numOfRes, SLo
assert(maxBufSize >= 0);
char *buf = malloc((size_t) maxBufSize);
for (int32_t k = 0; k < pQueryInfo->fieldsInfo.numOfOutputCols; ++k) {
for (int32_t k = 0; k < pQueryInfo->exprsInfo.numOfExprs; ++k) {
SSqlExpr *pExpr = tscSqlExprGet(pQueryInfo, k);
if (pExpr->functionId != TSDB_FUNC_TAG) {
continue;
......@@ -1098,12 +1118,9 @@ static void fillMultiRowsOfTagsVal(SQueryInfo* pQueryInfo, int32_t numOfRes, SLo
}
int32_t finalizeRes(SQueryInfo* pQueryInfo, SLocalReducer *pLocalReducer) {
for (int32_t k = 0; k < pQueryInfo->fieldsInfo.numOfOutputCols; ++k) {
for (int32_t k = 0; k < pQueryInfo->exprsInfo.numOfExprs; ++k) {
SSqlExpr *pExpr = tscSqlExprGet(pQueryInfo, k);
aAggs[pExpr->functionId].xFinalize(&pLocalReducer->pCtx[k]);
// allow to re-initialize for the next round
pLocalReducer->pCtx[k].resultInfo->initialized = false;
}
pLocalReducer->hasPrevRow = false;
......
......@@ -901,7 +901,7 @@ int tscLaunchSTableSubqueries(SSqlObj *pSql) {
tExtMemBuffer ** pMemoryBuf = NULL;
tOrderDescriptor *pDesc = NULL;
SColumnModel * pModel = NULL;
SColumnModel * pModel = NULL;
pRes->qhandle = 1; // hack the qhandle check
......@@ -1685,7 +1685,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pQueryMsg->intervalTime = htobe64(pQueryInfo->intervalTime);
pQueryMsg->intervalTimeUnit = pQueryInfo->intervalTimeUnit;
pQueryMsg->slidingTime = htobe64(pQueryInfo->nSlidingTime);
pQueryMsg->slidingTime = htobe64(pQueryInfo->slidingTime);
if (pQueryInfo->intervalTime < 0) {
tscError("%p illegal value of aggregation time interval in query msg: %ld", pSql, pQueryInfo->intervalTime);
......@@ -1768,7 +1768,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
SSqlFuncExprMsg *pSqlFuncExpr = (SSqlFuncExprMsg *)pMsg;
for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutputCols; ++i) {
for (int32_t i = 0; i < tscSqlExprNumOfExprs(pQueryInfo); ++i) {
SSqlExpr *pExpr = tscSqlExprGet(pQueryInfo, i);
if (pExpr->functionId == TSDB_FUNC_ARITHM) {
......@@ -1787,7 +1787,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pSqlFuncExpr->functionId = htons(pExpr->functionId);
pSqlFuncExpr->numOfParams = htons(pExpr->numOfParams);
pMsg += sizeof(SSqlFuncExprMsg);
pMsg += (sizeof(SSqlFuncExprMsg) - TSDB_COL_NAME_LEN);
for (int32_t j = 0; j < pExpr->numOfParams; ++j) {
pSqlFuncExpr->arg[j].argType = htons((uint16_t)pExpr->param[j].nType);
......
......@@ -13,8 +13,9 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "os.h"
#include <tast.h>
#include "hash.h"
#include "os.h"
#include "tcache.h"
#include "tlog.h"
#include "tnote.h"
......@@ -200,7 +201,7 @@ int taos_query_imp(STscObj *pObj, SSqlObj *pSql) {
taosCleanUpHashTable(pSql->pTableHashList);
pSql->pTableHashList = NULL;
}
tscDump("%p pObj:%p, SQL: %s", pSql, pObj, pSql->sqlstr);
pRes->code = (uint8_t)tsParseSql(pSql, false);
......@@ -367,9 +368,7 @@ int taos_fetch_block_impl(TAOS_RES *res, TAOS_ROW *rows) {
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
for (int i = 0; i < pQueryInfo->fieldsInfo.numOfOutputCols; ++i) {
// pRes->tsrow[i] = TSC_GET_RESPTR_BASE(pRes, pQueryInfo, i, pQueryInfo->order) +
// pRes->bytes[i] * (1 - pQueryInfo->order.order) * (pRes->numOfRows - 1);
pRes->tsrow[i] = TSC_GET_RESPTR_BASE(pRes, pQueryInfo, i, pQueryInfo->order);
pRes->tsrow[i] = TSC_GET_RESPTR_BASE(pRes, pQueryInfo, i);
}
*rows = pRes->tsrow;
......@@ -377,22 +376,73 @@ int taos_fetch_block_impl(TAOS_RES *res, TAOS_ROW *rows) {
return (pQueryInfo->order.order == TSQL_SO_DESC) ? pRes->numOfRows : -pRes->numOfRows;
}
static void transferNcharData(SSqlObj *pSql, int32_t columnIndex, TAOS_FIELD *pField) {
SSqlRes *pRes = &pSql->res;
if (isNull(pRes->tsrow[columnIndex], pField->type)) {
pRes->tsrow[columnIndex] = NULL;
} else if (pField->type == TSDB_DATA_TYPE_NCHAR) {
// convert unicode to native code in a temporary buffer extra one byte for terminated symbol
if (pRes->buffer[columnIndex] == NULL) {
pRes->buffer[columnIndex] = malloc(pField->bytes + TSDB_NCHAR_SIZE);
}
/* string terminated char for binary data*/
memset(pRes->buffer[columnIndex], 0, pField->bytes + TSDB_NCHAR_SIZE);
if (taosUcs4ToMbs(pRes->tsrow[columnIndex], pField->bytes, pRes->buffer[columnIndex])) {
pRes->tsrow[columnIndex] = pRes->buffer[columnIndex];
} else {
tscError("%p charset:%s to %s. val:%ls convert failed.", pSql, DEFAULT_UNICODE_ENCODEC, tsCharset, pRes->tsrow);
pRes->tsrow[columnIndex] = NULL;
}
}
}
static char *getArithemicInputSrc(void *param, char *name, int32_t colId) {
SArithmeticSupport *pSupport = (SArithmeticSupport *)param;
SSqlFunctionExpr * pExpr = pSupport->pExpr;
int32_t index = -1;
for (int32_t i = 0; i < pExpr->pBinExprInfo.numOfCols; ++i) {
if (strcmp(name, pExpr->pBinExprInfo.pReqColumns[i].name) == 0) {
index = i;
break;
}
}
assert(index >= 0 && index < pExpr->pBinExprInfo.numOfCols);
return pSupport->data[index] + pSupport->offset * pSupport->elemSize[index];
}
static void **doSetResultRowData(SSqlObj *pSql) {
SSqlCmd *pCmd = &pSql->cmd;
SSqlRes *pRes = &pSql->res;
assert(pRes->row >= 0 && pRes->row <= pRes->numOfRows);
if (pRes->row >= pRes->numOfRows) { // all the results has returned to invoker
tfree(pRes->tsrow);
return pRes->tsrow;
}
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
//todo refactor move away
for(int32_t k = 0; k < pQueryInfo->exprsInfo.numOfExprs; ++k) {
SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, k);
if (k > 0) {
SSqlExpr* pPrev = tscSqlExprGet(pQueryInfo, k - 1);
pExpr->offset = pPrev->offset + pPrev->resBytes;
}
}
int32_t num = 0;
for (int i = 0; i < pQueryInfo->fieldsInfo.numOfOutputCols; ++i) {
pRes->tsrow[i] = TSC_GET_RESPTR_BASE(pRes, pQueryInfo, i, pQueryInfo->order) + pRes->bytes[i] * pRes->row;
for (int i = 0; i < tscNumOfFields(pQueryInfo); ++i) {
if (pQueryInfo->fieldsInfo.pSqlExpr[i] != NULL) {
pRes->tsrow[i] = TSC_GET_RESPTR_BASE(pRes, pQueryInfo, i) + pRes->bytes[i] * pRes->row;
}
// primary key column cannot be null in interval query, no need to check
if (i == 0 && pQueryInfo->intervalTime > 0) {
......@@ -400,31 +450,38 @@ static void **doSetResultRowData(SSqlObj *pSql) {
}
TAOS_FIELD *pField = tscFieldInfoGetField(pQueryInfo, i);
if (isNull(pRes->tsrow[i], pField->type)) {
pRes->tsrow[i] = NULL;
} else if (pField->type == TSDB_DATA_TYPE_NCHAR) {
// convert unicode to native code in a temporary buffer extra one byte for terminated symbol
if (pRes->buffer[num] == NULL) {
pRes->buffer[num] = malloc(pField->bytes + TSDB_NCHAR_SIZE);
}
transferNcharData(pSql, i, pField);
/* string terminated char for binary data*/
memset(pRes->buffer[num], 0, pField->bytes + TSDB_NCHAR_SIZE);
if (taosUcs4ToMbs(pRes->tsrow[i], pField->bytes, pRes->buffer[num])) {
pRes->tsrow[i] = pRes->buffer[num];
} else {
tscError("%p charset:%s to %s. val:%ls convert failed.", pSql, DEFAULT_UNICODE_ENCODEC, tsCharset, pRes->tsrow);
pRes->tsrow[i] = NULL;
// calculate the result from serveral other columns
if (pQueryInfo->fieldsInfo.pExpr != NULL && pQueryInfo->fieldsInfo.pExpr[i] != NULL) {
SArithmeticSupport *sas = (SArithmeticSupport *)calloc(1, sizeof(SArithmeticSupport));
sas->offset = 0;
sas->pExpr = pQueryInfo->fieldsInfo.pExpr[i];
sas->numOfCols = sas->pExpr->pBinExprInfo.numOfCols;
if (pRes->buffer[i] == NULL) {
pRes->buffer[i] = malloc(tscFieldInfoGetField(pQueryInfo, i)->bytes);
}
num++;
for(int32_t k = 0; k < sas->numOfCols; ++k) {
int32_t columnIndex = sas->pExpr->pBinExprInfo.pReqColumns[k].colIdxInBuf;
SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, columnIndex);
sas->elemSize[k] = pExpr->resBytes;
sas->data[k] = (pRes->data + pRes->numOfRows* pExpr->offset) + pRes->row*pExpr->resBytes;
}
tSQLBinaryExprCalcTraverse(sas->pExpr->pBinExprInfo.pBinExpr, 1, pRes->buffer[i], sas, TSQL_SO_ASC, getArithemicInputSrc);
pRes->tsrow[i] = pRes->buffer[i];
free(sas); //todo optimization
}
}
assert(num <= pQueryInfo->fieldsInfo.numOfOutputCols);
pRes->row++; // index increase one-step
pRes->row++; // index increase one-step
return pRes->tsrow;
}
......@@ -466,7 +523,7 @@ static bool tscHashRemainDataInSubqueryResultSet(SSqlObj *pSql) {
if (pSql->pSubs[i] == 0) {
continue;
}
SSqlRes * pRes1 = &pSql->pSubs[i]->res;
SQueryInfo *pQueryInfo1 = tscGetQueryInfoDetail(&pSql->pSubs[i]->cmd, 0);
......@@ -502,11 +559,10 @@ static void **tscBuildResFromSubqueries(SSqlObj *pSql) {
if (numOfTableHasRes >= 2) { // do merge result
success = (doSetResultRowData(pSql->pSubs[0]) != NULL) &&
(doSetResultRowData(pSql->pSubs[1]) != NULL);
// TSKEY key1 = *(TSKEY *)pRes1->tsrow[0];
// TSKEY key2 = *(TSKEY *)pRes2->tsrow[0];
// printf("first:%" PRId64 ", second:%" PRId64 "\n", key1, key2);
success = (doSetResultRowData(pSql->pSubs[0]) != NULL) && (doSetResultRowData(pSql->pSubs[1]) != NULL);
// TSKEY key1 = *(TSKEY *)pRes1->tsrow[0];
// TSKEY key2 = *(TSKEY *)pRes2->tsrow[0];
// printf("first:%" PRId64 ", second:%" PRId64 "\n", key1, key2);
} else { // only one subquery
SSqlObj *pSub = pSql->pSubs[0];
if (pSub == NULL) {
......@@ -596,7 +652,7 @@ TAOS_ROW taos_fetch_row_impl(TAOS_RES *res) {
tscProcessSql(pSql); // retrieve data from virtual node
//if failed to retrieve data from current virtual node, try next one if exists
// if failed to retrieve data from current virtual node, try next one if exists
if (hasMoreVnodesToTry(pSql)) {
tscTryQueryNextVnode(pSql, NULL);
}
......@@ -638,7 +694,7 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) {
// current subclause is completed, try the next subclause
while (rows == NULL && pCmd->clauseIndex < pCmd->numOfClause - 1) {
tscTryQueryNextClause(pSql, NULL);
// if the rows is not NULL, return immediately
rows = taos_fetch_row_impl(res);
}
......@@ -701,7 +757,7 @@ int taos_select_db(TAOS *taos, const char *db) {
return taos_query(taos, sql);
}
void taos_free_result_imp(TAOS_RES* res, int keepCmd) {
void taos_free_result_imp(TAOS_RES *res, int keepCmd) {
if (res == NULL) return;
SSqlObj *pSql = (SSqlObj *)res;
......@@ -755,7 +811,7 @@ void taos_free_result_imp(TAOS_RES* res, int keepCmd) {
pCmd->command = (pCmd->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH;
tscTrace("%p code:%d, numOfRows:%d, command:%d", pSql, pRes->code, pRes->numOfRows, pCmd->command);
void *fp = pSql->fp;
if (fp != NULL) {
pSql->freed = 1;
......@@ -804,9 +860,7 @@ void taos_free_result_imp(TAOS_RES* res, int keepCmd) {
}
}
void taos_free_result(TAOS_RES *res) {
taos_free_result_imp(res, 0);
}
void taos_free_result(TAOS_RES *res) { taos_free_result_imp(res, 0); }
int taos_errno(TAOS *taos) {
STscObj *pObj = (STscObj *)taos;
......@@ -822,26 +876,24 @@ int taos_errno(TAOS *taos) {
return code;
}
static bool validErrorCode(int32_t code) {
return code >= TSDB_CODE_SUCCESS && code < TSDB_CODE_MAX_ERROR_CODE;
}
static bool validErrorCode(int32_t code) { return code >= TSDB_CODE_SUCCESS && code < TSDB_CODE_MAX_ERROR_CODE; }
/*
* In case of invalid sql error, additional information is attached to explain
* why the sql is invalid
*/
static bool hasAdditionalErrorInfo(int32_t code, SSqlCmd* pCmd) {
static bool hasAdditionalErrorInfo(int32_t code, SSqlCmd *pCmd) {
if (code != TSDB_CODE_INVALID_SQL) {
return false;
}
size_t len = strlen(pCmd->payload);
char* z = NULL;
char *z = NULL;
if (len > 0) {
z = strstr (pCmd->payload, "invalid SQL");
z = strstr(pCmd->payload, "invalid SQL");
}
return z != NULL;
}
......@@ -851,12 +903,12 @@ char *taos_errstr(TAOS *taos) {
if (pObj == NULL || pObj->signature != pObj) return tsError[globalCode];
SSqlObj* pSql = pObj->pSql;
SSqlObj *pSql = pObj->pSql;
if (validErrorCode(pSql->res.code)) {
code = pSql->res.code;
} else {
code = TSDB_CODE_OTHERS; //unknown error
code = TSDB_CODE_OTHERS; // unknown error
}
if (hasAdditionalErrorInfo(code, &pSql->cmd)) {
......@@ -954,14 +1006,14 @@ int taos_print_row(char *str, TAOS_ROW row, TAOS_FIELD *fields, int num_fields)
case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_NCHAR: {
size_t xlen = 0;
for (xlen = 0; xlen <= fields[i].bytes; xlen++) {
char c = ((char*)row[i])[xlen];
if (c == 0) break;
str[len++] = c;
}
str[len] = 0;
} break;
size_t xlen = 0;
for (xlen = 0; xlen <= fields[i].bytes; xlen++) {
char c = ((char *)row[i])[xlen];
if (c == 0) break;
str[len++] = c;
}
str[len] = 0;
} break;
case TSDB_DATA_TYPE_TIMESTAMP:
len += sprintf(str + len, "%" PRId64, *((int64_t *)row[i]));
......
......@@ -389,33 +389,33 @@ static void tscSetSlidingWindowInfo(SSqlObj *pSql, SSqlStream *pStream) {
pStream->interval = pQueryInfo->intervalTime; // it shall be derived from sql string
if (pQueryInfo->nSlidingTime == 0) {
pQueryInfo->nSlidingTime = pQueryInfo->intervalTime;
if (pQueryInfo->slidingTime == 0) {
pQueryInfo->slidingTime = pQueryInfo->intervalTime;
}
int64_t minSlidingTime =
(pStream->precision == TSDB_TIME_PRECISION_MICRO) ? tsMinSlidingTime * 1000L : tsMinSlidingTime;
if (pQueryInfo->nSlidingTime == -1) {
pQueryInfo->nSlidingTime = pQueryInfo->intervalTime;
} else if (pQueryInfo->nSlidingTime < minSlidingTime) {
if (pQueryInfo->slidingTime == -1) {
pQueryInfo->slidingTime = pQueryInfo->intervalTime;
} else if (pQueryInfo->slidingTime < minSlidingTime) {
tscWarn("%p stream:%p, original sliding value:%" PRId64 " too small, reset to:%" PRId64 "", pSql, pStream,
pQueryInfo->nSlidingTime, minSlidingTime);
pQueryInfo->slidingTime, minSlidingTime);
pQueryInfo->nSlidingTime = minSlidingTime;
pQueryInfo->slidingTime = minSlidingTime;
}
if (pQueryInfo->nSlidingTime > pQueryInfo->intervalTime) {
if (pQueryInfo->slidingTime > pQueryInfo->intervalTime) {
tscWarn("%p stream:%p, sliding value:%" PRId64 " can not be larger than interval range, reset to:%" PRId64 "", pSql, pStream,
pQueryInfo->nSlidingTime, pQueryInfo->intervalTime);
pQueryInfo->slidingTime, pQueryInfo->intervalTime);
pQueryInfo->nSlidingTime = pQueryInfo->intervalTime;
pQueryInfo->slidingTime = pQueryInfo->intervalTime;
}
pStream->slidingTime = pQueryInfo->nSlidingTime;
pStream->slidingTime = pQueryInfo->slidingTime;
pQueryInfo->intervalTime = 0; // clear the interval value to avoid the force time window split by query processor
pQueryInfo->nSlidingTime = 0;
pQueryInfo->slidingTime = 0;
}
static int64_t tscGetStreamStartTimestamp(SSqlObj *pSql, SSqlStream *pStream, int64_t stime) {
......
......@@ -27,6 +27,7 @@
#include "tsclient.h"
#include "tsqldef.h"
#include "ttimer.h"
#include "tast.h"
/*
* the detailed information regarding metric meta key is:
......@@ -343,22 +344,22 @@ void tscClearSqlMetaInfoForce(SSqlCmd* pCmd) {
int32_t tscCreateResPointerInfo(SSqlRes* pRes, SQueryInfo* pQueryInfo) {
if (pRes->tsrow == NULL) {
pRes->numOfnchar = 0;
//
int32_t numOfOutputCols = pQueryInfo->fieldsInfo.numOfOutputCols;
for (int32_t i = 0; i < numOfOutputCols; ++i) {
TAOS_FIELD* pField = tscFieldInfoGetField(pQueryInfo, i);
if (pField->type == TSDB_DATA_TYPE_NCHAR) {
pRes->numOfnchar++;
}
}
pRes->numOfnchar = numOfOutputCols;
// for (int32_t i = 0; i < numOfOutputCols; ++i) {
// TAOS_FIELD* pField = tscFieldInfoGetField(pQueryInfo, i);
// if (pField->type == TSDB_DATA_TYPE_NCHAR) {
// pRes->numOfnchar++;
// }
// }
pRes->tsrow = calloc(1, (POINTER_BYTES + sizeof(short)) * numOfOutputCols + POINTER_BYTES * pRes->numOfnchar);
pRes->bytes = calloc(numOfOutputCols, sizeof(short));
if (pRes->numOfnchar > 0) {
pRes->buffer = calloc(POINTER_BYTES, pRes->numOfnchar);
}
// if (pRes->numOfnchar > 0) {
pRes->buffer = calloc(POINTER_BYTES, numOfOutputCols);
// }
// not enough memory
if (pRes->tsrow == NULL || pRes->bytes == NULL || (pRes->buffer == NULL && pRes->numOfnchar > 0)) {
......@@ -376,7 +377,7 @@ int32_t tscCreateResPointerInfo(SSqlRes* pRes, SQueryInfo* pQueryInfo) {
void tscDestroyResPointerInfo(SSqlRes* pRes) {
if (pRes->buffer != NULL) {
assert(pRes->numOfnchar > 0);
// assert(pRes->numOfnchar > 0);
// free all buffers containing the multibyte string
for (int i = 0; i < pRes->numOfnchar; i++) {
tfree(pRes->buffer[i]);
......@@ -831,11 +832,18 @@ static void ensureSpace(SFieldInfo* pFieldInfo, int32_t size) {
pFieldInfo->pFields = realloc(pFieldInfo->pFields, newSize * sizeof(TAOS_FIELD));
memset(&pFieldInfo->pFields[oldSize], 0, inc * sizeof(TAOS_FIELD));
pFieldInfo->pOffset = realloc(pFieldInfo->pOffset, newSize * sizeof(int16_t));
memset(&pFieldInfo->pOffset[oldSize], 0, inc * sizeof(int16_t));
// pFieldInfo->pOffset = realloc(pFieldInfo->pOffset, newSize * sizeof(int16_t));
// memset(&pFieldInfo->pOffset[oldSize], 0, inc * sizeof(int16_t));
pFieldInfo->pVisibleCols = realloc(pFieldInfo->pVisibleCols, newSize * sizeof(bool));
memset(&pFieldInfo->pVisibleCols[oldSize], 0, inc * sizeof(bool));
pFieldInfo->pSqlExpr = realloc(pFieldInfo->pSqlExpr, POINTER_BYTES*newSize);
pFieldInfo->pExpr = realloc(pFieldInfo->pExpr, POINTER_BYTES*newSize);
memset(&pFieldInfo->pSqlExpr[oldSize], 0, inc * POINTER_BYTES);
memset(&pFieldInfo->pExpr[oldSize], 0, inc * POINTER_BYTES);
pFieldInfo->numOfAlloc = newSize;
}
}
......@@ -844,6 +852,15 @@ static void evic(SFieldInfo* pFieldInfo, int32_t index) {
if (index < pFieldInfo->numOfOutputCols) {
memmove(&pFieldInfo->pFields[index + 1], &pFieldInfo->pFields[index],
sizeof(pFieldInfo->pFields[0]) * (pFieldInfo->numOfOutputCols - index));
memmove(&pFieldInfo->pVisibleCols[index + 1], &pFieldInfo->pVisibleCols[index],
sizeof(pFieldInfo->pVisibleCols[0]) * (pFieldInfo->numOfOutputCols - index));
memmove(&pFieldInfo->pSqlExpr[index + 1], &pFieldInfo->pSqlExpr[index],
sizeof(pFieldInfo->pSqlExpr[0]) * (pFieldInfo->numOfOutputCols - index));
memmove(&pFieldInfo->pExpr[index + 1], &pFieldInfo->pExpr[index],
sizeof(pFieldInfo->pExpr[0]) * (pFieldInfo->numOfOutputCols - index));
}
}
......@@ -868,7 +885,6 @@ void tscFieldInfoSetValFromField(SFieldInfo* pFieldInfo, int32_t index, TAOS_FIE
memcpy(&pFieldInfo->pFields[index], pField, sizeof(TAOS_FIELD));
pFieldInfo->pVisibleCols[index] = true;
pFieldInfo->numOfOutputCols++;
}
......@@ -902,29 +918,49 @@ void tscFieldInfoSetValue(SFieldInfo* pFieldInfo, int32_t index, int8_t type, co
pFieldInfo->numOfOutputCols++;
}
void tscFieldInfoCalOffset(SQueryInfo* pQueryInfo) {
SFieldInfo* pFieldInfo = &pQueryInfo->fieldsInfo;
pFieldInfo->pOffset[0] = 0;
void tscFieldInfoSetExpr(SFieldInfo* pFieldInfo, int32_t index, SSqlExpr* pExpr) {
assert(index >= 0 && index < pFieldInfo->numOfOutputCols);
pFieldInfo->pSqlExpr[index] = pExpr;
}
void tscFieldInfoSetBinExpr(SFieldInfo* pFieldInfo, int32_t index, SSqlFunctionExpr* pExpr) {
assert(index >= 0 && index < pFieldInfo->numOfOutputCols);
pFieldInfo->pExpr[index] = pExpr;
}
for (int32_t i = 1; i < pFieldInfo->numOfOutputCols; ++i) {
pFieldInfo->pOffset[i] = pFieldInfo->pOffset[i - 1] + pFieldInfo->pFields[i - 1].bytes;
void tscFieldInfoCalOffset(SQueryInfo* pQueryInfo) {
SSqlExprInfo* pExprInfo = &pQueryInfo->exprsInfo;
pExprInfo->pExprs[0].offset = 0;
for (int32_t i = 1; i < pExprInfo->numOfExprs; ++i) {
pExprInfo->pExprs[i].offset = pExprInfo->pExprs[i - 1].offset + pExprInfo->pExprs[i - 1].resBytes;
}
}
void tscFieldInfoUpdateOffsetForInterResult(SQueryInfo* pQueryInfo) {
SFieldInfo* pFieldInfo = &pQueryInfo->fieldsInfo;
if (pFieldInfo->numOfOutputCols == 0) {
// SFieldInfo* pFieldInfo = &pQueryInfo->fieldsInfo;
// if (pFieldInfo->numOfOutputCols == 0) {
// return;
// }
//
// pFieldInfo->pOffset[0] = 0;
//
// /*
// * the retTypeLen is used to store the intermediate result length
// * for potential secondary merge exists
// */
// for (int32_t i = 1; i < pFieldInfo->numOfOutputCols; ++i) {
// pFieldInfo->pOffset[i] = pFieldInfo->pOffset[i - 1] + tscSqlExprGet(pQueryInfo, i - 1)->resBytes;
// }
SSqlExprInfo* pExprInfo = &pQueryInfo->exprsInfo;
if (pExprInfo->numOfExprs == 0) {
return;
}
pFieldInfo->pOffset[0] = 0;
/*
* the retTypeLen is used to store the intermediate result length
* for potential secondary merge exists
*/
for (int32_t i = 1; i < pFieldInfo->numOfOutputCols; ++i) {
pFieldInfo->pOffset[i] = pFieldInfo->pOffset[i - 1] + tscSqlExprGet(pQueryInfo, i - 1)->resBytes;
pExprInfo->pExprs[0].offset = 0;
for (int32_t i = 1; i < pExprInfo->numOfExprs; ++i) {
pExprInfo->pExprs[i].offset = pExprInfo->pExprs[i - 1].offset + pExprInfo->pExprs[i - 1].resBytes;
}
}
......@@ -948,12 +984,14 @@ void tscFieldInfoCopyAll(SFieldInfo* dst, SFieldInfo* src) {
*dst = *src;
dst->pFields = malloc(sizeof(TAOS_FIELD) * dst->numOfAlloc);
dst->pOffset = malloc(sizeof(short) * dst->numOfAlloc);
// dst->pOffset = malloc(sizeof(short) * dst->numOfAlloc);
dst->pVisibleCols = malloc(sizeof(bool) * dst->numOfAlloc);
dst->pSqlExpr = malloc(POINTER_BYTES * dst->numOfAlloc);
memcpy(dst->pFields, src->pFields, sizeof(TAOS_FIELD) * dst->numOfOutputCols);
memcpy(dst->pOffset, src->pOffset, sizeof(short) * dst->numOfOutputCols);
// memcpy(dst->pOffset, src->pOffset, sizeof(short) * dst->numOfOutputCols);
memcpy(dst->pVisibleCols, src->pVisibleCols, sizeof(bool) * dst->numOfOutputCols);
memcpy(dst->pSqlExpr, src->pSqlExpr, POINTER_BYTES * dst->numOfOutputCols);
}
TAOS_FIELD* tscFieldInfoGetField(SQueryInfo* pQueryInfo, int32_t index) {
......@@ -967,11 +1005,11 @@ TAOS_FIELD* tscFieldInfoGetField(SQueryInfo* pQueryInfo, int32_t index) {
int32_t tscNumOfFields(SQueryInfo* pQueryInfo) { return pQueryInfo->fieldsInfo.numOfOutputCols; }
int16_t tscFieldInfoGetOffset(SQueryInfo* pQueryInfo, int32_t index) {
if (index >= pQueryInfo->fieldsInfo.numOfOutputCols) {
if (index >= pQueryInfo->exprsInfo.numOfExprs) {
return 0;
}
return pQueryInfo->fieldsInfo.pOffset[index];
return pQueryInfo->exprsInfo.pExprs[index].offset;
}
int32_t tscFieldInfoCompare(SFieldInfo* pFieldInfo1, SFieldInfo* pFieldInfo2) {
......@@ -995,13 +1033,16 @@ int32_t tscFieldInfoCompare(SFieldInfo* pFieldInfo1, SFieldInfo* pFieldInfo2) {
}
int32_t tscGetResRowLength(SQueryInfo* pQueryInfo) {
SFieldInfo* pFieldInfo = &pQueryInfo->fieldsInfo;
if (pFieldInfo->numOfOutputCols <= 0) {
if (pQueryInfo->exprsInfo.numOfExprs <= 0) {
return 0;
}
return pFieldInfo->pOffset[pFieldInfo->numOfOutputCols - 1] +
pFieldInfo->pFields[pFieldInfo->numOfOutputCols - 1].bytes;
int32_t size = 0;
for(int32_t i = 0; i < pQueryInfo->exprsInfo.numOfExprs; ++i) {
size += pQueryInfo->exprsInfo.pExprs[i].resBytes;
}
return size;
}
void tscClearFieldInfo(SFieldInfo* pFieldInfo) {
......@@ -1009,10 +1050,20 @@ void tscClearFieldInfo(SFieldInfo* pFieldInfo) {
return;
}
tfree(pFieldInfo->pOffset);
// tfree(pFieldInfo->pOffset);
tfree(pFieldInfo->pFields);
tfree(pFieldInfo->pVisibleCols);
tfree(pFieldInfo->pSqlExpr);
for(int32_t i = 0; i < pFieldInfo->numOfOutputCols; ++i) {
if (pFieldInfo->pExpr[i] != NULL) {
tSQLBinaryExprDestroy(&pFieldInfo->pExpr[i]->pBinExprInfo.pBinExpr, NULL);
tfree(pFieldInfo->pExpr[i]->pBinExprInfo.pReqColumns);
tfree(pFieldInfo->pExpr[i]);
}
}
tfree(pFieldInfo->pExpr);
memset(pFieldInfo, 0, sizeof(SFieldInfo));
}
......@@ -1123,6 +1174,10 @@ SSqlExpr* tscSqlExprUpdate(SQueryInfo* pQueryInfo, int32_t index, int16_t functi
return pExpr;
}
int32_t tscSqlExprNumOfExprs(SQueryInfo* pQueryInfo) {
return pQueryInfo->exprsInfo.numOfExprs;
}
void addExprParams(SSqlExpr* pExpr, char* argument, int32_t type, int32_t bytes, int16_t tableIndex) {
if (pExpr == NULL || argument == NULL || bytes == 0) {
return;
......
......@@ -417,6 +417,7 @@ typedef struct SColIndexEx {
int16_t colIdx;
int16_t colIdxInBuf;
uint16_t flag; // denote if it is a tag or not
char name[TSDB_COL_NAME_LEN];
} SColIndexEx;
/* sql function msg, to describe the message to vnode about sql function
......
......@@ -103,6 +103,8 @@ void tSQLBinaryExprCalcTraverse(tSQLBinaryExpr *pExprs, int32_t numOfRows, char
void tSQLBinaryExprTrv(tSQLBinaryExpr *pExprs, int32_t *val, int16_t *ids);
void tQueryResultClean(tQueryResultset *pRes);
uint8_t getBinaryExprOptr(SSQLToken *pToken);
#ifdef __cplusplus
}
#endif
......
......@@ -1056,7 +1056,7 @@ int32_t vnodeConvertQueryMeterMsg(SQueryMeterMsg *pQueryMsg) {
pExprMsg->functionId = htons(pExprMsg->functionId);
pExprMsg->numOfParams = htons(pExprMsg->numOfParams);
pMsg += sizeof(SSqlFuncExprMsg);
pMsg += (sizeof(SSqlFuncExprMsg) - TSDB_COL_NAME_LEN);
for (int32_t j = 0; j < pExprMsg->numOfParams; ++j) {
pExprMsg->arg[j].argType = htons(pExprMsg->arg[j].argType);
......
......@@ -372,18 +372,22 @@ void vnodeUpdateFilterColumnIndex(SQuery* pQuery) {
}
// set the column index in buffer for arithmetic operation
if (pQuery->pSelectExpr != NULL) {
for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) {
SSqlBinaryExprInfo* pBinExprInfo = &pQuery->pSelectExpr[i].pBinExprInfo;
if (pBinExprInfo->pBinExpr != NULL) {
for (int16_t j = 0; j < pBinExprInfo->numOfCols; ++j) {
for (int32_t k = 0; k < pQuery->numOfCols; ++k) {
if (pBinExprInfo->pReqColumns[j].colId == pQuery->colList[k].data.colId) {
pBinExprInfo->pReqColumns[j].colIdxInBuf = pQuery->colList[k].colIdxInBuf;
assert(pQuery->colList[k].colIdxInBuf == k);
break;
}
}
if (pQuery->pSelectExpr == NULL) {
return;
}
for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) {
SSqlBinaryExprInfo* pBinExprInfo = &pQuery->pSelectExpr[i].pBinExprInfo;
if (pBinExprInfo->pBinExpr == NULL) {
continue;
}
for (int16_t j = 0; j < pBinExprInfo->numOfCols; ++j) {
for (int32_t k = 0; k < pQuery->numOfCols; ++k) {
if (pBinExprInfo->pReqColumns[j].colId == pQuery->colList[k].data.colId) {
pBinExprInfo->pReqColumns[j].colIdxInBuf = pQuery->colList[k].colIdxInBuf;
assert(pQuery->colList[k].colIdxInBuf == k);
break;
}
}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册