提交 633e8a0a 编写于 作者: S Shengliang Guan

Merge remote-tracking branch 'origin/develop' into feature/linux

...@@ -33,6 +33,7 @@ SJoinSupporter* tscCreateJoinSupporter(SSqlObj* pSql, int32_t index); ...@@ -33,6 +33,7 @@ SJoinSupporter* tscCreateJoinSupporter(SSqlObj* pSql, int32_t index);
void tscHandleMasterJoinQuery(SSqlObj* pSql); void tscHandleMasterJoinQuery(SSqlObj* pSql);
int32_t tscHandleMasterSTableQuery(SSqlObj *pSql); int32_t tscHandleMasterSTableQuery(SSqlObj *pSql);
int32_t tscHandleFirstRoundStableQuery(SSqlObj *pSql);
int32_t tscHandleMultivnodeInsert(SSqlObj *pSql); int32_t tscHandleMultivnodeInsert(SSqlObj *pSql);
......
...@@ -132,8 +132,9 @@ bool tscIsProjectionQuery(SQueryInfo* pQueryInfo); ...@@ -132,8 +132,9 @@ bool tscIsProjectionQuery(SQueryInfo* pQueryInfo);
bool tscIsTwoStageSTableQuery(SQueryInfo* pQueryInfo, int32_t tableIndex); bool tscIsTwoStageSTableQuery(SQueryInfo* pQueryInfo, int32_t tableIndex);
bool tscQueryTags(SQueryInfo* pQueryInfo); bool tscQueryTags(SQueryInfo* pQueryInfo);
bool tscMultiRoundQuery(SQueryInfo* pQueryInfo, int32_t tableIndex);
SSqlExpr* tscAddSpecialColumnForSelect(SQueryInfo* pQueryInfo, int32_t outputColIndex, int16_t functionId, SSqlExpr* tscAddFuncInSelectClause(SQueryInfo* pQueryInfo, int32_t outputColIndex, int16_t functionId,
SColumnIndex* pIndex, SSchema* pColSchema, int16_t colType); SColumnIndex* pIndex, SSchema* pColSchema, int16_t colType);
int32_t tscSetTableFullName(STableMetaInfo* pTableMetaInfo, SStrToken* pzTableName, SSqlObj* pSql); int32_t tscSetTableFullName(STableMetaInfo* pTableMetaInfo, SStrToken* pzTableName, SSqlObj* pSql);
...@@ -174,6 +175,7 @@ SSqlExpr* tscSqlExprAppend(SQueryInfo* pQueryInfo, int16_t functionId, SColumnIn ...@@ -174,6 +175,7 @@ SSqlExpr* tscSqlExprAppend(SQueryInfo* pQueryInfo, int16_t functionId, SColumnIn
SSqlExpr* tscSqlExprUpdate(SQueryInfo* pQueryInfo, int32_t index, int16_t functionId, int16_t srcColumnIndex, int16_t type, SSqlExpr* tscSqlExprUpdate(SQueryInfo* pQueryInfo, int32_t index, int16_t functionId, int16_t srcColumnIndex, int16_t type,
int16_t size); int16_t size);
size_t tscSqlExprNumOfExprs(SQueryInfo* pQueryInfo); size_t tscSqlExprNumOfExprs(SQueryInfo* pQueryInfo);
void tscInsertPrimaryTsSourceColumn(SQueryInfo* pQueryInfo, SColumnIndex* pIndex);
SSqlExpr* tscSqlExprGet(SQueryInfo* pQueryInfo, int32_t index); SSqlExpr* tscSqlExprGet(SQueryInfo* pQueryInfo, int32_t index);
int32_t tscSqlExprCopy(SArray* dst, const SArray* src, uint64_t uid, bool deepcopy); int32_t tscSqlExprCopy(SArray* dst, const SArray* src, uint64_t uid, bool deepcopy);
......
...@@ -224,7 +224,9 @@ typedef struct SQueryInfo { ...@@ -224,7 +224,9 @@ typedef struct SQueryInfo {
int32_t udColumnId; // current user-defined constant output field column id, monotonically decreases from TSDB_UD_COLUMN_INDEX int32_t udColumnId; // current user-defined constant output field column id, monotonically decreases from TSDB_UD_COLUMN_INDEX
int16_t resColumnId; // result column id int16_t resColumnId; // result column id
bool distinctTag; // distinct tag or not bool distinctTag; // distinct tag or not
int32_t round; // 0/1/....
int32_t bufLen;
char* buf;
} SQueryInfo; } SQueryInfo;
typedef struct { typedef struct {
...@@ -412,10 +414,9 @@ void tscQueueAsyncError(void(*fp), void *param, int32_t code); ...@@ -412,10 +414,9 @@ void tscQueueAsyncError(void(*fp), void *param, int32_t code);
int tscProcessLocalCmd(SSqlObj *pSql); int tscProcessLocalCmd(SSqlObj *pSql);
int tscCfgDynamicOptions(char *msg); int tscCfgDynamicOptions(char *msg);
int taos_retrieve(TAOS_RES *res);
int32_t tscTansformSQLFuncForSTableQuery(SQueryInfo *pQueryInfo); int32_t tscTansformFuncForSTableQuery(SQueryInfo *pQueryInfo);
void tscRestoreSQLFuncForSTableQuery(SQueryInfo *pQueryInfo); void tscRestoreFuncForSTableQuery(SQueryInfo *pQueryInfo);
int32_t tscCreateResPointerInfo(SSqlRes *pRes, SQueryInfo *pQueryInfo); int32_t tscCreateResPointerInfo(SSqlRes *pRes, SQueryInfo *pQueryInfo);
void tscSetResRawPtr(SSqlRes* pRes, SQueryInfo* pQueryInfo); void tscSetResRawPtr(SSqlRes* pRes, SQueryInfo* pQueryInfo);
......
...@@ -68,7 +68,7 @@ static void tscInitSqlContext(SSqlCmd *pCmd, SLocalMerger *pReducer, tOrderDescr ...@@ -68,7 +68,7 @@ static void tscInitSqlContext(SSqlCmd *pCmd, SLocalMerger *pReducer, tOrderDescr
SQLFunctionCtx *pCtx = &pReducer->pCtx[i]; SQLFunctionCtx *pCtx = &pReducer->pCtx[i];
SSqlExpr * pExpr = tscSqlExprGet(pQueryInfo, i); SSqlExpr * pExpr = tscSqlExprGet(pQueryInfo, i);
pCtx->aOutputBuf = pReducer->pResultBuf->data + pExpr->offset * pReducer->resColModel->capacity; pCtx->pOutput = pReducer->pResultBuf->data + pExpr->offset * pReducer->resColModel->capacity;
pCtx->order = pQueryInfo->order.order; pCtx->order = pQueryInfo->order.order;
pCtx->functionId = pExpr->functionId; pCtx->functionId = pExpr->functionId;
...@@ -76,7 +76,7 @@ static void tscInitSqlContext(SSqlCmd *pCmd, SLocalMerger *pReducer, tOrderDescr ...@@ -76,7 +76,7 @@ static void tscInitSqlContext(SSqlCmd *pCmd, SLocalMerger *pReducer, tOrderDescr
int16_t offset = getColumnModelOffset(pDesc->pColumnModel, i); int16_t offset = getColumnModelOffset(pDesc->pColumnModel, i);
SSchema *pSchema = getColumnModelSchema(pDesc->pColumnModel, i); SSchema *pSchema = getColumnModelSchema(pDesc->pColumnModel, i);
pCtx->aInputElemBuf = pReducer->pTempBuffer->data + offset; pCtx->pInput = pReducer->pTempBuffer->data + offset;
// input data format comes from pModel // input data format comes from pModel
pCtx->inputType = pSchema->type; pCtx->inputType = pSchema->type;
...@@ -94,7 +94,7 @@ static void tscInitSqlContext(SSqlCmd *pCmd, SLocalMerger *pReducer, tOrderDescr ...@@ -94,7 +94,7 @@ static void tscInitSqlContext(SSqlCmd *pCmd, SLocalMerger *pReducer, tOrderDescr
// for top/bottom function, the output of timestamp is the first column // for top/bottom function, the output of timestamp is the first column
int32_t functionId = pExpr->functionId; int32_t functionId = pExpr->functionId;
if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM || functionId == TSDB_FUNC_DIFF) { if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM || functionId == TSDB_FUNC_DIFF) {
pCtx->ptsOutputBuf = pReducer->pCtx[0].aOutputBuf; pCtx->ptsOutputBuf = pReducer->pCtx[0].pOutput;
pCtx->param[2].i64 = pQueryInfo->order.order; pCtx->param[2].i64 = pQueryInfo->order.order;
pCtx->param[2].nType = TSDB_DATA_TYPE_BIGINT; pCtx->param[2].nType = TSDB_DATA_TYPE_BIGINT;
pCtx->param[1].i64 = pQueryInfo->order.orderColId; pCtx->param[1].i64 = pQueryInfo->order.orderColId;
...@@ -118,7 +118,7 @@ static void tscInitSqlContext(SSqlCmd *pCmd, SLocalMerger *pReducer, tOrderDescr ...@@ -118,7 +118,7 @@ static void tscInitSqlContext(SSqlCmd *pCmd, SLocalMerger *pReducer, tOrderDescr
if (pExpr->functionId == TSDB_FUNC_TAG_DUMMY || pExpr->functionId == TSDB_FUNC_TS_DUMMY) { if (pExpr->functionId == TSDB_FUNC_TAG_DUMMY || pExpr->functionId == TSDB_FUNC_TS_DUMMY) {
tagLen += pExpr->resBytes; tagLen += pExpr->resBytes;
pTagCtx[n++] = &pReducer->pCtx[i]; pTagCtx[n++] = &pReducer->pCtx[i];
} else if ((aAggs[pExpr->functionId].nStatus & TSDB_FUNCSTATE_SELECTIVITY) != 0) { } else if ((aAggs[pExpr->functionId].status & TSDB_FUNCSTATE_SELECTIVITY) != 0) {
pCtx = &pReducer->pCtx[i]; pCtx = &pReducer->pCtx[i];
} }
} }
...@@ -311,7 +311,7 @@ void tscCreateLocalMerger(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrde ...@@ -311,7 +311,7 @@ void tscCreateLocalMerger(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrde
pReducer->pCtx = (SQLFunctionCtx *)calloc(tscSqlExprNumOfExprs(pQueryInfo), sizeof(SQLFunctionCtx)); pReducer->pCtx = (SQLFunctionCtx *)calloc(tscSqlExprNumOfExprs(pQueryInfo), sizeof(SQLFunctionCtx));
pReducer->rowSize = pMemBuffer[0]->nElemSize; pReducer->rowSize = pMemBuffer[0]->nElemSize;
tscRestoreSQLFuncForSTableQuery(pQueryInfo); tscRestoreFuncForSTableQuery(pQueryInfo);
tscFieldInfoUpdateOffset(pQueryInfo); tscFieldInfoUpdateOffset(pQueryInfo);
if (pReducer->rowSize > pMemBuffer[0]->pageSize) { if (pReducer->rowSize > pMemBuffer[0]->pageSize) {
...@@ -383,7 +383,7 @@ void tscCreateLocalMerger(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrde ...@@ -383,7 +383,7 @@ void tscCreateLocalMerger(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrde
if (pQueryInfo->fillType != TSDB_FILL_NONE) { if (pQueryInfo->fillType != TSDB_FILL_NONE) {
SFillColInfo* pFillCol = createFillColInfo(pQueryInfo); SFillColInfo* pFillCol = createFillColInfo(pQueryInfo);
pReducer->pFillInfo = taosInitFillInfo(pQueryInfo->order.order, revisedSTime, pQueryInfo->groupbyExpr.numOfGroupCols, pReducer->pFillInfo = taosCreateFillInfo(pQueryInfo->order.order, revisedSTime, pQueryInfo->groupbyExpr.numOfGroupCols,
4096, (int32_t)pQueryInfo->fieldsInfo.numOfOutput, pQueryInfo->interval.sliding, pQueryInfo->interval.slidingUnit, 4096, (int32_t)pQueryInfo->fieldsInfo.numOfOutput, pQueryInfo->interval.sliding, pQueryInfo->interval.slidingUnit,
tinfo.precision, pQueryInfo->fillType, pFillCol, pSql); tinfo.precision, pQueryInfo->fillType, pFillCol, pSql);
} }
...@@ -720,7 +720,7 @@ int32_t tscLocalReducerEnvCreate(SSqlObj *pSql, tExtMemBuffer ***pMemBuffer, tOr ...@@ -720,7 +720,7 @@ int32_t tscLocalReducerEnvCreate(SSqlObj *pSql, tExtMemBuffer ***pMemBuffer, tOr
SSchema p1 = {0}; SSchema p1 = {0};
if (pExpr->colInfo.colIndex == TSDB_TBNAME_COLUMN_INDEX) { if (pExpr->colInfo.colIndex == TSDB_TBNAME_COLUMN_INDEX) {
p1 = tGetTableNameColumnSchema(); p1 = *tGetTbnameColumnSchema();
} else if (TSDB_COL_IS_UD_COL(pExpr->colInfo.flag)) { } else if (TSDB_COL_IS_UD_COL(pExpr->colInfo.flag)) {
p1.bytes = pExpr->resBytes; p1.bytes = pExpr->resBytes;
p1.type = (uint8_t) pExpr->resType; p1.type = (uint8_t) pExpr->resType;
...@@ -744,6 +744,8 @@ int32_t tscLocalReducerEnvCreate(SSqlObj *pSql, tExtMemBuffer ***pMemBuffer, tOr ...@@ -744,6 +744,8 @@ int32_t tscLocalReducerEnvCreate(SSqlObj *pSql, tExtMemBuffer ***pMemBuffer, tOr
functionId = TSDB_FUNC_FIRST; functionId = TSDB_FUNC_FIRST;
} else if (functionId == TSDB_FUNC_LAST_DST) { } else if (functionId == TSDB_FUNC_LAST_DST) {
functionId = TSDB_FUNC_LAST; functionId = TSDB_FUNC_LAST;
} else if (functionId == TSDB_FUNC_STDDEV_DST) {
functionId = TSDB_FUNC_STDDEV;
} }
int32_t ret = getResultDataInfo(p1.type, p1.bytes, functionId, 0, &type, &bytes, &inter, 0, false); int32_t ret = getResultDataInfo(p1.type, p1.bytes, functionId, 0, &type, &bytes, &inter, 0, false);
...@@ -1041,7 +1043,7 @@ static void savePreviousRow(SLocalMerger *pLocalMerge, tFilePage *tmpBuffer) { ...@@ -1041,7 +1043,7 @@ static void savePreviousRow(SLocalMerger *pLocalMerge, tFilePage *tmpBuffer) {
pLocalMerge->hasPrevRow = true; pLocalMerge->hasPrevRow = true;
} }
static void doExecuteSecondaryMerge(SSqlCmd *pCmd, SLocalMerger *pLocalMerge, bool needInit) { static void doExecuteFinalMerge(SSqlCmd *pCmd, SLocalMerger *pLocalMerge, bool needInit) {
// the tag columns need to be set before all functions execution // the tag columns need to be set before all functions execution
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
...@@ -1053,7 +1055,7 @@ static void doExecuteSecondaryMerge(SSqlCmd *pCmd, SLocalMerger *pLocalMerge, bo ...@@ -1053,7 +1055,7 @@ static void doExecuteSecondaryMerge(SSqlCmd *pCmd, SLocalMerger *pLocalMerge, bo
int32_t functionId = pCtx->functionId; int32_t functionId = pCtx->functionId;
if (functionId == TSDB_FUNC_TAG_DUMMY || functionId == TSDB_FUNC_TAG || functionId == TSDB_FUNC_TS_DUMMY) { if (functionId == TSDB_FUNC_TAG_DUMMY || functionId == TSDB_FUNC_TAG || functionId == TSDB_FUNC_TS_DUMMY) {
tVariantDestroy(&pCtx->tag); tVariantDestroy(&pCtx->tag);
char* input = pCtx->aInputElemBuf; char* input = pCtx->pInput;
if (pCtx->inputType == TSDB_DATA_TYPE_BINARY || pCtx->inputType == TSDB_DATA_TYPE_NCHAR) { if (pCtx->inputType == TSDB_DATA_TYPE_BINARY || pCtx->inputType == TSDB_DATA_TYPE_NCHAR) {
assert(varDataLen(input) <= pCtx->inputBytes); assert(varDataLen(input) <= pCtx->inputBytes);
...@@ -1061,6 +1063,7 @@ static void doExecuteSecondaryMerge(SSqlCmd *pCmd, SLocalMerger *pLocalMerge, bo ...@@ -1061,6 +1063,7 @@ static void doExecuteSecondaryMerge(SSqlCmd *pCmd, SLocalMerger *pLocalMerge, bo
} else { } else {
tVariantCreateFromBinary(&pCtx->tag, input, pCtx->inputBytes, pCtx->inputType); tVariantCreateFromBinary(&pCtx->tag, input, pCtx->inputBytes, pCtx->inputType);
} }
} else if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM) { } else if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM) {
SSqlExpr *pExpr = tscSqlExprGet(pQueryInfo, j); SSqlExpr *pExpr = tscSqlExprGet(pQueryInfo, j);
pCtx->param[0].i64 = pExpr->param[0].i64; pCtx->param[0].i64 = pExpr->param[0].i64;
...@@ -1086,7 +1089,7 @@ static void doExecuteSecondaryMerge(SSqlCmd *pCmd, SLocalMerger *pLocalMerge, bo ...@@ -1086,7 +1089,7 @@ static void doExecuteSecondaryMerge(SSqlCmd *pCmd, SLocalMerger *pLocalMerge, bo
static void handleUnprocessedRow(SSqlCmd *pCmd, SLocalMerger *pLocalMerge, tFilePage *tmpBuffer) { static void handleUnprocessedRow(SSqlCmd *pCmd, SLocalMerger *pLocalMerge, tFilePage *tmpBuffer) {
if (pLocalMerge->hasUnprocessedRow) { if (pLocalMerge->hasUnprocessedRow) {
pLocalMerge->hasUnprocessedRow = false; pLocalMerge->hasUnprocessedRow = false;
doExecuteSecondaryMerge(pCmd, pLocalMerge, true); doExecuteFinalMerge(pCmd, pLocalMerge, true);
savePreviousRow(pLocalMerge, tmpBuffer); savePreviousRow(pLocalMerge, tmpBuffer);
} }
} }
...@@ -1142,11 +1145,11 @@ static void fillMultiRowsOfTagsVal(SQueryInfo *pQueryInfo, int32_t numOfRes, SLo ...@@ -1142,11 +1145,11 @@ static void fillMultiRowsOfTagsVal(SQueryInfo *pQueryInfo, int32_t numOfRes, SLo
int32_t inc = numOfRes - 1; // tsdb_func_tag function only produce one row of result int32_t inc = numOfRes - 1; // tsdb_func_tag function only produce one row of result
memset(buf, 0, (size_t)maxBufSize); memset(buf, 0, (size_t)maxBufSize);
memcpy(buf, pCtx->aOutputBuf, (size_t)pCtx->outputBytes); memcpy(buf, pCtx->pOutput, (size_t)pCtx->outputBytes);
for (int32_t i = 0; i < inc; ++i) { for (int32_t i = 0; i < inc; ++i) {
pCtx->aOutputBuf += pCtx->outputBytes; pCtx->pOutput += pCtx->outputBytes;
memcpy(pCtx->aOutputBuf, buf, (size_t)pCtx->outputBytes); memcpy(pCtx->pOutput, buf, (size_t)pCtx->outputBytes);
} }
} }
...@@ -1289,10 +1292,10 @@ void resetOutputBuf(SQueryInfo *pQueryInfo, SLocalMerger *pLocalMerge) {// reset ...@@ -1289,10 +1292,10 @@ void resetOutputBuf(SQueryInfo *pQueryInfo, SLocalMerger *pLocalMerge) {// reset
size_t t = tscSqlExprNumOfExprs(pQueryInfo); size_t t = tscSqlExprNumOfExprs(pQueryInfo);
for (int32_t i = 0; i < t; ++i) { for (int32_t i = 0; i < t; ++i) {
SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, i); SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, i);
pLocalMerge->pCtx[i].aOutputBuf = pLocalMerge->pResultBuf->data + pExpr->offset * pLocalMerge->resColModel->capacity; pLocalMerge->pCtx[i].pOutput = pLocalMerge->pResultBuf->data + pExpr->offset * pLocalMerge->resColModel->capacity;
if (pExpr->functionId == TSDB_FUNC_TOP || pExpr->functionId == TSDB_FUNC_BOTTOM || pExpr->functionId == TSDB_FUNC_DIFF) { if (pExpr->functionId == TSDB_FUNC_TOP || pExpr->functionId == TSDB_FUNC_BOTTOM || pExpr->functionId == TSDB_FUNC_DIFF) {
pLocalMerge->pCtx[i].ptsOutputBuf = pLocalMerge->pCtx[0].aOutputBuf; pLocalMerge->pCtx[i].ptsOutputBuf = pLocalMerge->pCtx[0].pOutput;
} }
} }
...@@ -1404,7 +1407,7 @@ static void doProcessResultInNextWindow(SSqlObj *pSql, int32_t numOfRes) { ...@@ -1404,7 +1407,7 @@ static void doProcessResultInNextWindow(SSqlObj *pSql, int32_t numOfRes) {
for (int32_t k = 0; k < size; ++k) { for (int32_t k = 0; k < size; ++k) {
SQLFunctionCtx *pCtx = &pLocalMerge->pCtx[k]; SQLFunctionCtx *pCtx = &pLocalMerge->pCtx[k];
pCtx->aOutputBuf += pCtx->outputBytes * numOfRes; pCtx->pOutput += pCtx->outputBytes * numOfRes;
// set the correct output timestamp column position // set the correct output timestamp column position
if (pCtx->functionId == TSDB_FUNC_TOP || pCtx->functionId == TSDB_FUNC_BOTTOM) { if (pCtx->functionId == TSDB_FUNC_TOP || pCtx->functionId == TSDB_FUNC_BOTTOM) {
...@@ -1412,7 +1415,7 @@ static void doProcessResultInNextWindow(SSqlObj *pSql, int32_t numOfRes) { ...@@ -1412,7 +1415,7 @@ static void doProcessResultInNextWindow(SSqlObj *pSql, int32_t numOfRes) {
} }
} }
doExecuteSecondaryMerge(pCmd, pLocalMerge, true); doExecuteFinalMerge(pCmd, pLocalMerge, true);
} }
int32_t tscDoLocalMerge(SSqlObj *pSql) { int32_t tscDoLocalMerge(SSqlObj *pSql) {
...@@ -1504,7 +1507,7 @@ int32_t tscDoLocalMerge(SSqlObj *pSql) { ...@@ -1504,7 +1507,7 @@ int32_t tscDoLocalMerge(SSqlObj *pSql) {
if (pLocalMerge->hasPrevRow) { if (pLocalMerge->hasPrevRow) {
if (needToMerge(pQueryInfo, pLocalMerge, tmpBuffer)) { if (needToMerge(pQueryInfo, pLocalMerge, tmpBuffer)) {
// belong to the group of the previous row, continue process it // belong to the group of the previous row, continue process it
doExecuteSecondaryMerge(pCmd, pLocalMerge, false); doExecuteFinalMerge(pCmd, pLocalMerge, false);
// copy to buffer // copy to buffer
savePreviousRow(pLocalMerge, tmpBuffer); savePreviousRow(pLocalMerge, tmpBuffer);
...@@ -1576,7 +1579,7 @@ int32_t tscDoLocalMerge(SSqlObj *pSql) { ...@@ -1576,7 +1579,7 @@ int32_t tscDoLocalMerge(SSqlObj *pSql) {
} }
} }
} else { } else {
doExecuteSecondaryMerge(pCmd, pLocalMerge, true); doExecuteFinalMerge(pCmd, pLocalMerge, true);
savePreviousRow(pLocalMerge, tmpBuffer); // copy the processed row to buffer savePreviousRow(pLocalMerge, tmpBuffer); // copy the processed row to buffer
} }
......
...@@ -787,10 +787,10 @@ int32_t parseIntervalClause(SSqlObj* pSql, SQueryInfo* pQueryInfo, SQuerySQL* pQ ...@@ -787,10 +787,10 @@ int32_t parseIntervalClause(SSqlObj* pSql, SQueryInfo* pQueryInfo, SQuerySQL* pQ
} }
SSchema s = {.bytes = TSDB_KEYSIZE, .type = TSDB_DATA_TYPE_TIMESTAMP, .colId = PRIMARYKEY_TIMESTAMP_COL_INDEX}; SSchema s = {.bytes = TSDB_KEYSIZE, .type = TSDB_DATA_TYPE_TIMESTAMP, .colId = PRIMARYKEY_TIMESTAMP_COL_INDEX};
tstrncpy(s.name, aAggs[TSDB_FUNC_TS].aName, sizeof(s.name)); tstrncpy(s.name, aAggs[TSDB_FUNC_TS].name, sizeof(s.name));
SColumnIndex index = {tableIndex, PRIMARYKEY_TIMESTAMP_COL_INDEX}; SColumnIndex index = {tableIndex, PRIMARYKEY_TIMESTAMP_COL_INDEX};
tscAddSpecialColumnForSelect(pQueryInfo, 0, TSDB_FUNC_TS, &index, &s, TSDB_COL_NORMAL); tscAddFuncInSelectClause(pQueryInfo, 0, TSDB_FUNC_TS, &index, &s, TSDB_COL_NORMAL);
if (parseOffsetClause(pCmd, pQueryInfo, pQuerySql) != TSDB_CODE_SUCCESS) { if (parseOffsetClause(pCmd, pQueryInfo, pQuerySql) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_SQL; return TSDB_CODE_TSC_INVALID_SQL;
...@@ -1319,7 +1319,7 @@ int32_t setObjFullName(char* fullName, const char* account, SStrToken* pDB, SStr ...@@ -1319,7 +1319,7 @@ int32_t setObjFullName(char* fullName, const char* account, SStrToken* pDB, SStr
return (totalLen < TSDB_TABLE_FNAME_LEN) ? TSDB_CODE_SUCCESS : TSDB_CODE_TSC_INVALID_SQL; return (totalLen < TSDB_TABLE_FNAME_LEN) ? TSDB_CODE_SUCCESS : TSDB_CODE_TSC_INVALID_SQL;
} }
static void tscInsertPrimaryTSSourceColumn(SQueryInfo* pQueryInfo, SColumnIndex* pIndex) { void tscInsertPrimaryTsSourceColumn(SQueryInfo* pQueryInfo, SColumnIndex* pIndex) {
SColumnIndex tsCol = {.tableIndex = pIndex->tableIndex, .columnIndex = PRIMARYKEY_TIMESTAMP_COL_INDEX}; SColumnIndex tsCol = {.tableIndex = pIndex->tableIndex, .columnIndex = PRIMARYKEY_TIMESTAMP_COL_INDEX};
tscColumnListInsert(pQueryInfo->colList, &tsCol); tscColumnListInsert(pQueryInfo->colList, &tsCol);
} }
...@@ -1401,7 +1401,7 @@ static int32_t handleArithmeticExpr(SSqlCmd* pCmd, int32_t clauseIndex, int32_t ...@@ -1401,7 +1401,7 @@ static int32_t handleArithmeticExpr(SSqlCmd* pCmd, int32_t clauseIndex, int32_t
insertResultField(pQueryInfo, exprIndex, &columnList, sizeof(double), TSDB_DATA_TYPE_DOUBLE, pExpr->aliasName, pExpr); insertResultField(pQueryInfo, exprIndex, &columnList, sizeof(double), TSDB_DATA_TYPE_DOUBLE, pExpr->aliasName, pExpr);
// add ts column // add ts column
tscInsertPrimaryTSSourceColumn(pQueryInfo, &index); tscInsertPrimaryTsSourceColumn(pQueryInfo, &index);
tbufCloseWriter(&bw); tbufCloseWriter(&bw);
taosArrayDestroy(colList); taosArrayDestroy(colList);
...@@ -1506,7 +1506,7 @@ static void addPrimaryTsColIntoResult(SQueryInfo* pQueryInfo) { ...@@ -1506,7 +1506,7 @@ static void addPrimaryTsColIntoResult(SQueryInfo* pQueryInfo) {
// add the timestamp column into the output columns // add the timestamp column into the output columns
SColumnIndex index = {0}; // primary timestamp column info SColumnIndex index = {0}; // primary timestamp column info
int32_t numOfCols = (int32_t)tscSqlExprNumOfExprs(pQueryInfo); int32_t numOfCols = (int32_t)tscSqlExprNumOfExprs(pQueryInfo);
tscAddSpecialColumnForSelect(pQueryInfo, numOfCols, TSDB_FUNC_PRJ, &index, pSchema, TSDB_COL_NORMAL); tscAddFuncInSelectClause(pQueryInfo, numOfCols, TSDB_FUNC_PRJ, &index, pSchema, TSDB_COL_NORMAL);
SInternalField* pSupInfo = tscFieldInfoGetInternalField(&pQueryInfo->fieldsInfo, numOfCols); SInternalField* pSupInfo = tscFieldInfoGetInternalField(&pQueryInfo->fieldsInfo, numOfCols);
pSupInfo->visible = false; pSupInfo->visible = false;
...@@ -1602,7 +1602,7 @@ int32_t parseSelectClause(SSqlCmd* pCmd, int32_t clauseIndex, tSQLExprList* pSel ...@@ -1602,7 +1602,7 @@ int32_t parseSelectClause(SSqlCmd* pCmd, int32_t clauseIndex, tSQLExprList* pSel
* in dealing with super table queries such as: count/first/last * in dealing with super table queries such as: count/first/last
*/ */
if (isSTable) { if (isSTable) {
tscTansformSQLFuncForSTableQuery(pQueryInfo); tscTansformFuncForSTableQuery(pQueryInfo);
if (hasUnsupportFunctionsForSTableQuery(pCmd, pQueryInfo)) { if (hasUnsupportFunctionsForSTableQuery(pCmd, pQueryInfo)) {
return TSDB_CODE_TSC_INVALID_SQL; return TSDB_CODE_TSC_INVALID_SQL;
...@@ -1656,7 +1656,7 @@ SSqlExpr* doAddProjectCol(SQueryInfo* pQueryInfo, int32_t colIndex, int32_t tabl ...@@ -1656,7 +1656,7 @@ SSqlExpr* doAddProjectCol(SQueryInfo* pQueryInfo, int32_t colIndex, int32_t tabl
(functionId == TSDB_FUNC_TAGPRJ)); (functionId == TSDB_FUNC_TAGPRJ));
} }
SSqlExpr* tscAddSpecialColumnForSelect(SQueryInfo* pQueryInfo, int32_t outputColIndex, int16_t functionId, SSqlExpr* tscAddFuncInSelectClause(SQueryInfo* pQueryInfo, int32_t outputColIndex, int16_t functionId,
SColumnIndex* pIndex, SSchema* pColSchema, int16_t flag) { SColumnIndex* pIndex, SSchema* pColSchema, int16_t flag) {
int16_t colId = getNewResColId(pQueryInfo); int16_t colId = getNewResColId(pQueryInfo);
...@@ -1738,7 +1738,7 @@ int32_t addProjectionExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, t ...@@ -1738,7 +1738,7 @@ int32_t addProjectionExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, t
} }
// add the primary timestamp column even though it is not required by user // add the primary timestamp column even though it is not required by user
tscInsertPrimaryTSSourceColumn(pQueryInfo, &index); tscInsertPrimaryTsSourceColumn(pQueryInfo, &index);
} else if (optr == TK_STRING || optr == TK_INTEGER || optr == TK_FLOAT) { // simple column projection query } else if (optr == TK_STRING || optr == TK_INTEGER || optr == TK_FLOAT) { // simple column projection query
SColumnIndex index = COLUMN_INDEX_INITIALIZER; SColumnIndex index = COLUMN_INDEX_INITIALIZER;
...@@ -1748,7 +1748,7 @@ int32_t addProjectionExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, t ...@@ -1748,7 +1748,7 @@ int32_t addProjectionExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, t
SSchema colSchema = tGetUserSpecifiedColumnSchema(&pItem->pNode->val, &pItem->pNode->token, pItem->aliasName); SSchema colSchema = tGetUserSpecifiedColumnSchema(&pItem->pNode->val, &pItem->pNode->token, pItem->aliasName);
SSqlExpr* pExpr = SSqlExpr* pExpr =
tscAddSpecialColumnForSelect(pQueryInfo, startPos, TSDB_FUNC_PRJ, &index, &colSchema, TSDB_COL_UDC); tscAddFuncInSelectClause(pQueryInfo, startPos, TSDB_FUNC_PRJ, &index, &colSchema, TSDB_COL_UDC);
// NOTE: the first parameter is reserved for the tag column id during join query process. // NOTE: the first parameter is reserved for the tag column id during join query process.
pExpr->numOfParams = 2; pExpr->numOfParams = 2;
...@@ -1761,11 +1761,11 @@ int32_t addProjectionExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, t ...@@ -1761,11 +1761,11 @@ int32_t addProjectionExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, t
} }
if (index.columnIndex == TSDB_TBNAME_COLUMN_INDEX) { if (index.columnIndex == TSDB_TBNAME_COLUMN_INDEX) {
SSchema colSchema = tGetTableNameColumnSchema(); SSchema* colSchema = tGetTbnameColumnSchema();
tscAddSpecialColumnForSelect(pQueryInfo, startPos, TSDB_FUNC_TAGPRJ, &index, &colSchema, TSDB_COL_TAG); tscAddFuncInSelectClause(pQueryInfo, startPos, TSDB_FUNC_TAGPRJ, &index, colSchema, TSDB_COL_TAG);
} else if (index.columnIndex == TSDB_BLOCK_DIST_COLUMN_INDEX) { } else if (index.columnIndex == TSDB_BLOCK_DIST_COLUMN_INDEX) {
SSchema colSchema = tGetBlockDistColumnSchema(); SSchema colSchema = tGetBlockDistColumnSchema();
tscAddSpecialColumnForSelect(pQueryInfo, startPos, TSDB_FUNC_PRJ, &index, &colSchema, TSDB_COL_TAG); tscAddFuncInSelectClause(pQueryInfo, startPos, TSDB_FUNC_PRJ, &index, &colSchema, TSDB_COL_TAG);
} else { } else {
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, index.tableIndex); STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, index.tableIndex);
STableMeta* pTableMeta = pTableMetaInfo->pTableMeta; STableMeta* pTableMeta = pTableMetaInfo->pTableMeta;
...@@ -1779,7 +1779,7 @@ int32_t addProjectionExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, t ...@@ -1779,7 +1779,7 @@ int32_t addProjectionExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, t
} }
// add the primary timestamp column even though it is not required by user // add the primary timestamp column even though it is not required by user
tscInsertPrimaryTSSourceColumn(pQueryInfo, &index); tscInsertPrimaryTsSourceColumn(pQueryInfo, &index);
} else { } else {
return TSDB_CODE_TSC_INVALID_SQL; return TSDB_CODE_TSC_INVALID_SQL;
} }
...@@ -1849,9 +1849,9 @@ void setResultColName(char* name, tSqlExprItem* pItem, int32_t functionId, SStrT ...@@ -1849,9 +1849,9 @@ void setResultColName(char* name, tSqlExprItem* pItem, int32_t functionId, SStrT
if (tsKeepOriginalColumnName) { // keep the original column name if (tsKeepOriginalColumnName) { // keep the original column name
tstrncpy(name, uname, TSDB_COL_NAME_LEN); tstrncpy(name, uname, TSDB_COL_NAME_LEN);
} else { } else {
int32_t size = TSDB_COL_NAME_LEN + tListLen(aAggs[functionId].aName) + 2 + 1; int32_t size = TSDB_COL_NAME_LEN + tListLen(aAggs[functionId].name) + 2 + 1;
char tmp[TSDB_COL_NAME_LEN + tListLen(aAggs[functionId].aName) + 2 + 1] = {0}; char tmp[TSDB_COL_NAME_LEN + tListLen(aAggs[functionId].name) + 2 + 1] = {0};
snprintf(tmp, size, "%s(%s)", aAggs[functionId].aName, uname); snprintf(tmp, size, "%s(%s)", aAggs[functionId].name, uname);
tstrncpy(name, tmp, TSDB_COL_NAME_LEN); tstrncpy(name, tmp, TSDB_COL_NAME_LEN);
} }
...@@ -1966,7 +1966,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col ...@@ -1966,7 +1966,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
// the time stamp may be always needed // the time stamp may be always needed
if (index.tableIndex < tscGetNumOfColumns(pTableMetaInfo->pTableMeta)) { if (index.tableIndex < tscGetNumOfColumns(pTableMetaInfo->pTableMeta)) {
tscInsertPrimaryTSSourceColumn(pQueryInfo, &index); tscInsertPrimaryTsSourceColumn(pQueryInfo, &index);
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -2036,7 +2036,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col ...@@ -2036,7 +2036,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
getNewResColId(pQueryInfo), TSDB_KEYSIZE, false); getNewResColId(pQueryInfo), TSDB_KEYSIZE, false);
SColumnList ids = getColumnList(1, 0, 0); SColumnList ids = getColumnList(1, 0, 0);
insertResultField(pQueryInfo, 0, &ids, TSDB_KEYSIZE, TSDB_DATA_TYPE_TIMESTAMP, aAggs[TSDB_FUNC_TS_DUMMY].aName, pExpr); insertResultField(pQueryInfo, 0, &ids, TSDB_KEYSIZE, TSDB_DATA_TYPE_TIMESTAMP, aAggs[TSDB_FUNC_TS_DUMMY].name, pExpr);
} }
// functions can not be applied to tags // functions can not be applied to tags
...@@ -2079,7 +2079,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col ...@@ -2079,7 +2079,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
} }
} }
tscInsertPrimaryTSSourceColumn(pQueryInfo, &index); tscInsertPrimaryTsSourceColumn(pQueryInfo, &index);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
case TK_FIRST: case TK_FIRST:
...@@ -2285,7 +2285,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col ...@@ -2285,7 +2285,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
if (convertFunctionId(optr, &functionId) != TSDB_CODE_SUCCESS) { if (convertFunctionId(optr, &functionId) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_SQL; return TSDB_CODE_TSC_INVALID_SQL;
} }
tscInsertPrimaryTSSourceColumn(pQueryInfo, &index); tscInsertPrimaryTsSourceColumn(pQueryInfo, &index);
colIndex += 1; // the first column is ts colIndex += 1; // the first column is ts
pExpr = tscSqlExprAppend(pQueryInfo, functionId, &index, resultType, resultSize, getNewResColId(pQueryInfo), resultSize, false); pExpr = tscSqlExprAppend(pQueryInfo, functionId, &index, resultType, resultSize, getNewResColId(pQueryInfo), resultSize, false);
...@@ -2308,12 +2308,12 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col ...@@ -2308,12 +2308,12 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
SColumnIndex index1 = {0, PRIMARYKEY_TIMESTAMP_COL_INDEX}; SColumnIndex index1 = {0, PRIMARYKEY_TIMESTAMP_COL_INDEX};
pExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_TS, &index1, TSDB_DATA_TYPE_TIMESTAMP, TSDB_KEYSIZE, getNewResColId(pQueryInfo), pExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_TS, &index1, TSDB_DATA_TYPE_TIMESTAMP, TSDB_KEYSIZE, getNewResColId(pQueryInfo),
TSDB_KEYSIZE, false); TSDB_KEYSIZE, false);
tstrncpy(pExpr->aliasName, aAggs[TSDB_FUNC_TS].aName, sizeof(pExpr->aliasName)); tstrncpy(pExpr->aliasName, aAggs[TSDB_FUNC_TS].name, sizeof(pExpr->aliasName));
const int32_t TS_COLUMN_INDEX = PRIMARYKEY_TIMESTAMP_COL_INDEX; const int32_t TS_COLUMN_INDEX = PRIMARYKEY_TIMESTAMP_COL_INDEX;
SColumnList ids = getColumnList(1, 0, TS_COLUMN_INDEX); SColumnList ids = getColumnList(1, 0, TS_COLUMN_INDEX);
insertResultField(pQueryInfo, TS_COLUMN_INDEX, &ids, TSDB_KEYSIZE, TSDB_DATA_TYPE_TIMESTAMP, insertResultField(pQueryInfo, TS_COLUMN_INDEX, &ids, TSDB_KEYSIZE, TSDB_DATA_TYPE_TIMESTAMP,
aAggs[TSDB_FUNC_TS].aName, pExpr); aAggs[TSDB_FUNC_TS].name, pExpr);
colIndex += 1; // the first column is ts colIndex += 1; // the first column is ts
...@@ -2384,7 +2384,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col ...@@ -2384,7 +2384,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
SSchema s = {0}; SSchema s = {0};
if (index.columnIndex == TSDB_TBNAME_COLUMN_INDEX) { if (index.columnIndex == TSDB_TBNAME_COLUMN_INDEX) {
s = tGetTableNameColumnSchema(); s = *tGetTbnameColumnSchema();
} else { } else {
s = pTagSchema[index.columnIndex]; s = pTagSchema[index.columnIndex];
} }
...@@ -2400,7 +2400,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col ...@@ -2400,7 +2400,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
s.bytes = bytes; s.bytes = bytes;
TSDB_QUERY_SET_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_TAG_FILTER_QUERY); TSDB_QUERY_SET_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_TAG_FILTER_QUERY);
tscAddSpecialColumnForSelect(pQueryInfo, 0, TSDB_FUNC_TID_TAG, &index, &s, TSDB_COL_TAG); tscAddFuncInSelectClause(pQueryInfo, 0, TSDB_FUNC_TID_TAG, &index, &s, TSDB_COL_TAG);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -2778,7 +2778,7 @@ bool validateIpAddress(const char* ip, size_t size) { ...@@ -2778,7 +2778,7 @@ bool validateIpAddress(const char* ip, size_t size) {
return epAddr != INADDR_NONE; return epAddr != INADDR_NONE;
} }
int32_t tscTansformSQLFuncForSTableQuery(SQueryInfo* pQueryInfo) { int32_t tscTansformFuncForSTableQuery(SQueryInfo* pQueryInfo) {
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
if (pTableMetaInfo->pTableMeta == NULL || !UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) { if (pTableMetaInfo->pTableMeta == NULL || !UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
...@@ -2800,7 +2800,7 @@ int32_t tscTansformSQLFuncForSTableQuery(SQueryInfo* pQueryInfo) { ...@@ -2800,7 +2800,7 @@ int32_t tscTansformSQLFuncForSTableQuery(SQueryInfo* pQueryInfo) {
SSchema* pSrcSchema = tscGetTableColumnSchema(pTableMetaInfo->pTableMeta, colIndex); SSchema* pSrcSchema = tscGetTableColumnSchema(pTableMetaInfo->pTableMeta, colIndex);
if ((functionId >= TSDB_FUNC_SUM && functionId <= TSDB_FUNC_TWA) || if ((functionId >= TSDB_FUNC_SUM && functionId <= TSDB_FUNC_TWA) ||
(functionId >= TSDB_FUNC_FIRST_DST && functionId <= TSDB_FUNC_LAST_DST) || (functionId >= TSDB_FUNC_FIRST_DST && functionId <= TSDB_FUNC_STDDEV_DST) ||
(functionId >= TSDB_FUNC_RATE && functionId <= TSDB_FUNC_AVG_IRATE)) { (functionId >= TSDB_FUNC_RATE && functionId <= TSDB_FUNC_AVG_IRATE)) {
if (getResultDataInfo(pSrcSchema->type, pSrcSchema->bytes, functionId, (int32_t)pExpr->param[0].i64, &type, &bytes, if (getResultDataInfo(pSrcSchema->type, pSrcSchema->bytes, functionId, (int32_t)pExpr->param[0].i64, &type, &bytes,
&interBytes, 0, true) != TSDB_CODE_SUCCESS) { &interBytes, 0, true) != TSDB_CODE_SUCCESS) {
...@@ -2818,7 +2818,7 @@ int32_t tscTansformSQLFuncForSTableQuery(SQueryInfo* pQueryInfo) { ...@@ -2818,7 +2818,7 @@ int32_t tscTansformSQLFuncForSTableQuery(SQueryInfo* pQueryInfo) {
} }
/* transfer the field-info back to original input format */ /* transfer the field-info back to original input format */
void tscRestoreSQLFuncForSTableQuery(SQueryInfo* pQueryInfo) { void tscRestoreFuncForSTableQuery(SQueryInfo* pQueryInfo) {
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
if (!UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) { if (!UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
return; return;
...@@ -2842,6 +2842,8 @@ void tscRestoreSQLFuncForSTableQuery(SQueryInfo* pQueryInfo) { ...@@ -2842,6 +2842,8 @@ void tscRestoreSQLFuncForSTableQuery(SQueryInfo* pQueryInfo) {
functionId = TSDB_FUNC_FIRST; functionId = TSDB_FUNC_FIRST;
} else if (functionId == TSDB_FUNC_LAST_DST) { } else if (functionId == TSDB_FUNC_LAST_DST) {
functionId = TSDB_FUNC_LAST; functionId = TSDB_FUNC_LAST;
} else if (functionId == TSDB_FUNC_STDDEV_DST) {
functionId = TSDB_FUNC_STDDEV;
} }
getResultDataInfo(pSchema->type, pSchema->bytes, functionId, 0, &pExpr->resType, &pExpr->resBytes, getResultDataInfo(pSchema->type, pSchema->bytes, functionId, 0, &pExpr->resType, &pExpr->resBytes,
...@@ -2858,7 +2860,7 @@ bool hasUnsupportFunctionsForSTableQuery(SSqlCmd* pCmd, SQueryInfo* pQueryInfo) ...@@ -2858,7 +2860,7 @@ bool hasUnsupportFunctionsForSTableQuery(SSqlCmd* pCmd, SQueryInfo* pQueryInfo)
size_t size = tscSqlExprNumOfExprs(pQueryInfo); size_t size = tscSqlExprNumOfExprs(pQueryInfo);
for (int32_t i = 0; i < size; ++i) { for (int32_t i = 0; i < size; ++i) {
int32_t functionId = tscSqlExprGet(pQueryInfo, i)->functionId; int32_t functionId = tscSqlExprGet(pQueryInfo, i)->functionId;
if ((aAggs[functionId].nStatus & TSDB_FUNCSTATE_STABLE) == 0) { if ((aAggs[functionId].status & TSDB_FUNCSTATE_STABLE) == 0) {
invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg3); invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg3);
return true; return true;
} }
...@@ -2968,7 +2970,7 @@ int32_t parseGroupbyClause(SQueryInfo* pQueryInfo, SArray* pList, SSqlCmd* pCmd) ...@@ -2968,7 +2970,7 @@ int32_t parseGroupbyClause(SQueryInfo* pQueryInfo, SArray* pList, SSqlCmd* pCmd)
STableMeta* pTableMeta = NULL; STableMeta* pTableMeta = NULL;
SSchema* pSchema = NULL; SSchema* pSchema = NULL;
SSchema s = tGetTbnameColumnSchema(); // SSchema s = tGetTbnameColumnSchema();
int32_t tableIndex = COLUMN_INDEX_INITIAL_VAL; int32_t tableIndex = COLUMN_INDEX_INITIAL_VAL;
...@@ -2995,7 +2997,7 @@ int32_t parseGroupbyClause(SQueryInfo* pQueryInfo, SArray* pList, SSqlCmd* pCmd) ...@@ -2995,7 +2997,7 @@ int32_t parseGroupbyClause(SQueryInfo* pQueryInfo, SArray* pList, SSqlCmd* pCmd)
int32_t numOfCols = tscGetNumOfColumns(pTableMeta); int32_t numOfCols = tscGetNumOfColumns(pTableMeta);
if (index.columnIndex == TSDB_TBNAME_COLUMN_INDEX) { if (index.columnIndex == TSDB_TBNAME_COLUMN_INDEX) {
pSchema = &s; pSchema = tGetTbnameColumnSchema();
} else { } else {
pSchema = tscGetTableColumnSchema(pTableMeta, index.columnIndex); pSchema = tscGetTableColumnSchema(pTableMeta, index.columnIndex);
} }
...@@ -3552,38 +3554,6 @@ static int32_t getJoinCondInfo(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, tSQLExpr* ...@@ -3552,38 +3554,6 @@ static int32_t getJoinCondInfo(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, tSQLExpr*
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
// todo error handle / such as and /or mixed with +/-/*/
int32_t doArithmeticExprToString(tSQLExpr* pExpr, char** exprString) {
tSQLExpr* pLeft = pExpr->pLeft;
tSQLExpr* pRight = pExpr->pRight;
*(*exprString)++ = '(';
if (pLeft->nSQLOptr >= TK_PLUS && pLeft->nSQLOptr <= TK_REM) {
doArithmeticExprToString(pLeft, exprString);
} else {
int32_t ret = tSQLExprNodeToString(pLeft, exprString);
if (ret != TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_SQL;
}
}
optrToString(pExpr, exprString);
if (pRight->nSQLOptr >= TK_PLUS && pRight->nSQLOptr <= TK_REM) {
doArithmeticExprToString(pRight, exprString);
} else {
int32_t ret = tSQLExprNodeToString(pRight, exprString);
if (ret != TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_SQL;
}
}
*(*exprString)++ = ')';
return TSDB_CODE_SUCCESS;
}
static int32_t validateSQLExpr(SSqlCmd* pCmd, tSQLExpr* pExpr, SQueryInfo* pQueryInfo, SColumnList* pList, static int32_t validateSQLExpr(SSqlCmd* pCmd, tSQLExpr* pExpr, SQueryInfo* pQueryInfo, SColumnList* pList,
int32_t* type, uint64_t* uid) { int32_t* type, uint64_t* uid) {
if (pExpr->nSQLOptr == TK_ID) { if (pExpr->nSQLOptr == TK_ID) {
...@@ -5233,7 +5203,7 @@ int32_t validateSqlFunctionInStreamSql(SSqlCmd* pCmd, SQueryInfo* pQueryInfo) { ...@@ -5233,7 +5203,7 @@ int32_t validateSqlFunctionInStreamSql(SSqlCmd* pCmd, SQueryInfo* pQueryInfo) {
size_t size = taosArrayGetSize(pQueryInfo->exprList); size_t size = taosArrayGetSize(pQueryInfo->exprList);
for (int32_t i = 0; i < size; ++i) { for (int32_t i = 0; i < size; ++i) {
int32_t functId = tscSqlExprGet(pQueryInfo, i)->functionId; int32_t functId = tscSqlExprGet(pQueryInfo, i)->functionId;
if (!IS_STREAM_QUERY_VALID(aAggs[functId].nStatus)) { if (!IS_STREAM_QUERY_VALID(aAggs[functId].status)) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1); return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1);
} }
} }
...@@ -5256,7 +5226,7 @@ int32_t validateFunctionsInIntervalOrGroupbyQuery(SSqlCmd* pCmd, SQueryInfo* pQu ...@@ -5256,7 +5226,7 @@ int32_t validateFunctionsInIntervalOrGroupbyQuery(SSqlCmd* pCmd, SQueryInfo* pQu
bool hasSelectivity = false; bool hasSelectivity = false;
for (int32_t j = 0; j < size; ++j) { for (int32_t j = 0; j < size; ++j) {
SSqlExpr* pEx = tscSqlExprGet(pQueryInfo, j); SSqlExpr* pEx = tscSqlExprGet(pQueryInfo, j);
if ((aAggs[pEx->functionId].nStatus & TSDB_FUNCSTATE_SELECTIVITY) == TSDB_FUNCSTATE_SELECTIVITY) { if ((aAggs[pEx->functionId].status & TSDB_FUNCSTATE_SELECTIVITY) == TSDB_FUNCSTATE_SELECTIVITY) {
hasSelectivity = true; hasSelectivity = true;
break; break;
} }
...@@ -5710,7 +5680,7 @@ void doAddGroupColumnForSubquery(SQueryInfo* pQueryInfo, int32_t tagIndex) { ...@@ -5710,7 +5680,7 @@ void doAddGroupColumnForSubquery(SQueryInfo* pQueryInfo, int32_t tagIndex) {
SSchema* pSchema = tscGetTableColumnSchema(pTableMetaInfo->pTableMeta, pColIndex->colIndex); SSchema* pSchema = tscGetTableColumnSchema(pTableMetaInfo->pTableMeta, pColIndex->colIndex);
SColumnIndex colIndex = {.tableIndex = 0, .columnIndex = pColIndex->colIndex}; SColumnIndex colIndex = {.tableIndex = 0, .columnIndex = pColIndex->colIndex};
tscAddSpecialColumnForSelect(pQueryInfo, (int32_t)size, TSDB_FUNC_PRJ, &colIndex, pSchema, TSDB_COL_NORMAL); tscAddFuncInSelectClause(pQueryInfo, (int32_t)size, TSDB_FUNC_PRJ, &colIndex, pSchema, TSDB_COL_NORMAL);
int32_t numOfFields = tscNumOfFields(pQueryInfo); int32_t numOfFields = tscNumOfFields(pQueryInfo);
SInternalField* pInfo = tscFieldInfoGetInternalField(&pQueryInfo->fieldsInfo, numOfFields - 1); SInternalField* pInfo = tscFieldInfoGetInternalField(&pQueryInfo->fieldsInfo, numOfFields - 1);
...@@ -5874,7 +5844,7 @@ static int32_t checkUpdateTagPrjFunctions(SQueryInfo* pQueryInfo, SSqlCmd* pCmd) ...@@ -5874,7 +5844,7 @@ static int32_t checkUpdateTagPrjFunctions(SQueryInfo* pQueryInfo, SSqlCmd* pCmd)
continue; continue;
} }
if ((aAggs[functionId].nStatus & TSDB_FUNCSTATE_SELECTIVITY) != 0) { if ((aAggs[functionId].status & TSDB_FUNCSTATE_SELECTIVITY) != 0) {
numOfSelectivity++; numOfSelectivity++;
} else { } else {
numOfAggregation++; numOfAggregation++;
...@@ -5906,7 +5876,7 @@ static int32_t checkUpdateTagPrjFunctions(SQueryInfo* pQueryInfo, SSqlCmd* pCmd) ...@@ -5906,7 +5876,7 @@ static int32_t checkUpdateTagPrjFunctions(SQueryInfo* pQueryInfo, SSqlCmd* pCmd)
for (int32_t i = 0; i < numOfExprs; ++i) { for (int32_t i = 0; i < numOfExprs; ++i) {
SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, i); SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, i);
int16_t functionId = pExpr->functionId; int16_t functionId = pExpr->functionId;
if (functionId == TSDB_FUNC_TAGPRJ || (aAggs[functionId].nStatus & TSDB_FUNCSTATE_SELECTIVITY) == 0) { if (functionId == TSDB_FUNC_TAGPRJ || (aAggs[functionId].status & TSDB_FUNCSTATE_SELECTIVITY) == 0) {
continue; continue;
} }
...@@ -5949,7 +5919,7 @@ static int32_t doAddGroupbyColumnsOnDemand(SSqlCmd* pCmd, SQueryInfo* pQueryInfo ...@@ -5949,7 +5919,7 @@ static int32_t doAddGroupbyColumnsOnDemand(SSqlCmd* pCmd, SQueryInfo* pQueryInfo
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
SSchema s = tGetTableNameColumnSchema(); SSchema s = *tGetTbnameColumnSchema();
SSchema* pSchema = tscGetTableSchema(pTableMetaInfo->pTableMeta); SSchema* pSchema = tscGetTableSchema(pTableMetaInfo->pTableMeta);
int16_t bytes = 0; int16_t bytes = 0;
int16_t type = 0; int16_t type = 0;
...@@ -6093,7 +6063,7 @@ int32_t doFunctionsCompatibleCheck(SSqlCmd* pCmd, SQueryInfo* pQueryInfo) { ...@@ -6093,7 +6063,7 @@ int32_t doFunctionsCompatibleCheck(SSqlCmd* pCmd, SQueryInfo* pQueryInfo) {
} }
} }
if (IS_MULTIOUTPUT(aAggs[functId].nStatus) && functId != TSDB_FUNC_TOP && functId != TSDB_FUNC_BOTTOM && if (IS_MULTIOUTPUT(aAggs[functId].status) && functId != TSDB_FUNC_TOP && functId != TSDB_FUNC_BOTTOM &&
functId != TSDB_FUNC_TAGPRJ && functId != TSDB_FUNC_PRJ) { functId != TSDB_FUNC_TAGPRJ && functId != TSDB_FUNC_PRJ) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1); return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1);
} }
...@@ -6283,7 +6253,7 @@ void tscPrintSelectClause(SSqlObj* pSql, int32_t subClauseIndex) { ...@@ -6283,7 +6253,7 @@ void tscPrintSelectClause(SSqlObj* pSql, int32_t subClauseIndex) {
char tmpBuf[1024] = {0}; char tmpBuf[1024] = {0};
int32_t tmpLen = 0; int32_t tmpLen = 0;
tmpLen = tmpLen =
sprintf(tmpBuf, "%s(uid:%" PRId64 ", %d)", aAggs[pExpr->functionId].aName, pExpr->uid, pExpr->colInfo.colId); sprintf(tmpBuf, "%s(uid:%" PRId64 ", %d)", aAggs[pExpr->functionId].name, pExpr->uid, pExpr->colInfo.colId);
if (tmpLen + offset >= totalBufSize - 1) break; if (tmpLen + offset >= totalBufSize - 1) break;
...@@ -6984,3 +6954,4 @@ bool hasNormalColumnFilter(SQueryInfo* pQueryInfo) { ...@@ -6984,3 +6954,4 @@ bool hasNormalColumnFilter(SQueryInfo* pQueryInfo) {
return false; return false;
} }
...@@ -752,6 +752,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -752,6 +752,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pQueryMsg->queryType = htonl(pQueryInfo->type); pQueryMsg->queryType = htonl(pQueryInfo->type);
pQueryMsg->vgroupLimit = htobe64(pQueryInfo->vgroupLimit); pQueryMsg->vgroupLimit = htobe64(pQueryInfo->vgroupLimit);
pQueryMsg->sqlstrLen = htonl(sqlLen); pQueryMsg->sqlstrLen = htonl(sqlLen);
pQueryMsg->prevResultLen = htonl(pQueryInfo->bufLen);
size_t numOfOutput = tscSqlExprNumOfExprs(pQueryInfo); size_t numOfOutput = tscSqlExprNumOfExprs(pQueryInfo);
pQueryMsg->numOfOutput = htons((int16_t)numOfOutput); // this is the stage one output column number pQueryMsg->numOfOutput = htons((int16_t)numOfOutput); // this is the stage one output column number
...@@ -989,6 +990,11 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -989,6 +990,11 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
} }
} }
if (pQueryInfo->bufLen > 0) {
memcpy(pMsg, pQueryInfo->buf, pQueryInfo->bufLen);
pMsg += pQueryInfo->bufLen;
}
SCond* pCond = &pQueryInfo->tagCond.tbnameCond; SCond* pCond = &pQueryInfo->tagCond.tbnameCond;
if (pCond->len > 0) { if (pCond->len > 0) {
strncpy(pMsg, pCond->cond, pCond->len); strncpy(pMsg, pCond->cond, pCond->len);
......
...@@ -443,24 +443,6 @@ TAOS_FIELD *taos_fetch_fields(TAOS_RES *res) { ...@@ -443,24 +443,6 @@ TAOS_FIELD *taos_fetch_fields(TAOS_RES *res) {
return pFieldInfo->final; return pFieldInfo->final;
} }
int taos_retrieve(TAOS_RES *res) {
if (res == NULL) return 0;
SSqlObj *pSql = (SSqlObj *)res;
SSqlCmd *pCmd = &pSql->cmd;
SSqlRes *pRes = &pSql->res;
if (pSql == NULL || pSql->signature != pSql) return 0;
if (pRes->qhandle == 0) return 0;
tscResetForNextRetrieve(pRes);
if (pCmd->command < TSDB_SQL_LOCAL) {
pCmd->command = (pCmd->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH;
}
tscProcessSql(pSql);
return pRes->numOfRows;
}
static bool needToFetchNewBlock(SSqlObj* pSql) { static bool needToFetchNewBlock(SSqlObj* pSql) {
SSqlRes *pRes = &pSql->res; SSqlRes *pRes = &pSql->res;
SSqlCmd *pCmd = &pSql->cmd; SSqlCmd *pCmd = &pSql->cmd;
......
...@@ -103,7 +103,7 @@ static void doLaunchQuery(void* param, TAOS_RES* tres, int32_t code) { ...@@ -103,7 +103,7 @@ static void doLaunchQuery(void* param, TAOS_RES* tres, int32_t code) {
// failed to get table Meta or vgroup list, retry in 10sec. // failed to get table Meta or vgroup list, retry in 10sec.
if (code == TSDB_CODE_SUCCESS) { if (code == TSDB_CODE_SUCCESS) {
tscTansformSQLFuncForSTableQuery(pQueryInfo); tscTansformFuncForSTableQuery(pQueryInfo);
tscDebug("%p stream:%p, start stream query on:%s", pSql, pStream, tNameGetTableName(&pTableMetaInfo->name)); tscDebug("%p stream:%p, start stream query on:%s", pSql, pStream, tNameGetTableName(&pTableMetaInfo->name));
pSql->fp = tscProcessStreamQueryCallback; pSql->fp = tscProcessStreamQueryCallback;
......
...@@ -13,7 +13,7 @@ ...@@ -13,7 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#define _GNU_SOURCE #define _GNU_SOURCE
#include "os.h" #include "os.h"
#include "texpr.h" #include "texpr.h"
...@@ -23,6 +23,7 @@ ...@@ -23,6 +23,7 @@
#include "tscSubquery.h" #include "tscSubquery.h"
#include "tschemautil.h" #include "tschemautil.h"
#include "tsclient.h" #include "tsclient.h"
#include "qUtil.h"
typedef struct SInsertSupporter { typedef struct SInsertSupporter {
SSqlObj* pSql; SSqlObj* pSql;
...@@ -501,7 +502,7 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) { ...@@ -501,7 +502,7 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) {
int16_t functionId = tscIsProjectionQuery(pQueryInfo)? TSDB_FUNC_PRJ : TSDB_FUNC_TS; int16_t functionId = tscIsProjectionQuery(pQueryInfo)? TSDB_FUNC_PRJ : TSDB_FUNC_TS;
tscAddSpecialColumnForSelect(pQueryInfo, 0, functionId, &index, s, TSDB_COL_NORMAL); tscAddFuncInSelectClause(pQueryInfo, 0, functionId, &index, s, TSDB_COL_NORMAL);
tscPrintSelectClause(pNew, 0); tscPrintSelectClause(pNew, 0);
tscFieldInfoUpdateOffset(pQueryInfo); tscFieldInfoUpdateOffset(pQueryInfo);
...@@ -681,7 +682,7 @@ void tscBuildVgroupTableInfo(SSqlObj* pSql, STableMetaInfo* pTableMetaInfo, SArr ...@@ -681,7 +682,7 @@ void tscBuildVgroupTableInfo(SSqlObj* pSql, STableMetaInfo* pTableMetaInfo, SArr
} }
} }
static void issueTSCompQuery(SSqlObj* pSql, SJoinSupporter* pSupporter, SSqlObj* pParent) { static void issueTsCompQuery(SSqlObj* pSql, SJoinSupporter* pSupporter, SSqlObj* pParent) {
SSqlCmd* pCmd = &pSql->cmd; SSqlCmd* pCmd = &pSql->cmd;
tscClearSubqueryInfo(pCmd); tscClearSubqueryInfo(pCmd);
tscFreeSqlResult(pSql); tscFreeSqlResult(pSql);
...@@ -701,7 +702,7 @@ static void issueTSCompQuery(SSqlObj* pSql, SJoinSupporter* pSupporter, SSqlObj* ...@@ -701,7 +702,7 @@ static void issueTSCompQuery(SSqlObj* pSql, SJoinSupporter* pSupporter, SSqlObj*
SSchema colSchema = {.type = TSDB_DATA_TYPE_BINARY, .bytes = 1}; SSchema colSchema = {.type = TSDB_DATA_TYPE_BINARY, .bytes = 1};
SColumnIndex index = {0, PRIMARYKEY_TIMESTAMP_COL_INDEX}; SColumnIndex index = {0, PRIMARYKEY_TIMESTAMP_COL_INDEX};
tscAddSpecialColumnForSelect(pQueryInfo, 0, TSDB_FUNC_TS_COMP, &index, &colSchema, TSDB_COL_NORMAL); tscAddFuncInSelectClause(pQueryInfo, 0, TSDB_FUNC_TS_COMP, &index, &colSchema, TSDB_COL_NORMAL);
// set the tags value for ts_comp function // set the tags value for ts_comp function
if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) { if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
...@@ -970,7 +971,7 @@ static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow ...@@ -970,7 +971,7 @@ static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
for (int32_t m = 0; m < pParentSql->subState.numOfSub; ++m) { for (int32_t m = 0; m < pParentSql->subState.numOfSub; ++m) {
SSqlObj* sub = pParentSql->pSubs[m]; SSqlObj* sub = pParentSql->pSubs[m];
issueTSCompQuery(sub, sub->param, pParentSql); issueTsCompQuery(sub, sub->param, pParentSql);
} }
} }
...@@ -1470,7 +1471,7 @@ void tscSetupOutputColumnIndex(SSqlObj* pSql) { ...@@ -1470,7 +1471,7 @@ void tscSetupOutputColumnIndex(SSqlObj* pSql) {
} }
// restore the offset value for super table query in case of final result. // restore the offset value for super table query in case of final result.
tscRestoreSQLFuncForSTableQuery(pQueryInfo); tscRestoreFuncForSTableQuery(pQueryInfo);
tscFieldInfoUpdateOffset(pQueryInfo); tscFieldInfoUpdateOffset(pQueryInfo);
} }
...@@ -1651,7 +1652,7 @@ int32_t tscCreateJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSupporter ...@@ -1651,7 +1652,7 @@ int32_t tscCreateJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSupporter
// set get tags query type // set get tags query type
TSDB_QUERY_SET_TYPE(pNewQueryInfo->type, TSDB_QUERY_TYPE_TAG_FILTER_QUERY); TSDB_QUERY_SET_TYPE(pNewQueryInfo->type, TSDB_QUERY_TYPE_TAG_FILTER_QUERY);
tscAddSpecialColumnForSelect(pNewQueryInfo, 0, TSDB_FUNC_TID_TAG, &colIndex, &s1, TSDB_COL_TAG); tscAddFuncInSelectClause(pNewQueryInfo, 0, TSDB_FUNC_TID_TAG, &colIndex, &s1, TSDB_COL_TAG);
size_t numOfCols = taosArrayGetSize(pNewQueryInfo->colList); size_t numOfCols = taosArrayGetSize(pNewQueryInfo->colList);
tscDebug( tscDebug(
...@@ -1662,7 +1663,7 @@ int32_t tscCreateJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSupporter ...@@ -1662,7 +1663,7 @@ int32_t tscCreateJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSupporter
} else { } else {
SSchema colSchema = {.type = TSDB_DATA_TYPE_BINARY, .bytes = 1}; SSchema colSchema = {.type = TSDB_DATA_TYPE_BINARY, .bytes = 1};
SColumnIndex colIndex = {0, PRIMARYKEY_TIMESTAMP_COL_INDEX}; SColumnIndex colIndex = {0, PRIMARYKEY_TIMESTAMP_COL_INDEX};
tscAddSpecialColumnForSelect(pNewQueryInfo, 0, TSDB_FUNC_TS_COMP, &colIndex, &colSchema, TSDB_COL_NORMAL); tscAddFuncInSelectClause(pNewQueryInfo, 0, TSDB_FUNC_TS_COMP, &colIndex, &colSchema, TSDB_COL_NORMAL);
// set the tags value for ts_comp function // set the tags value for ts_comp function
SSqlExpr *pExpr = tscSqlExprGet(pNewQueryInfo, 0); SSqlExpr *pExpr = tscSqlExprGet(pNewQueryInfo, 0);
...@@ -1821,7 +1822,262 @@ void tscUnlockByThread(int64_t *lockedBy) { ...@@ -1821,7 +1822,262 @@ void tscUnlockByThread(int64_t *lockedBy) {
} }
} }
typedef struct SFirstRoundQuerySup {
SSqlObj *pParent;
int32_t numOfRows;
SArray *pColsInfo;
int32_t tagLen;
STColumn *pTagCols;
SArray *pResult; // SArray<SInterResult>
int64_t interval;
char* buf;
int32_t bufLen;
} SFirstRoundQuerySup;
void doAppendData(SInterResult* pInterResult, TAOS_ROW row, int32_t numOfCols, SQueryInfo* pQueryInfo) {
TSKEY key = INT64_MIN;
for(int32_t i = 0; i < numOfCols; ++i) {
SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, i);
if (TSDB_COL_IS_TAG(pExpr->colInfo.flag)) {
continue;
}
if (pExpr->colInfo.colId == PRIMARYKEY_TIMESTAMP_COL_INDEX) {
key = *(TSKEY*) row[i];
continue;
}
double v = 0;
if (row[i] != NULL) {
v = *(double*) row[i];
} else {
SET_DOUBLE_NULL(&v);
}
int32_t id = pExpr->colInfo.colId;
int32_t numOfQueriedCols = (int32_t) taosArrayGetSize(pInterResult->pResult);
SArray* p = NULL;
for(int32_t j = 0; j < numOfQueriedCols; ++j) {
SStddevInterResult* pColRes = taosArrayGet(pInterResult->pResult, j);
if (pColRes->colId == id) {
p = pColRes->pResult;
break;
}
}
//append a new column
if (p == NULL) {
SStddevInterResult t = {.colId = id, .pResult = taosArrayInit(10, sizeof(SResPair)),};
taosArrayPush(pInterResult->pResult, &t);
p = t.pResult;
}
SResPair pair = {.avg = v, .key = key};
taosArrayPush(p, &pair);
}
}
void tscFirstRoundRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) {
SSqlObj* pSql = (SSqlObj*)tres;
SSqlRes* pRes = &pSql->res;
SFirstRoundQuerySup* pSup = param;
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
if (numOfRows > 0) {
TAOS_ROW row = NULL;
int32_t numOfCols = taos_field_count(tres);
if (pSup->tagLen == 0) { // no tags, all rows belong to one group
SInterResult interResult = {.tags = NULL, .pResult = taosArrayInit(4, sizeof(SStddevInterResult))};
taosArrayPush(pSup->pResult, &interResult);
while ((row = taos_fetch_row(tres)) != NULL) {
doAppendData(&interResult, row, numOfCols, pQueryInfo);
}
} else { // tagLen > 0
char* p = calloc(1, pSup->tagLen);
while ((row = taos_fetch_row(tres)) != NULL) {
int32_t* length = taos_fetch_lengths(tres);
memset(p, 0, pSup->tagLen);
int32_t offset = 0;
for (int32_t i = 0; i < numOfCols && offset < pSup->tagLen; ++i) {
SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, i);
if (TSDB_COL_IS_TAG(pExpr->colInfo.flag)) {
memcpy(p + offset, row[i], length[i]);
offset += pExpr->resBytes;
}
}
assert(offset == pSup->tagLen);
size_t size = taosArrayGetSize(pSup->pResult);
if (size > 0) {
SInterResult* pInterResult = taosArrayGetLast(pSup->pResult);
if (memcmp(pInterResult->tags, p, pSup->tagLen) == 0) { // belongs to the same group
doAppendData(pInterResult, row, numOfCols, pQueryInfo);
} else {
char* tags = malloc( pSup->tagLen);
memcpy(tags, p, pSup->tagLen);
SInterResult interResult = {.tags = tags, .pResult = taosArrayInit(4, sizeof(SStddevInterResult))};
taosArrayPush(pSup->pResult, &interResult);
doAppendData(&interResult, row, numOfCols, pQueryInfo);
}
} else {
char* tags = malloc(pSup->tagLen);
memcpy(tags, p, pSup->tagLen);
SInterResult interResult = {.tags = tags, .pResult = taosArrayInit(4, sizeof(SStddevInterResult))};
taosArrayPush(pSup->pResult, &interResult);
doAppendData(&interResult, row, numOfCols, pQueryInfo);
}
}
tfree(p);
}
}
pSup->numOfRows += numOfRows;
if (!pRes->completed) {
taos_fetch_rows_a(tres, tscFirstRoundRetrieveCallback, param);
return;
}
// set the parameters for the second round query process
SSqlObj *pParent = pSup->pParent;
SSqlCmd *pPCmd = &pParent->cmd;
SQueryInfo *pQueryInfo1 = tscGetQueryInfoDetail(pPCmd, 0);
if (pSup->numOfRows > 0) {
SBufferWriter bw = tbufInitWriter(NULL, false);
interResToBinary(&bw, pSup->pResult, pSup->tagLen);
pQueryInfo1->bufLen = (int32_t) tbufTell(&bw);
pQueryInfo1->buf = tbufGetData(&bw, true);
// set the serialized binary string as the parameter of arithmetic expression
tbufCloseWriter(&bw);
}
taosArrayDestroyEx(pSup->pResult, freeInterResult);
taosArrayDestroy(pSup->pColsInfo);
tfree(pSup);
taos_free_result(pSql);
pQueryInfo1->round = 1;
tscDoQuery(pParent);
}
void tscFirstRoundCallback(void* param, TAOS_RES* tres, int code) {
int32_t c = taos_errno(tres);
if (c != TSDB_CODE_SUCCESS) {
// TODO HANDLE ERROR
}
taos_fetch_rows_a(tres, tscFirstRoundRetrieveCallback, param);
}
int32_t tscHandleFirstRoundStableQuery(SSqlObj *pSql) {
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
STableMetaInfo* pTableMetaInfo1 = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);
SFirstRoundQuerySup *pSup = calloc(1, sizeof(SFirstRoundQuerySup));
pSup->pParent = pSql;
pSup->interval = pQueryInfo->interval.interval;
pSup->pResult = taosArrayInit(6, sizeof(SStddevInterResult));
pSup->pColsInfo = taosArrayInit(6, sizeof(int16_t)); // result column id
SSqlObj *pNew = createSubqueryObj(pSql, 0, tscFirstRoundCallback, pSup, TSDB_SQL_SELECT, NULL);
SSqlCmd *pCmd = &pNew->cmd;
tscClearSubqueryInfo(pCmd);
tscFreeSqlResult(pSql);
SQueryInfo* pNewQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
assert(pQueryInfo->numOfTables == 1);
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pNewQueryInfo, 0);
tscInitQueryInfo(pNewQueryInfo);
pNewQueryInfo->groupbyExpr = pQueryInfo->groupbyExpr;
if (pQueryInfo->groupbyExpr.columnInfo != NULL) {
pNewQueryInfo->groupbyExpr.columnInfo = taosArrayDup(pQueryInfo->groupbyExpr.columnInfo);
if (pNewQueryInfo->groupbyExpr.columnInfo == NULL) {
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
// goto _error;
}
}
if (tscTagCondCopy(&pNewQueryInfo->tagCond, &pQueryInfo->tagCond) != 0) {
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
// goto _error;
}
pNewQueryInfo->interval = pQueryInfo->interval;
pCmd->command = TSDB_SQL_SELECT;
pNew->fp = tscFirstRoundCallback;
int32_t numOfExprs = (int32_t) tscSqlExprNumOfExprs(pQueryInfo);
int32_t index = 0;
int32_t numOfTags = 0;
for(int32_t i = 0; i < numOfExprs; ++i) {
SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, i);
if (pExpr->functionId == TSDB_FUNC_TS && pQueryInfo->interval.interval > 0) {
taosArrayPush(pSup->pColsInfo, &pExpr->resColId);
SColumnIndex colIndex = {.tableIndex = 0, .columnIndex = PRIMARYKEY_TIMESTAMP_COL_INDEX};
SSchema* schema = tscGetColumnSchemaById(pTableMetaInfo1->pTableMeta, pExpr->colInfo.colId);
SSqlExpr* p = tscAddFuncInSelectClause(pNewQueryInfo, index++, TSDB_FUNC_TS, &colIndex, schema, TSDB_COL_NORMAL);
p->resColId = pExpr->resColId; // update the result column id
} else if (pExpr->functionId == TSDB_FUNC_STDDEV_DST) {
taosArrayPush(pSup->pColsInfo, &pExpr->resColId);
SColumnIndex colIndex = {.tableIndex = 0, .columnIndex = pExpr->colInfo.colIndex};
SSchema schema = {.type = TSDB_DATA_TYPE_DOUBLE, .bytes = sizeof(double)};
tstrncpy(schema.name, pExpr->aliasName, tListLen(schema.name));
SSqlExpr* p = tscAddFuncInSelectClause(pNewQueryInfo, index++, TSDB_FUNC_AVG, &colIndex, &schema, TSDB_COL_NORMAL);
p->resColId = pExpr->resColId; // update the result column id
} else if (pExpr->functionId == TSDB_FUNC_TAG) {
pSup->tagLen += pExpr->resBytes;
SColumnIndex colIndex = {.tableIndex = 0, .columnIndex = pExpr->colInfo.colIndex};
SSchema* schema = NULL;
if (pExpr->colInfo.colId != TSDB_TBNAME_COLUMN_INDEX) {
schema = tscGetColumnSchemaById(pTableMetaInfo1->pTableMeta, pExpr->colInfo.colId);
} else {
schema = tGetTbnameColumnSchema();
}
SSqlExpr* p = tscAddFuncInSelectClause(pNewQueryInfo, index++, TSDB_FUNC_TAG, &colIndex, schema, TSDB_COL_TAG);
p->resColId = pExpr->resColId;
numOfTags += 1;
}
}
SColumnIndex columnIndex = {.tableIndex = 0, .columnIndex = PRIMARYKEY_TIMESTAMP_COL_INDEX};
tscInsertPrimaryTsSourceColumn(pNewQueryInfo, &columnIndex);
tscTansformFuncForSTableQuery(pNewQueryInfo);
tscDebug(
"%p first round subquery:%p tableIndex:%d, vgroupIndex:%d, numOfVgroups:%d, type:%d, query to retrieve timestamps, "
"numOfExpr:%" PRIzu ", colList:%d, numOfOutputFields:%d, name:%s",
pSql, pNew, 0, pTableMetaInfo->vgroupIndex, pTableMetaInfo->vgroupList->numOfVgroups, pNewQueryInfo->type,
tscSqlExprNumOfExprs(pNewQueryInfo), index+1, pNewQueryInfo->fieldsInfo.numOfOutput, tNameGetTableName(&pTableMetaInfo->name));
tscHandleMasterSTableQuery(pNew);
return TSDB_CODE_SUCCESS;
}
int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) { int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
SSqlRes *pRes = &pSql->res; SSqlRes *pRes = &pSql->res;
...@@ -1833,7 +2089,7 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) { ...@@ -1833,7 +2089,7 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
return pRes->code; return pRes->code;
} }
tExtMemBuffer ** pMemoryBuf = NULL; tExtMemBuffer **pMemoryBuf = NULL;
tOrderDescriptor *pDesc = NULL; tOrderDescriptor *pDesc = NULL;
SColumnModel *pModel = NULL; SColumnModel *pModel = NULL;
SColumnModel *pFinalModel = NULL; SColumnModel *pFinalModel = NULL;
...@@ -1863,10 +2119,8 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) { ...@@ -1863,10 +2119,8 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
return ret; return ret;
} }
pSql->pSubs = calloc(pState->numOfSub, POINTER_BYTES);
tscDebug("%p retrieved query data from %d vnode(s)", pSql, pState->numOfSub); tscDebug("%p retrieved query data from %d vnode(s)", pSql, pState->numOfSub);
pSql->pSubs = calloc(pState->numOfSub, POINTER_BYTES);
if (pSql->pSubs == NULL) { if (pSql->pSubs == NULL) {
tfree(pSql->pSubs); tfree(pSql->pSubs);
pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY; pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
...@@ -2739,7 +2993,7 @@ void tscBuildResFromSubqueries(SSqlObj *pSql) { ...@@ -2739,7 +2993,7 @@ void tscBuildResFromSubqueries(SSqlObj *pSql) {
return; return;
} }
tscRestoreSQLFuncForSTableQuery(pQueryInfo); tscRestoreFuncForSTableQuery(pQueryInfo);
} }
assert (pRes->row >= pRes->numOfRows); assert (pRes->row >= pRes->numOfRows);
......
...@@ -107,11 +107,6 @@ bool tscIsTwoStageSTableQuery(SQueryInfo* pQueryInfo, int32_t tableIndex) { ...@@ -107,11 +107,6 @@ bool tscIsTwoStageSTableQuery(SQueryInfo* pQueryInfo, int32_t tableIndex) {
return false; return false;
} }
// for select query super table, the super table vgroup list can not be null in any cases.
// if (pQueryInfo->command == TSDB_SQL_SELECT && UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
// assert(pTableMetaInfo->vgroupList != NULL);
// }
if ((pQueryInfo->type & TSDB_QUERY_TYPE_FREE_RESOURCE) == TSDB_QUERY_TYPE_FREE_RESOURCE) { if ((pQueryInfo->type & TSDB_QUERY_TYPE_FREE_RESOURCE) == TSDB_QUERY_TYPE_FREE_RESOURCE) {
return false; return false;
} }
...@@ -1074,7 +1069,7 @@ void tscFieldInfoClear(SFieldInfo* pFieldInfo) { ...@@ -1074,7 +1069,7 @@ void tscFieldInfoClear(SFieldInfo* pFieldInfo) {
memset(pFieldInfo, 0, sizeof(SFieldInfo)); memset(pFieldInfo, 0, sizeof(SFieldInfo));
} }
static SSqlExpr* doBuildSqlExpr(SQueryInfo* pQueryInfo, int16_t functionId, SColumnIndex* pColIndex, int16_t type, static SSqlExpr* doCreateSqlExpr(SQueryInfo* pQueryInfo, int16_t functionId, SColumnIndex* pColIndex, int16_t type,
int16_t size, int16_t resColId, int16_t interSize, int32_t colType) { int16_t size, int16_t resColId, int16_t interSize, int32_t colType) {
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, pColIndex->tableIndex); STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, pColIndex->tableIndex);
...@@ -1127,14 +1122,14 @@ SSqlExpr* tscSqlExprInsert(SQueryInfo* pQueryInfo, int32_t index, int16_t functi ...@@ -1127,14 +1122,14 @@ SSqlExpr* tscSqlExprInsert(SQueryInfo* pQueryInfo, int32_t index, int16_t functi
return tscSqlExprAppend(pQueryInfo, functionId, pColIndex, type, size, resColId, interSize, isTagCol); return tscSqlExprAppend(pQueryInfo, functionId, pColIndex, type, size, resColId, interSize, isTagCol);
} }
SSqlExpr* pExpr = doBuildSqlExpr(pQueryInfo, functionId, pColIndex, type, size, resColId, interSize, isTagCol); SSqlExpr* pExpr = doCreateSqlExpr(pQueryInfo, functionId, pColIndex, type, size, resColId, interSize, isTagCol);
taosArrayInsert(pQueryInfo->exprList, index, &pExpr); taosArrayInsert(pQueryInfo->exprList, index, &pExpr);
return pExpr; return pExpr;
} }
SSqlExpr* tscSqlExprAppend(SQueryInfo* pQueryInfo, int16_t functionId, SColumnIndex* pColIndex, int16_t type, SSqlExpr* tscSqlExprAppend(SQueryInfo* pQueryInfo, int16_t functionId, SColumnIndex* pColIndex, int16_t type,
int16_t size, int16_t resColId, int16_t interSize, bool isTagCol) { int16_t size, int16_t resColId, int16_t interSize, bool isTagCol) {
SSqlExpr* pExpr = doBuildSqlExpr(pQueryInfo, functionId, pColIndex, type, size, resColId, interSize, isTagCol); SSqlExpr* pExpr = doCreateSqlExpr(pQueryInfo, functionId, pColIndex, type, size, resColId, interSize, isTagCol);
taosArrayPush(pQueryInfo->exprList, &pExpr); taosArrayPush(pQueryInfo->exprList, &pExpr);
return pExpr; return pExpr;
} }
...@@ -1158,6 +1153,22 @@ SSqlExpr* tscSqlExprUpdate(SQueryInfo* pQueryInfo, int32_t index, int16_t functi ...@@ -1158,6 +1153,22 @@ SSqlExpr* tscSqlExprUpdate(SQueryInfo* pQueryInfo, int32_t index, int16_t functi
return pExpr; return pExpr;
} }
bool tscMultiRoundQuery(SQueryInfo* pQueryInfo, int32_t index) {
if (!UTIL_TABLE_IS_SUPER_TABLE(pQueryInfo->pTableMetaInfo[index])) {
return false;
}
int32_t numOfExprs = (int32_t) tscSqlExprNumOfExprs(pQueryInfo);
for(int32_t i = 0; i < numOfExprs; ++i) {
SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, i);
if (pExpr->functionId == TSDB_FUNC_STDDEV_DST) {
return true;
}
}
return false;
}
size_t tscSqlExprNumOfExprs(SQueryInfo* pQueryInfo) { size_t tscSqlExprNumOfExprs(SQueryInfo* pQueryInfo) {
return taosArrayGetSize(pQueryInfo->exprList); return taosArrayGetSize(pQueryInfo->exprList);
} }
...@@ -1762,6 +1773,7 @@ static void freeQueryInfoImpl(SQueryInfo* pQueryInfo) { ...@@ -1762,6 +1773,7 @@ static void freeQueryInfoImpl(SQueryInfo* pQueryInfo) {
pQueryInfo->tsBuf = tsBufDestroy(pQueryInfo->tsBuf); pQueryInfo->tsBuf = tsBufDestroy(pQueryInfo->tsBuf);
tfree(pQueryInfo->fillVal); tfree(pQueryInfo->fillVal);
tfree(pQueryInfo->buf);
} }
void tscClearSubqueryInfo(SSqlCmd* pCmd) { void tscClearSubqueryInfo(SSqlCmd* pCmd) {
...@@ -2029,7 +2041,7 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, __async_cb_func_t ...@@ -2029,7 +2041,7 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, __async_cb_func_t
pNew->signature = pNew; pNew->signature = pNew;
pNew->sqlstr = strdup(pSql->sqlstr); pNew->sqlstr = strdup(pSql->sqlstr);
SSqlCmd* pnCmd = &pNew->cmd; SSqlCmd* pnCmd = &pNew->cmd;
memcpy(pnCmd, pCmd, sizeof(SSqlCmd)); memcpy(pnCmd, pCmd, sizeof(SSqlCmd));
pnCmd->command = cmd; pnCmd->command = cmd;
...@@ -2068,7 +2080,18 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, __async_cb_func_t ...@@ -2068,7 +2080,18 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, __async_cb_func_t
pNewQueryInfo->clauseLimit = pQueryInfo->clauseLimit; pNewQueryInfo->clauseLimit = pQueryInfo->clauseLimit;
pNewQueryInfo->numOfTables = 0; pNewQueryInfo->numOfTables = 0;
pNewQueryInfo->pTableMetaInfo = NULL; pNewQueryInfo->pTableMetaInfo = NULL;
pNewQueryInfo->bufLen = pQueryInfo->bufLen;
pNewQueryInfo->buf = malloc(pQueryInfo->bufLen);
if (pNewQueryInfo->buf == NULL) {
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
goto _error;
}
if (pQueryInfo->bufLen > 0) {
memcpy(pNewQueryInfo->buf, pQueryInfo->buf, pQueryInfo->bufLen);
}
pNewQueryInfo->groupbyExpr = pQueryInfo->groupbyExpr; pNewQueryInfo->groupbyExpr = pQueryInfo->groupbyExpr;
if (pQueryInfo->groupbyExpr.columnInfo != NULL) { if (pQueryInfo->groupbyExpr.columnInfo != NULL) {
pNewQueryInfo->groupbyExpr.columnInfo = taosArrayDup(pQueryInfo->groupbyExpr.columnInfo); pNewQueryInfo->groupbyExpr.columnInfo = taosArrayDup(pQueryInfo->groupbyExpr.columnInfo);
...@@ -2234,6 +2257,9 @@ void tscDoQuery(SSqlObj* pSql) { ...@@ -2234,6 +2257,9 @@ void tscDoQuery(SSqlObj* pSql) {
} }
} }
return;
} else if (tscMultiRoundQuery(pQueryInfo, 0) && pQueryInfo->round == 0) {
tscHandleFirstRoundStableQuery(pSql); // todo lock?
return; return;
} else if (tscIsTwoStageSTableQuery(pQueryInfo, 0)) { // super table query } else if (tscIsTwoStageSTableQuery(pQueryInfo, 0)) { // super table query
tscLockByThread(&pSql->squeryLock); tscLockByThread(&pSql->squeryLock);
......
...@@ -36,6 +36,11 @@ typedef struct SColumnInfoData { ...@@ -36,6 +36,11 @@ typedef struct SColumnInfoData {
void* pData; // the corresponding block data in memory void* pData; // the corresponding block data in memory
} SColumnInfoData; } SColumnInfoData;
typedef struct SResPair {
TSKEY key;
double avg;
} SResPair;
#define TSDB_DB_NAME_T 1 #define TSDB_DB_NAME_T 1
#define TSDB_TABLE_NAME_T 2 #define TSDB_TABLE_NAME_T 2
...@@ -58,7 +63,7 @@ size_t tableIdPrefix(const char* name, char* prefix, int32_t len); ...@@ -58,7 +63,7 @@ size_t tableIdPrefix(const char* name, char* prefix, int32_t len);
void extractTableNameFromToken(SStrToken *pToken, SStrToken* pTable); void extractTableNameFromToken(SStrToken *pToken, SStrToken* pTable);
SSchema tGetTableNameColumnSchema(); //SSchema tGetTbnameColumnSchema();
SSchema tGetBlockDistColumnSchema(); SSchema tGetBlockDistColumnSchema();
...@@ -68,7 +73,7 @@ bool tscValidateTableNameLength(size_t len); ...@@ -68,7 +73,7 @@ bool tscValidateTableNameLength(size_t len);
SColumnFilterInfo* tFilterInfoDup(const SColumnFilterInfo* src, int32_t numOfFilters); SColumnFilterInfo* tFilterInfoDup(const SColumnFilterInfo* src, int32_t numOfFilters);
SSchema tGetTbnameColumnSchema(); SSchema* tGetTbnameColumnSchema();
/** /**
* check if the schema is valid or not, including following aspects: * check if the schema is valid or not, including following aspects:
......
...@@ -407,7 +407,7 @@ tExprNode* exprTreeFromTableName(const char* tbnameCond) { ...@@ -407,7 +407,7 @@ tExprNode* exprTreeFromTableName(const char* tbnameCond) {
SSchema* pSchema = exception_calloc(1, sizeof(SSchema)); SSchema* pSchema = exception_calloc(1, sizeof(SSchema));
left->pSchema = pSchema; left->pSchema = pSchema;
*pSchema = tGetTbnameColumnSchema(); *pSchema = *tGetTbnameColumnSchema();
tExprNode* right = exception_calloc(1, sizeof(tExprNode)); tExprNode* right = exception_calloc(1, sizeof(tExprNode));
expr->_node.pRight = right; expr->_node.pRight = right;
......
...@@ -10,6 +10,7 @@ ...@@ -10,6 +10,7 @@
#define VALID_NAME_TYPE(x) ((x) == TSDB_DB_NAME_T || (x) == TSDB_TABLE_NAME_T) #define VALID_NAME_TYPE(x) ((x) == TSDB_DB_NAME_T || (x) == TSDB_TABLE_NAME_T)
//TODO remove it
void extractTableName(const char* tableId, char* name) { void extractTableName(const char* tableId, char* name) {
size_t s1 = strcspn(tableId, &TS_PATH_DELIMITER[0]); size_t s1 = strcspn(tableId, &TS_PATH_DELIMITER[0]);
size_t s2 = strcspn(&tableId[s1 + 1], &TS_PATH_DELIMITER[0]); size_t s2 = strcspn(&tableId[s1 + 1], &TS_PATH_DELIMITER[0]);
...@@ -24,6 +25,7 @@ char* extractDBName(const char* tableId, char* name) { ...@@ -24,6 +25,7 @@ char* extractDBName(const char* tableId, char* name) {
return strncpy(name, &tableId[offset1 + 1], len); return strncpy(name, &tableId[offset1 + 1], len);
} }
// todo remove it
size_t tableIdPrefix(const char* name, char* prefix, int32_t len) { size_t tableIdPrefix(const char* name, char* prefix, int32_t len) {
tstrncpy(prefix, name, len); tstrncpy(prefix, name, len);
strcat(prefix, TS_PATH_DELIMITER); strcat(prefix, TS_PATH_DELIMITER);
...@@ -31,14 +33,6 @@ size_t tableIdPrefix(const char* name, char* prefix, int32_t len) { ...@@ -31,14 +33,6 @@ size_t tableIdPrefix(const char* name, char* prefix, int32_t len) {
return strlen(prefix); return strlen(prefix);
} }
SSchema tGetTableNameColumnSchema() {
SSchema s = {0};
s.bytes = TSDB_TABLE_NAME_LEN - 1 + VARSTR_HEADER_SIZE;
s.type = TSDB_DATA_TYPE_BINARY;
s.colId = TSDB_TBNAME_COLUMN_INDEX;
tstrncpy(s.name, TSQL_TBNAME_L, TSDB_COL_NAME_LEN);
return s;
}
SSchema tGetBlockDistColumnSchema() { SSchema tGetBlockDistColumnSchema() {
SSchema s = {0}; SSchema s = {0};
s.bytes = TSDB_MAX_BINARY_LEN;; s.bytes = TSDB_MAX_BINARY_LEN;;
...@@ -189,15 +183,15 @@ void extractTableNameFromToken(SStrToken* pToken, SStrToken* pTable) { ...@@ -189,15 +183,15 @@ void extractTableNameFromToken(SStrToken* pToken, SStrToken* pTable) {
} }
} }
SSchema tGetTbnameColumnSchema() { static struct SSchema _s = {
struct SSchema s = { .colId = TSDB_TBNAME_COLUMN_INDEX,
.colId = TSDB_TBNAME_COLUMN_INDEX, .type = TSDB_DATA_TYPE_BINARY,
.type = TSDB_DATA_TYPE_BINARY, .bytes = TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE,
.bytes = TSDB_TABLE_NAME_LEN .name = TSQL_TBNAME_L,
}; };
strcpy(s.name, TSQL_TBNAME_L); SSchema* tGetTbnameColumnSchema() {
return s; return &_s;
} }
static bool doValidateSchema(SSchema* pSchema, int32_t numOfCols, int32_t maxLen) { static bool doValidateSchema(SSchema* pSchema, int32_t numOfCols, int32_t maxLen) {
......
...@@ -86,43 +86,53 @@ void tVariantCreateFromBinary(tVariant *pVar, const char *pz, size_t len, uint32 ...@@ -86,43 +86,53 @@ void tVariantCreateFromBinary(tVariant *pVar, const char *pz, size_t len, uint32
switch (type) { switch (type) {
case TSDB_DATA_TYPE_BOOL: case TSDB_DATA_TYPE_BOOL:
case TSDB_DATA_TYPE_TINYINT: { case TSDB_DATA_TYPE_TINYINT: {
pVar->nLen = tDataTypes[type].bytes;
pVar->i64 = GET_INT8_VAL(pz); pVar->i64 = GET_INT8_VAL(pz);
break; break;
} }
case TSDB_DATA_TYPE_UTINYINT: { case TSDB_DATA_TYPE_UTINYINT: {
pVar->nLen = tDataTypes[type].bytes;
pVar->u64 = GET_UINT8_VAL(pz); pVar->u64 = GET_UINT8_VAL(pz);
break; break;
} }
case TSDB_DATA_TYPE_SMALLINT: { case TSDB_DATA_TYPE_SMALLINT: {
pVar->nLen = tDataTypes[type].bytes;
pVar->i64 = GET_INT16_VAL(pz); pVar->i64 = GET_INT16_VAL(pz);
break; break;
} }
case TSDB_DATA_TYPE_USMALLINT: { case TSDB_DATA_TYPE_USMALLINT: {
pVar->nLen = tDataTypes[type].bytes;
pVar->u64 = GET_UINT16_VAL(pz); pVar->u64 = GET_UINT16_VAL(pz);
break; break;
} }
case TSDB_DATA_TYPE_INT: { case TSDB_DATA_TYPE_INT: {
pVar->nLen = tDataTypes[type].bytes;
pVar->i64 = GET_INT32_VAL(pz); pVar->i64 = GET_INT32_VAL(pz);
break; break;
} }
case TSDB_DATA_TYPE_UINT: { case TSDB_DATA_TYPE_UINT: {
pVar->nLen = tDataTypes[type].bytes;
pVar->u64 = GET_UINT32_VAL(pz); pVar->u64 = GET_UINT32_VAL(pz);
break; break;
} }
case TSDB_DATA_TYPE_BIGINT: case TSDB_DATA_TYPE_BIGINT:
case TSDB_DATA_TYPE_TIMESTAMP: { case TSDB_DATA_TYPE_TIMESTAMP: {
pVar->nLen = tDataTypes[type].bytes;
pVar->i64 = GET_INT64_VAL(pz); pVar->i64 = GET_INT64_VAL(pz);
break; break;
} }
case TSDB_DATA_TYPE_UBIGINT: { case TSDB_DATA_TYPE_UBIGINT: {
pVar->nLen = tDataTypes[type].bytes;
pVar->u64 = GET_UINT64_VAL(pz); pVar->u64 = GET_UINT64_VAL(pz);
break; break;
} }
case TSDB_DATA_TYPE_DOUBLE: { case TSDB_DATA_TYPE_DOUBLE: {
pVar->nLen = tDataTypes[type].bytes;
pVar->dKey = GET_DOUBLE_VAL(pz); pVar->dKey = GET_DOUBLE_VAL(pz);
break; break;
} }
case TSDB_DATA_TYPE_FLOAT: { case TSDB_DATA_TYPE_FLOAT: {
pVar->nLen = tDataTypes[type].bytes;
pVar->dKey = GET_FLOAT_VAL(pz); pVar->dKey = GET_FLOAT_VAL(pz);
break; break;
} }
...@@ -144,6 +154,7 @@ void tVariantCreateFromBinary(tVariant *pVar, const char *pz, size_t len, uint32 ...@@ -144,6 +154,7 @@ void tVariantCreateFromBinary(tVariant *pVar, const char *pz, size_t len, uint32
default: default:
pVar->i64 = GET_INT32_VAL(pz); pVar->i64 = GET_INT32_VAL(pz);
pVar->nLen = tDataTypes[TSDB_DATA_TYPE_INT].bytes;
} }
pVar->nType = type; pVar->nType = type;
......
...@@ -19,12 +19,12 @@ import java.util.Map; ...@@ -19,12 +19,12 @@ import java.util.Map;
public abstract class TSDBConstants { public abstract class TSDBConstants {
public static final String STATEMENT_CLOSED = "Statement already closed."; public static final String STATEMENT_CLOSED = "statement is closed";
public static final String DEFAULT_PORT = "6200";
public static final String UNSUPPORT_METHOD_EXCEPTIONZ_MSG = "this operation is NOT supported currently!"; public static final String UNSUPPORT_METHOD_EXCEPTIONZ_MSG = "this operation is NOT supported currently!";
public static final String INVALID_VARIABLES = "invalid variables"; public static final String INVALID_VARIABLES = "invalid variables";
public static final String RESULT_SET_IS_CLOSED = "resultSet is closed."; public static final String RESULT_SET_IS_CLOSED = "resultSet is closed";
public static final String DEFAULT_PORT = "6200";
public static Map<Integer, String> DATATYPE_MAP = null; public static Map<Integer, String> DATATYPE_MAP = null;
public static final long JNI_NULL_POINTER = 0L; public static final long JNI_NULL_POINTER = 0L;
......
package com.taosdata.jdbc;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.Map;
public class TSDBError {
private static Map<Integer, String> TSDBErrorMap = new HashMap<>();
static {
TSDBErrorMap.put(TSDBErrorNumbers.ERROR_CONNECTION_CLOSED, "connection already closed");
TSDBErrorMap.put(TSDBErrorNumbers.ERROR_UNSUPPORTED_METHOD, "this operation is NOT supported currently!");
TSDBErrorMap.put(TSDBErrorNumbers.ERROR_INVALID_VARIABLE, "invalid variables");
TSDBErrorMap.put(TSDBErrorNumbers.ERROR_STATEMENT_CLOSED, "statement is closed");
TSDBErrorMap.put(TSDBErrorNumbers.ERROR_RESULTSET_CLOSED, "resultSet is closed");
/**************************************************/
TSDBErrorMap.put(TSDBErrorNumbers.ERROR_SUBSCRIBE_FAILED, "failed to create subscription");
}
public static String wrapErrMsg(String msg) {
return "TDengine Error: " + msg;
}
public static SQLException createSQLException(int errorNumber) {
// JDBC exception code is less than 0x2350
if (errorNumber <= 0x2350)
return new SQLException(TSDBErrorMap.get(errorNumber));
// JNI exception code is
return new SQLException(wrapErrMsg(TSDBErrorMap.get(errorNumber)));
}
}
package com.taosdata.jdbc;
public class TSDBErrorNumbers {
public static final int ERROR_CONNECTION_CLOSED = 0x2301; // connection already closed
public static final int ERROR_UNSUPPORTED_METHOD = 0x2302; //this operation is NOT supported currently!
public static final int ERROR_INVALID_VARIABLE = 0x2303; //invalid variables
public static final int ERROR_STATEMENT_CLOSED = 0x2304; //statement already closed
public static final int ERROR_RESULTSET_CLOSED = 0x2305; //resultSet is closed
public static final int ERROR_SUBSCRIBE_FAILED = 0x2350; //failed to create subscription
private TSDBErrorNumbers() {
}
}
...@@ -22,7 +22,6 @@ import java.util.List; ...@@ -22,7 +22,6 @@ import java.util.List;
public class TSDBStatement implements Statement { public class TSDBStatement implements Statement {
private TSDBJNIConnector connector; private TSDBJNIConnector connector;
private TaosInfo taosInfo = TaosInfo.getInstance();
/** /**
* To store batched commands * To store batched commands
...@@ -69,13 +68,12 @@ public class TSDBStatement implements Statement { ...@@ -69,13 +68,12 @@ public class TSDBStatement implements Statement {
} }
public ResultSet executeQuery(String sql) throws SQLException { public ResultSet executeQuery(String sql) throws SQLException {
if (isClosed) { if (isClosed()) {
throw new SQLException("Invalid method call on a closed statement."); throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_STATEMENT_CLOSED);
} }
// TODO make sure it is not a update query // TODO make sure it is not a update query
pSql = this.connector.executeQuery(sql); pSql = this.connector.executeQuery(sql);
long resultSetPointer = this.connector.getResultSet(); long resultSetPointer = this.connector.getResultSet();
if (resultSetPointer == TSDBConstants.JNI_CONNECTION_NULL) { if (resultSetPointer == TSDBConstants.JNI_CONNECTION_NULL) {
this.connector.freeResultSet(pSql); this.connector.freeResultSet(pSql);
...@@ -100,8 +98,8 @@ public class TSDBStatement implements Statement { ...@@ -100,8 +98,8 @@ public class TSDBStatement implements Statement {
} }
public int executeUpdate(String sql) throws SQLException { public int executeUpdate(String sql) throws SQLException {
if (isClosed) { if (isClosed()) {
throw new SQLException("Invalid method call on a closed statement."); throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_STATEMENT_CLOSED);
} }
// TODO check if current query is update query // TODO check if current query is update query
...@@ -133,25 +131,33 @@ public class TSDBStatement implements Statement { ...@@ -133,25 +131,33 @@ public class TSDBStatement implements Statement {
} }
public int getMaxFieldSize() throws SQLException { public int getMaxFieldSize() throws SQLException {
if (isClosed()) {
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_STATEMENT_CLOSED);
}
return 0; return 0;
// throw new SQLException(TSDBConstants.UNSUPPORT_METHOD_EXCEPTIONZ_MSG);
} }
public void setMaxFieldSize(int max) throws SQLException { public void setMaxFieldSize(int max) throws SQLException {
if (isClosed()) if (isClosed()) {
throw new SQLException(TSDBConstants.STATEMENT_CLOSED); throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_STATEMENT_CLOSED);
}
throw new SQLException(TSDBConstants.UNSUPPORT_METHOD_EXCEPTIONZ_MSG); throw new SQLException(TSDBConstants.UNSUPPORT_METHOD_EXCEPTIONZ_MSG);
} }
public int getMaxRows() throws SQLException { public int getMaxRows() throws SQLException {
if (isClosed()) if (isClosed()) {
throw new SQLException(TSDBConstants.STATEMENT_CLOSED); throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_STATEMENT_CLOSED);
}
// always set maxRows to zero, meaning unlimitted rows in a resultSet // always set maxRows to zero, meaning unlimitted rows in a resultSet
return 0; return 0;
} }
public void setMaxRows(int max) throws SQLException { public void setMaxRows(int max) throws SQLException {
if (isClosed()) {
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_STATEMENT_CLOSED);
}
// always set maxRows to zero, meaning unlimited rows in a resultSet // always set maxRows to zero, meaning unlimited rows in a resultSet
} }
......
...@@ -496,6 +496,7 @@ typedef struct { ...@@ -496,6 +496,7 @@ typedef struct {
int32_t tsOrder; // ts comp block order int32_t tsOrder; // ts comp block order
int32_t numOfTags; // number of tags columns involved int32_t numOfTags; // number of tags columns involved
int32_t sqlstrLen; // sql query string int32_t sqlstrLen; // sql query string
int32_t prevResultLen; // previous result length
SColumnInfo colList[]; SColumnInfo colList[];
} SQueryTableMsg; } SQueryTableMsg;
......
...@@ -1461,9 +1461,9 @@ static int32_t mnodeGetShowSuperTableMeta(STableMetaMsg *pMeta, SShowObj *pShow, ...@@ -1461,9 +1461,9 @@ static int32_t mnodeGetShowSuperTableMeta(STableMetaMsg *pMeta, SShowObj *pShow,
int32_t cols = 0; int32_t cols = 0;
SSchema *pSchema = pMeta->schema; SSchema *pSchema = pMeta->schema;
SSchema tbnameSchema = tGetTableNameColumnSchema(); SSchema* tbnameSchema = tGetTbnameColumnSchema();
pShow->bytes[cols] = tbnameSchema.bytes; pShow->bytes[cols] = tbnameSchema->bytes;
pSchema[cols].type = tbnameSchema.type; pSchema[cols].type = tbnameSchema->type;
strcpy(pSchema[cols].name, "name"); strcpy(pSchema[cols].name, "name");
pSchema[cols].bytes = htons(pShow->bytes[cols]); pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++; cols++;
...@@ -2821,9 +2821,9 @@ static int32_t mnodeGetShowTableMeta(STableMetaMsg *pMeta, SShowObj *pShow, void ...@@ -2821,9 +2821,9 @@ static int32_t mnodeGetShowTableMeta(STableMetaMsg *pMeta, SShowObj *pShow, void
int32_t cols = 0; int32_t cols = 0;
SSchema *pSchema = pMeta->schema; SSchema *pSchema = pMeta->schema;
SSchema s = tGetTableNameColumnSchema(); SSchema* s = tGetTbnameColumnSchema();
pShow->bytes[cols] = s.bytes; pShow->bytes[cols] = s->bytes;
pSchema[cols].type = s.type; pSchema[cols].type = s->type;
strcpy(pSchema[cols].name, "table_name"); strcpy(pSchema[cols].name, "table_name");
pSchema[cols].bytes = htons(pShow->bytes[cols]); pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++; cols++;
...@@ -2840,9 +2840,9 @@ static int32_t mnodeGetShowTableMeta(STableMetaMsg *pMeta, SShowObj *pShow, void ...@@ -2840,9 +2840,9 @@ static int32_t mnodeGetShowTableMeta(STableMetaMsg *pMeta, SShowObj *pShow, void
pSchema[cols].bytes = htons(pShow->bytes[cols]); pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++; cols++;
SSchema tbCol = tGetTableNameColumnSchema(); SSchema* tbCol = tGetTbnameColumnSchema();
pShow->bytes[cols] = tbCol.bytes + VARSTR_HEADER_SIZE; pShow->bytes[cols] = tbCol->bytes + VARSTR_HEADER_SIZE;
pSchema[cols].type = tbCol.type; pSchema[cols].type = tbCol->type;
strcpy(pSchema[cols].name, "stable_name"); strcpy(pSchema[cols].name, "stable_name");
pSchema[cols].bytes = htons(pShow->bytes[cols]); pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++; cols++;
...@@ -3076,9 +3076,9 @@ static int32_t mnodeGetStreamTableMeta(STableMetaMsg *pMeta, SShowObj *pShow, vo ...@@ -3076,9 +3076,9 @@ static int32_t mnodeGetStreamTableMeta(STableMetaMsg *pMeta, SShowObj *pShow, vo
int32_t cols = 0; int32_t cols = 0;
SSchema *pSchema = pMeta->schema; SSchema *pSchema = pMeta->schema;
SSchema tbnameColSchema = tGetTableNameColumnSchema(); SSchema* tbnameColSchema = tGetTbnameColumnSchema();
pShow->bytes[cols] = tbnameColSchema.bytes; pShow->bytes[cols] = tbnameColSchema->bytes;
pSchema[cols].type = tbnameColSchema.type; pSchema[cols].type = tbnameColSchema->type;
strcpy(pSchema[cols].name, "table_name"); strcpy(pSchema[cols].name, "table_name");
pSchema[cols].bytes = htons(pShow->bytes[cols]); pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++; cols++;
......
...@@ -59,25 +59,27 @@ extern "C" { ...@@ -59,25 +59,27 @@ extern "C" {
#define TSDB_FUNC_FIRST_DST 25 #define TSDB_FUNC_FIRST_DST 25
#define TSDB_FUNC_LAST_DST 26 #define TSDB_FUNC_LAST_DST 26
#define TSDB_FUNC_INTERP 27 #define TSDB_FUNC_STDDEV_DST 27
#define TSDB_FUNC_INTERP 28
#define TSDB_FUNC_RATE 28
#define TSDB_FUNC_IRATE 29 #define TSDB_FUNC_RATE 29
#define TSDB_FUNC_SUM_RATE 30 #define TSDB_FUNC_IRATE 30
#define TSDB_FUNC_SUM_IRATE 31 #define TSDB_FUNC_SUM_RATE 31
#define TSDB_FUNC_AVG_RATE 32 #define TSDB_FUNC_SUM_IRATE 32
#define TSDB_FUNC_AVG_IRATE 33 #define TSDB_FUNC_AVG_RATE 33
#define TSDB_FUNC_AVG_IRATE 34
#define TSDB_FUNC_TID_TAG 34
#define TSDB_FUNC_HISTOGRAM 35 #define TSDB_FUNC_TID_TAG 35
#define TSDB_FUNC_HLL 36 #define TSDB_FUNC_HISTOGRAM 36
#define TSDB_FUNC_MODE 37 #define TSDB_FUNC_HLL 37
#define TSDB_FUNC_SAMPLE 38 #define TSDB_FUNC_MODE 38
#define TSDB_FUNC_CEIL 39 #define TSDB_FUNC_SAMPLE 39
#define TSDB_FUNC_FLOOR 40 #define TSDB_FUNC_CEIL 40
#define TSDB_FUNC_ROUND 41 #define TSDB_FUNC_FLOOR 41
#define TSDB_FUNC_MAVG 42 #define TSDB_FUNC_ROUND 42
#define TSDB_FUNC_CSUM 43 #define TSDB_FUNC_MAVG 43
#define TSDB_FUNC_CSUM 44
#define TSDB_FUNCSTATE_SO 0x1u // single output #define TSDB_FUNCSTATE_SO 0x1u // single output
#define TSDB_FUNCSTATE_MO 0x2u // dynamic number of output, not multinumber of output e.g., TOP/BOTTOM #define TSDB_FUNCSTATE_MO 0x2u // dynamic number of output, not multinumber of output e.g., TOP/BOTTOM
...@@ -90,15 +92,12 @@ extern "C" { ...@@ -90,15 +92,12 @@ extern "C" {
#define TSDB_BASE_FUNC_SO TSDB_FUNCSTATE_SO | TSDB_FUNCSTATE_STREAM | TSDB_FUNCSTATE_STABLE | TSDB_FUNCSTATE_OF #define TSDB_BASE_FUNC_SO TSDB_FUNCSTATE_SO | TSDB_FUNCSTATE_STREAM | TSDB_FUNCSTATE_STABLE | TSDB_FUNCSTATE_OF
#define TSDB_BASE_FUNC_MO TSDB_FUNCSTATE_MO | TSDB_FUNCSTATE_STREAM | TSDB_FUNCSTATE_STABLE | TSDB_FUNCSTATE_OF #define TSDB_BASE_FUNC_MO TSDB_FUNCSTATE_MO | TSDB_FUNCSTATE_STREAM | TSDB_FUNCSTATE_STABLE | TSDB_FUNCSTATE_OF
#define TSDB_FUNCTIONS_NAME_MAX_LENGTH 16 #define TSDB_FUNCTIONS_NAME_MAX_LENGTH 16
#define TSDB_AVG_FUNCTION_INTER_BUFFER_SIZE 50 #define TSDB_AVG_FUNCTION_INTER_BUFFER_SIZE 50
#define DATA_SET_FLAG ',' // to denote the output area has data, not null value #define DATA_SET_FLAG ',' // to denote the output area has data, not null value
#define DATA_SET_FLAG_SIZE sizeof(DATA_SET_FLAG) #define DATA_SET_FLAG_SIZE sizeof(DATA_SET_FLAG)
#define QUERY_ASC_FORWARD_STEP 1 #define QUERY_ASC_FORWARD_STEP 1
#define QUERY_DESC_FORWARD_STEP -1 #define QUERY_DESC_FORWARD_STEP -1
...@@ -167,8 +166,9 @@ typedef struct SExtTagsInfo { ...@@ -167,8 +166,9 @@ typedef struct SExtTagsInfo {
// sql function runtime context // sql function runtime context
typedef struct SQLFunctionCtx { typedef struct SQLFunctionCtx {
int32_t startOffset; int32_t startOffset; // todo remove it
int32_t size; // number of rows int32_t size; // number of rows
void * pInput; //
uint32_t order; // asc|desc uint32_t order; // asc|desc
int16_t inputType; int16_t inputType;
int16_t inputBytes; int16_t inputBytes;
...@@ -177,13 +177,12 @@ typedef struct SQLFunctionCtx { ...@@ -177,13 +177,12 @@ typedef struct SQLFunctionCtx {
int16_t outputBytes; // size of results, determined by function and input column data type int16_t outputBytes; // size of results, determined by function and input column data type
int32_t interBufBytes; // internal buffer size int32_t interBufBytes; // internal buffer size
bool hasNull; // null value exist in current block bool hasNull; // null value exist in current block
bool requireNull; // require null in some function bool requireNull; // require null in some function
bool stableQuery; bool stableQuery;
int16_t functionId; // function id int16_t functionId; // function id
void * aInputElemBuf; char * pOutput; // final result output buffer, point to sdata->data
char * aOutputBuf; // final result output buffer, point to sdata->data uint8_t currentStage; // record current running step, default: 0
uint8_t currentStage; // record current running step, default: 0 int64_t startTs; // timestamp range of current query when function is executed on a specific data block
int64_t nStartQueryTimestamp; // timestamp range of current query when function is executed on a specific data block
int32_t numOfParams; int32_t numOfParams;
tVariant param[4]; // input parameter, e.g., top(k, 20), the number of results for top query is kept in param */ tVariant param[4]; // input parameter, e.g., top(k, 20), the number of results for top query is kept in param */
int64_t *ptsList; // corresponding timestamp array list int64_t *ptsList; // corresponding timestamp array list
...@@ -198,17 +197,16 @@ typedef struct SQLFunctionCtx { ...@@ -198,17 +197,16 @@ typedef struct SQLFunctionCtx {
SPoint1 end; SPoint1 end;
} SQLFunctionCtx; } SQLFunctionCtx;
typedef struct SQLAggFuncElem { typedef struct SAggFunctionInfo {
char aName[TSDB_FUNCTIONS_NAME_MAX_LENGTH]; char name[TSDB_FUNCTIONS_NAME_MAX_LENGTH];
uint8_t index; // index of function in aAggs
uint8_t nAggIdx; // index of function in aAggs
int8_t stableFuncId; // transfer function for super table query int8_t stableFuncId; // transfer function for super table query
uint16_t nStatus; uint16_t status;
bool (*init)(SQLFunctionCtx *pCtx); // setup the execute environment bool (*init)(SQLFunctionCtx *pCtx); // setup the execute environment
void (*xFunction)(SQLFunctionCtx *pCtx); // blocks version function void (*xFunction)(SQLFunctionCtx *pCtx); // blocks version function
void (*xFunctionF)(SQLFunctionCtx *pCtx, int32_t position); // single-row function version void (*xFunctionF)(SQLFunctionCtx *pCtx, int32_t position); // single-row function version, todo merge with blockwise function
// some sql function require scan data twice or more, e.g.,stddev, percentile // some sql function require scan data twice or more, e.g.,stddev, percentile
void (*xNextStep)(SQLFunctionCtx *pCtx); void (*xNextStep)(SQLFunctionCtx *pCtx);
...@@ -218,7 +216,7 @@ typedef struct SQLAggFuncElem { ...@@ -218,7 +216,7 @@ typedef struct SQLAggFuncElem {
void (*mergeFunc)(SQLFunctionCtx *pCtx); void (*mergeFunc)(SQLFunctionCtx *pCtx);
int32_t (*dataReqFunc)(SQLFunctionCtx *pCtx, TSKEY start, TSKEY end, int32_t colId); int32_t (*dataReqFunc)(SQLFunctionCtx *pCtx, TSKEY start, TSKEY end, int32_t colId);
} SQLAggFuncElem; } SAggFunctionInfo;
#define GET_RES_INFO(ctx) ((ctx)->resultInfo) #define GET_RES_INFO(ctx) ((ctx)->resultInfo)
...@@ -246,7 +244,7 @@ typedef struct STwaInfo { ...@@ -246,7 +244,7 @@ typedef struct STwaInfo {
} STwaInfo; } STwaInfo;
/* global sql function array */ /* global sql function array */
extern struct SQLAggFuncElem aAggs[]; extern struct SAggFunctionInfo aAggs[];
extern int32_t functionCompatList[]; // compatible check array list extern int32_t functionCompatList[]; // compatible check array list
......
...@@ -143,6 +143,11 @@ typedef struct { ...@@ -143,6 +143,11 @@ typedef struct {
int64_t ts; int64_t ts;
} SOrderedPrjQueryInfo; } SOrderedPrjQueryInfo;
typedef struct {
char* tags;
SArray* pResult; // SArray<SStddevInterResult>
} SInterResult;
typedef struct SQuery { typedef struct SQuery {
int16_t numOfCols; int16_t numOfCols;
int16_t numOfTags; int16_t numOfTags;
...@@ -152,9 +157,14 @@ typedef struct SQuery { ...@@ -152,9 +157,14 @@ typedef struct SQuery {
int16_t precision; int16_t precision;
int16_t numOfOutput; int16_t numOfOutput;
int16_t fillType; int16_t fillType;
int16_t checkResultBuf; // check if the buffer is full during scan each block int16_t checkResultBuf; // check if the buffer is full during scan each block
SLimitVal limit; SLimitVal limit;
int32_t rowSize;
int32_t srcRowSize; // todo extract struct
int32_t resultRowSize;
int32_t maxSrcColumnSize;
int32_t tagLen; // tag value length of current query
SSqlGroupbyExpr* pGroupbyExpr; SSqlGroupbyExpr* pGroupbyExpr;
SExprInfo* pExpr1; SExprInfo* pExpr1;
SExprInfo* pExpr2; SExprInfo* pExpr2;
...@@ -184,14 +194,13 @@ typedef struct SQueryRuntimeEnv { ...@@ -184,14 +194,13 @@ typedef struct SQueryRuntimeEnv {
uint16_t scanFlag; // denotes reversed scan of data or not uint16_t scanFlag; // denotes reversed scan of data or not
SFillInfo* pFillInfo; SFillInfo* pFillInfo;
SResultRowInfo windowResInfo; SResultRowInfo windowResInfo;
STSBuf* pTsBuf;
STSCursor cur;
SQueryCostInfo summary; SQueryCostInfo summary;
void* pQueryHandle; void* pQueryHandle;
void* pSecQueryHandle; // another thread for void* pSecQueryHandle; // another thread for
bool stableQuery; // super table query or not bool stableQuery; // super table query or not
bool topBotQuery; // TODO used bitwise flag bool topBotQuery; // TODO used bitwise flag
bool groupbyColumn; // denote if this is a groupby normal column query bool groupbyColumn; // denote if this is a groupby normal column query
bool hasTagResults; // if there are tag values in final result or not bool hasTagResults; // if there are tag values in final result or not
bool timeWindowInterpo;// if the time window start/end required interpolation bool timeWindowInterpo;// if the time window start/end required interpolation
bool queryWindowIdentical; // all query time windows are identical for all tables in one group bool queryWindowIdentical; // all query time windows are identical for all tables in one group
...@@ -205,8 +214,12 @@ typedef struct SQueryRuntimeEnv { ...@@ -205,8 +214,12 @@ typedef struct SQueryRuntimeEnv {
int32_t* rowCellInfoOffset;// offset value for each row result cell info int32_t* rowCellInfoOffset;// offset value for each row result cell info
char** prevRow; char** prevRow;
char** nextRow;
SArray* prevResult; // intermediate result, SArray<SInterResult>
STSBuf* pTsBuf; // timestamp filter list
STSCursor cur;
char* tagVal; // tag value of current data block
SArithmeticSupport *sasArray; SArithmeticSupport *sasArray;
} SQueryRuntimeEnv; } SQueryRuntimeEnv;
......
...@@ -68,7 +68,7 @@ typedef struct SPoint { ...@@ -68,7 +68,7 @@ typedef struct SPoint {
void * val; void * val;
} SPoint; } SPoint;
SFillInfo* taosInitFillInfo(int32_t order, TSKEY skey, int32_t numOfTags, int32_t capacity, int32_t numOfCols, SFillInfo* taosCreateFillInfo(int32_t order, TSKEY skey, int32_t numOfTags, int32_t capacity, int32_t numOfCols,
int64_t slidingTime, int8_t slidingUnit, int8_t precision, int32_t fillType, int64_t slidingTime, int8_t slidingUnit, int8_t precision, int32_t fillType,
SFillColInfo* pFillCol, void* handle); SFillColInfo* pFillCol, void* handle);
...@@ -78,7 +78,7 @@ void* taosDestroyFillInfo(SFillInfo *pFillInfo); ...@@ -78,7 +78,7 @@ void* taosDestroyFillInfo(SFillInfo *pFillInfo);
void taosFillSetStartInfo(SFillInfo* pFillInfo, int32_t numOfRows, TSKEY endKey); void taosFillSetStartInfo(SFillInfo* pFillInfo, int32_t numOfRows, TSKEY endKey);
void taosFillCopyInputDataFromFilePage(SFillInfo* pFillInfo, const tFilePage** pInput); void taosFillSetDataBlockFromFilePage(SFillInfo* pFillInfo, const tFilePage** pInput);
void taosFillCopyInputDataFromOneFilePage(SFillInfo* pFillInfo, const tFilePage* pInput); void taosFillCopyInputDataFromOneFilePage(SFillInfo* pFillInfo, const tFilePage* pInput);
......
...@@ -15,6 +15,8 @@ ...@@ -15,6 +15,8 @@
#ifndef TDENGINE_QUERYUTIL_H #ifndef TDENGINE_QUERYUTIL_H
#define TDENGINE_QUERYUTIL_H #define TDENGINE_QUERYUTIL_H
#include "tbuffer.h"
#define SET_RES_WINDOW_KEY(_k, _ori, _len, _uid) \ #define SET_RES_WINDOW_KEY(_k, _ori, _len, _uid) \
do { \ do { \
assert(sizeof(_uid) == sizeof(uint64_t)); \ assert(sizeof(_uid) == sizeof(uint64_t)); \
...@@ -74,5 +76,13 @@ int32_t getNumOfUsedResultRows(SResultRowPool* p); ...@@ -74,5 +76,13 @@ int32_t getNumOfUsedResultRows(SResultRowPool* p);
bool isPointInterpoQuery(SQuery *pQuery); bool isPointInterpoQuery(SQuery *pQuery);
typedef struct {
SArray* pResult; // SArray<SResPair>
int32_t colId;
} SStddevInterResult;
void interResToBinary(SBufferWriter* bw, SArray* pRes, int32_t tagLen);
SArray* interResFromBinary(const char* data, int32_t len);
void freeInterResult(void* param);
#endif // TDENGINE_QUERYUTIL_H #endif // TDENGINE_QUERYUTIL_H
此差异已折叠。
此差异已折叠。
...@@ -321,7 +321,7 @@ static int32_t taosNumOfRemainRows(SFillInfo* pFillInfo) { ...@@ -321,7 +321,7 @@ static int32_t taosNumOfRemainRows(SFillInfo* pFillInfo) {
return pFillInfo->numOfRows - pFillInfo->index; return pFillInfo->numOfRows - pFillInfo->index;
} }
SFillInfo* taosInitFillInfo(int32_t order, TSKEY skey, int32_t numOfTags, int32_t capacity, int32_t numOfCols, SFillInfo* taosCreateFillInfo(int32_t order, TSKEY skey, int32_t numOfTags, int32_t capacity, int32_t numOfCols,
int64_t slidingTime, int8_t slidingUnit, int8_t precision, int32_t fillType, int64_t slidingTime, int8_t slidingUnit, int8_t precision, int32_t fillType,
SFillColInfo* pCol, void* handle) { SFillColInfo* pCol, void* handle) {
if (fillType == TSDB_FILL_NONE) { if (fillType == TSDB_FILL_NONE) {
...@@ -414,7 +414,7 @@ void taosFillSetStartInfo(SFillInfo* pFillInfo, int32_t numOfRows, TSKEY endKey) ...@@ -414,7 +414,7 @@ void taosFillSetStartInfo(SFillInfo* pFillInfo, int32_t numOfRows, TSKEY endKey)
} }
// copy the data into source data buffer // copy the data into source data buffer
void taosFillCopyInputDataFromFilePage(SFillInfo* pFillInfo, const tFilePage** pInput) { void taosFillSetDataBlockFromFilePage(SFillInfo* pFillInfo, const tFilePage** pInput) {
for (int32_t i = 0; i < pFillInfo->numOfCols; ++i) { for (int32_t i = 0; i < pFillInfo->numOfCols; ++i) {
memcpy(pFillInfo->pData[i], pInput[i]->data, pFillInfo->numOfRows * pFillInfo->pFillCol[i].col.bytes); memcpy(pFillInfo->pData[i], pInput[i]->data, pFillInfo->numOfRows * pFillInfo->pFillCol[i].col.bytes);
} }
......
此差异已折叠。
...@@ -2623,7 +2623,7 @@ static int32_t tableGroupComparFn(const void *p1, const void *p2, const void *pa ...@@ -2623,7 +2623,7 @@ static int32_t tableGroupComparFn(const void *p1, const void *p2, const void *pa
f1 = (char*) TABLE_NAME(pTable1); f1 = (char*) TABLE_NAME(pTable1);
f2 = (char*) TABLE_NAME(pTable2); f2 = (char*) TABLE_NAME(pTable2);
type = TSDB_DATA_TYPE_BINARY; type = TSDB_DATA_TYPE_BINARY;
bytes = tGetTableNameColumnSchema().bytes; bytes = tGetTbnameColumnSchema()->bytes;
} else { } else {
STColumn* pCol = schemaColAt(pTableGroupSupp->pTagSchema, colIndex); STColumn* pCol = schemaColAt(pTableGroupSupp->pTagSchema, colIndex);
bytes = pCol->bytes; bytes = pCol->bytes;
......
...@@ -157,7 +157,7 @@ void taosArrayDestroyEx(SArray* pArray, void (*fp)(void*)); ...@@ -157,7 +157,7 @@ void taosArrayDestroyEx(SArray* pArray, void (*fp)(void*));
* @param pArray * @param pArray
* @param compar * @param compar
*/ */
void taosArraySort(SArray* pArray, int (*compar)(const void*, const void*)); void taosArraySort(SArray* pArray, __compar_fn_t comparFn);
/** /**
* sort string array * sort string array
......
...@@ -220,7 +220,7 @@ void taosArrayDestroyEx(SArray* pArray, void (*fp)(void*)) { ...@@ -220,7 +220,7 @@ void taosArrayDestroyEx(SArray* pArray, void (*fp)(void*)) {
taosArrayDestroy(pArray); taosArrayDestroy(pArray);
} }
void taosArraySort(SArray* pArray, int (*compar)(const void*, const void*)) { void taosArraySort(SArray* pArray, __compar_fn_t compar) {
assert(pArray != NULL); assert(pArray != NULL);
assert(compar != NULL); assert(compar != NULL);
......
...@@ -191,7 +191,8 @@ double tbufReadDouble(SBufferReader* buf) { ...@@ -191,7 +191,8 @@ double tbufReadDouble(SBufferReader* buf) {
// writer functions // writer functions
void tbufCloseWriter( SBufferWriter* buf ) { void tbufCloseWriter( SBufferWriter* buf ) {
(*buf->allocator)( buf->data, 0 ); tfree(buf->data);
// (*buf->allocator)( buf->data, 0 ); // potential memory leak.
buf->data = NULL; buf->data = NULL;
buf->pos = 0; buf->pos = 0;
buf->size = 0; buf->size = 0;
......
此差异已折叠。
...@@ -44,12 +44,14 @@ class TDTestCase: ...@@ -44,12 +44,14 @@ class TDTestCase:
# stddev verifacation # stddev verifacation
tdSql.error("select stddev(ts) from test1") tdSql.error("select stddev(ts) from test1")
tdSql.error("select stddev(col1) from test")
tdSql.error("select stddev(col2) from test") # stddev support super table now
tdSql.error("select stddev(col3) from test") # tdSql.error("select stddev(col1) from test")
tdSql.error("select stddev(col4) from test") # tdSql.error("select stddev(col2) from test")
tdSql.error("select stddev(col5) from test") # tdSql.error("select stddev(col3) from test")
tdSql.error("select stddev(col6) from test") # tdSql.error("select stddev(col4) from test")
# tdSql.error("select stddev(col5) from test")
# tdSql.error("select stddev(col6) from test")
tdSql.error("select stddev(col7) from test1") tdSql.error("select stddev(col7) from test1")
tdSql.error("select stddev(col8) from test1") tdSql.error("select stddev(col8) from test1")
tdSql.error("select stddev(col9) from test1") tdSql.error("select stddev(col9) from test1")
......
...@@ -39,12 +39,6 @@ class TDTestCase: ...@@ -39,12 +39,6 @@ class TDTestCase:
# stddev verifacation # stddev verifacation
tdSql.error("select stddev(ts) from test1") tdSql.error("select stddev(ts) from test1")
tdSql.error("select stddev(col1) from test")
tdSql.error("select stddev(col2) from test")
tdSql.error("select stddev(col3) from test")
tdSql.error("select stddev(col4) from test")
tdSql.error("select stddev(col5) from test")
tdSql.error("select stddev(col6) from test")
tdSql.error("select stddev(col7) from test1") tdSql.error("select stddev(col7) from test1")
tdSql.error("select stddev(col8) from test1") tdSql.error("select stddev(col8) from test1")
tdSql.error("select stddev(col9) from test1") tdSql.error("select stddev(col9) from test1")
......
...@@ -111,6 +111,18 @@ class TDTestCase: ...@@ -111,6 +111,18 @@ class TDTestCase:
tdSql.query("select * from tb where c5 = 'true' ") tdSql.query("select * from tb where c5 = 'true' ")
tdSql.checkRows(5) tdSql.checkRows(5)
# For jira: https://jira.taosdata.com:18080/browse/TD-2850
tdSql.execute("create database 'Test' ")
tdSql.execute("use 'Test' ")
tdSql.execute("create table 'TB'(ts timestamp, 'Col1' int) tags('Tag1' int)")
tdSql.execute("insert into 'Tb0' using tb tags(1) values(now, 1)")
tdSql.query("select * from tb")
tdSql.checkRows(1)
tdSql.query("select * from tb0")
tdSql.checkRows(1)
def stop(self): def stop(self):
tdSql.close() tdSql.close()
tdLog.success("%s successfully executed" % __file__) tdLog.success("%s successfully executed" % __file__)
......
此差异已折叠。
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册