未验证 提交 9e3d2648 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #1964 from taosdata/feature/query

......@@ -21,7 +21,7 @@ extern "C" {
#include "qextbuffer.h"
#include "qinterpolation.h"
#include "qfill.h"
#include "taosmsg.h"
#include "tlosertree.h"
#include "tsclient.h"
......@@ -60,7 +60,7 @@ typedef struct SLocalReducer {
char * prevRowOfInput;
tFilePage * pResultBuf;
int32_t nResultBufSize;
char * pBufForInterpo; // intermediate buffer for interpolation
// char * pBufForInterpo; // intermediate buffer for interpolation
tFilePage * pTempBuffer;
struct SQLFunctionCtx *pCtx;
int32_t rowSize; // size of each intermediate result.
......@@ -68,9 +68,9 @@ typedef struct SLocalReducer {
bool hasPrevRow; // cannot be released
bool hasUnprocessedRow;
tOrderDescriptor * pDesc;
SColumnModel * resColModel;
SColumnModel * resColModel;
tExtMemBuffer ** pExtMemBuffer; // disk-based buffer
SInterpolationInfo interpolationInfo; // interpolation support structure
SFillInfo* pFillInfo; // interpolation support structure
char * pFinalRes; // result data after interpo
tFilePage * discardData;
SResultInfo * pResInfo;
......@@ -30,10 +30,10 @@ extern "C" {
#include "tsqlfunction.h"
#include "tutil.h"
#include "qExecutor.h"
#include "qsqlparser.h"
#include "qsqltype.h"
#include "qtsbuf.h"
#include "queryExecutor.h"
// forward declaration
struct SSqlInfo;
......@@ -210,7 +210,7 @@ typedef struct SQueryInfo {
SLimitVal slimit;
STagCond tagCond;
SOrderVal order;
int16_t interpoType; // interpolate type
int16_t fillType; // interpolate type
int16_t numOfTables;
STableMetaInfo **pTableMetaInfo;
struct STSBuf * tsBuf;
......@@ -263,7 +263,7 @@ typedef struct SResRec {
typedef struct {
int64_t numOfRows; // num of results in current retrieved
int64_t numOfTotal; // num of total results
int64_t numOfTotalInCurrentClause; // num of total result in current subclause
int64_t numOfClauseTotal; // num of total result in current subclause
char * pRsp;
int32_t rspType;
int32_t rspLen;
......@@ -147,7 +147,7 @@ static void tscAsyncFetchRowsProxy(void *param, TAOS_RES *tres, int numOfRows) {
// local merge has handle this situation during super table non-projection query.
if (pCmd->command != TSDB_SQL_RETRIEVE_LOCALMERGE) {
pRes->numOfTotalInCurrentClause += pRes->numOfRows;
pRes->numOfClauseTotal += pRes->numOfRows;
(*pSql->fetchFp)(param, tres, numOfRows);
......@@ -16,8 +16,8 @@
#include "os.h"
#include "qast.h"
#include "qextbuffer.h"
#include "qfill.h"
#include "qhistogram.h"
#include "qinterpolation.h"
#include "qpercentile.h"
#include "qsyntaxtreefunction.h"
#include "qtsbuf.h"
......@@ -3418,6 +3418,7 @@ static void spread_function(SQLFunctionCtx *pCtx) {
int32_t numOfElems = pCtx->size;
// todo : opt with pre-calculated result
// column missing cause the hasNull to be true
if (usePreVal(pCtx)) {
numOfElems = pCtx->size - pCtx->preAggVals.statis.numOfNull;
......@@ -3446,13 +3447,13 @@ static void spread_function(SQLFunctionCtx *pCtx) {
} else {
if (pInfo->min > pCtx->param[1].dKey) {
pInfo->min = pCtx->param[1].dKey;
if (pInfo->max < pCtx->param[2].dKey) {
pInfo->max = pCtx->param[2].dKey;
// if (pInfo->min > pCtx->param[1].dKey) {
// pInfo->min = pCtx->param[1].dKey;
// }
// if (pInfo->max < pCtx->param[2].dKey) {
// pInfo->max = pCtx->param[2].dKey;
// }
void *pData = GET_INPUT_CHAR(pCtx);
......@@ -3866,16 +3867,16 @@ static void interp_function(SQLFunctionCtx *pCtx) {
SInterpInfoDetail *pInfoDetail = interpInfo.pInterpDetail;
/* set no output result */
if (pInfoDetail->type == TSDB_INTERPO_NONE) {
if (pInfoDetail->type == TSDB_FILL_NONE) {
pCtx->param[3].i64Key = 0;
} else if (pInfoDetail->primaryCol == 1) {
*(TSKEY *)pCtx->aOutputBuf = pInfoDetail->ts;
} else {
if (pInfoDetail->type == TSDB_INTERPO_NULL) {
if (pInfoDetail->type == TSDB_FILL_NULL) {
setNull(pCtx->aOutputBuf, pCtx->outputType, pCtx->outputBytes);
} else if (pInfoDetail->type == TSDB_INTERPO_SET_VALUE) {
} else if (pInfoDetail->type == TSDB_FILL_SET_VALUE) {
tVariantDump(&pCtx->param[1], pCtx->aOutputBuf, pCtx->inputType);
} else if (pInfoDetail->type == TSDB_INTERPO_PREV) {
} else if (pInfoDetail->type == TSDB_FILL_PREV) {
char *data = pCtx->param[1].pz;
char *pVal = data + TSDB_KEYSIZE;
......@@ -3886,7 +3887,7 @@ static void interp_function(SQLFunctionCtx *pCtx) {
assignVal(pCtx->aOutputBuf, pVal, pCtx->outputBytes, pCtx->outputType);
} else if (pInfoDetail->type == TSDB_INTERPO_LINEAR) {
} else if (pInfoDetail->type == TSDB_FILL_LINEAR) {
char *data1 = pCtx->param[1].pz;
char *data2 = pCtx->param[2].pz;
......@@ -447,7 +447,7 @@ static int insertStmtExecute(STscStmt* stmt) {
SSqlRes *pRes = &pSql->res;
pRes->numOfRows = 0;
pRes->numOfTotal = 0;
pRes->numOfTotalInCurrentClause = 0;
pRes->numOfClauseTotal = 0;
pRes->qhandle = 0;
......@@ -4020,19 +4020,19 @@ int32_t parseFillClause(SQueryInfo* pQueryInfo, SQuerySQL* pQuerySQL) {
if (strncasecmp(pItem->pVar.pz, "none", 4) == 0 && pItem->pVar.nLen == 4) {
pQueryInfo->interpoType = TSDB_INTERPO_NONE;
pQueryInfo->fillType = TSDB_FILL_NONE;
} else if (strncasecmp(pItem->pVar.pz, "null", 4) == 0 && pItem->pVar.nLen == 4) {
pQueryInfo->interpoType = TSDB_INTERPO_NULL;
pQueryInfo->fillType = TSDB_FILL_NULL;
for (int32_t i = START_INTERPO_COL_IDX; i < size; ++i) {
TAOS_FIELD* pFields = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i);
setNull((char*)&pQueryInfo->defaultVal[i], pFields->type, pFields->bytes);
} else if (strncasecmp(pItem->pVar.pz, "prev", 4) == 0 && pItem->pVar.nLen == 4) {
pQueryInfo->interpoType = TSDB_INTERPO_PREV;
pQueryInfo->fillType = TSDB_FILL_PREV;
} else if (strncasecmp(pItem->pVar.pz, "linear", 6) == 0 && pItem->pVar.nLen == 6) {
pQueryInfo->interpoType = TSDB_INTERPO_LINEAR;
pQueryInfo->fillType = TSDB_FILL_LINEAR;
} else if (strncasecmp(pItem->pVar.pz, "value", 5) == 0 && pItem->pVar.nLen == 5) {
pQueryInfo->interpoType = TSDB_INTERPO_SET_VALUE;
pQueryInfo->fillType = TSDB_FILL_SET_VALUE;
if (pFillToken->nExpr == 1) {
return invalidSqlErrMsg(pQueryInfo->msg, msg1);
......@@ -5562,7 +5562,15 @@ int32_t doCheckForCreateFromStable(SSqlObj* pSql, SSqlInfo* pInfo) {
ret = tVariantDump(&(pList->a[i].pVar), varDataVal(tagVal), pTagSchema[i].type);
varDataSetLen(tagVal, pList->a[i].pVar.nLen);
if (pList->a[i].pVar.nType == TSDB_DATA_TYPE_NULL) {
if (pTagSchema[i].type == TSDB_DATA_TYPE_BINARY) {
varDataSetLen(tagVal, sizeof(uint8_t));
} else {
varDataSetLen(tagVal, sizeof(uint32_t));
} else { // todo refactor
varDataSetLen(tagVal, pList->a[i].pVar.nLen);
} else {
ret = tVariantDump(&(pList->a[i].pVar), tagVal, pTagSchema[i].type);
......@@ -652,7 +652,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pQueryMsg->order = htons(pQueryInfo->order.order);
pQueryMsg->orderColId = htons(pQueryInfo->order.orderColId);
pQueryMsg->interpoType = htons(pQueryInfo->interpoType);
pQueryMsg->fillType = htons(pQueryInfo->fillType);
pQueryMsg->limit = htobe64(pQueryInfo->limit.limit);
pQueryMsg->offset = htobe64(pQueryInfo->limit.offset);
pQueryMsg->numOfCols = htons(taosArrayGetSize(pQueryInfo->colList));
......@@ -780,7 +780,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
if (pQueryInfo->interpoType != TSDB_INTERPO_NONE) {
if (pQueryInfo->fillType != TSDB_FILL_NONE) {
for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) {
*((int64_t *)pMsg) = htobe64(pQueryInfo->defaultVal[i]);
pMsg += sizeof(pQueryInfo->defaultVal[0]);
......@@ -228,7 +228,7 @@ int taos_query_imp(STscObj *pObj, SSqlObj *pSql) {
pRes->numOfRows = 1;
pRes->numOfTotal = 0;
pRes->numOfTotalInCurrentClause = 0;
pRes->numOfClauseTotal = 0;
pCmd->curSql = NULL;
if (NULL != pCmd->pTableList) {
......@@ -407,7 +407,7 @@ int taos_fetch_block_impl(TAOS_RES *res, TAOS_ROW *rows) {
// secondary merge has handle this situation
if (pCmd->command != TSDB_SQL_RETRIEVE_LOCALMERGE) {
pRes->numOfTotalInCurrentClause += pRes->numOfRows;
pRes->numOfClauseTotal += pRes->numOfRows;
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
......@@ -490,8 +490,8 @@ int taos_fetch_block(TAOS_RES *res, TAOS_ROW *rows) {
pSql->cmd.command = pQueryInfo->command;
pRes->numOfTotal += pRes->numOfTotalInCurrentClause;
pRes->numOfTotalInCurrentClause = 0;
pRes->numOfTotal += pRes->numOfClauseTotal;
pRes->numOfClauseTotal = 0;
pRes->rspType = 0;
pSql->numOfSubs = 0;
......@@ -790,7 +790,7 @@ int taos_validate_sql(TAOS *taos, const char *sql) {
pRes->numOfRows = 1;
pRes->numOfTotal = 0;
pRes->numOfTotalInCurrentClause = 0;
pRes->numOfClauseTotal = 0;
tscTrace("%p Valid SQL: %s pObj:%p", pSql, sql, pObj);
......@@ -921,7 +921,7 @@ int taos_load_table_info(TAOS *taos, const char *tableNameList) {
SSqlRes *pRes = &pSql->res;
pRes->numOfTotal = 0; // the number of getting table meta from server
pRes->numOfTotalInCurrentClause = 0;
pRes->numOfClauseTotal = 0;
pRes->code = 0;
......@@ -208,7 +208,7 @@ static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOf
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
if (pStream->numOfRes == 0) {
if (pQueryInfo->interpoType == TSDB_INTERPO_SET_VALUE || pQueryInfo->interpoType == TSDB_INTERPO_NULL) {
if (pQueryInfo->fillType == TSDB_FILL_SET_VALUE || pQueryInfo->fillType == TSDB_FILL_NULL) {
SSqlRes *pRes = &pSql->res;
/* failed to retrieve any result in this retrieve */
......@@ -771,7 +771,7 @@ static void joinRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) {
SSqlRes* pRes1 = &pParentSql->pSubs[i]->res;
pRes1->numOfTotalInCurrentClause += pRes1->numOfRows;
pRes1->numOfClauseTotal += pRes1->numOfRows;
// data has retrieved to client, build the join results
......@@ -1390,7 +1390,7 @@ void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numO
// clear local saved number of results
trsupport->localBuffer->numOfElems = 0;
trsupport->localBuffer->num = 0;
tscTrace("%p sub:%p retrieve failed, code:%s, orderOfSub:%d, retry:%d", trsupport->pParentSqlObj, pSql,
......@@ -1457,7 +1457,7 @@ static void tscAllDataRetrievedFromDnode(SRetrieveSupport *trsupport, SSqlObj* p
STableMetaInfo* pTableMetaInfo = pQueryInfo->pTableMetaInfo[0];
// data in from current vnode is stored in cache and disk
uint32_t numOfRowsFromSubquery = trsupport->pExtMemBuffer[idx]->numOfTotalElems + trsupport->localBuffer->numOfElems;
uint32_t numOfRowsFromSubquery = trsupport->pExtMemBuffer[idx]->numOfTotalElems + trsupport->localBuffer->num;
tscTrace("%p sub:%p all data retrieved from ip:%u,vgId:%d, numOfRows:%d, orderOfSub:%d", pPObj, pSql,
pTableMetaInfo->vgroupList->vgroups[0].ipAddr[0].fqdn, pTableMetaInfo->vgroupList->vgroups[0].vgId,
numOfRowsFromSubquery, idx);
......@@ -1465,11 +1465,11 @@ static void tscAllDataRetrievedFromDnode(SRetrieveSupport *trsupport, SSqlObj* p
tColModelCompact(pDesc->pColumnModel, trsupport->localBuffer, pDesc->pColumnModel->capacity);
#ifdef _DEBUG_VIEW
printf("%" PRIu64 " rows data flushed to disk:\n", trsupport->localBuffer->numOfElems);
printf("%" PRIu64 " rows data flushed to disk:\n", trsupport->localBuffer->num);
SSrcColumnInfo colInfo[256] = {0};
tscGetSrcColumnInfo(colInfo, pQueryInfo);
tColModelDisplayEx(pDesc->pColumnModel, trsupport->localBuffer->data, trsupport->localBuffer->numOfElems,
trsupport->localBuffer->numOfElems, colInfo);
tColModelDisplayEx(pDesc->pColumnModel, trsupport->localBuffer->data, trsupport->localBuffer->num,
trsupport->localBuffer->num, colInfo);
if (tsTotalTmpDirGB != 0 && tsAvailTmpDirGB < tsMinimalTmpDirGB) {
......@@ -1834,7 +1834,7 @@ void tscBuildResFromSubqueries(SSqlObj *pSql) {
pRes->tsrow[i] = pRes1->tsrow[pIndex->columnIndex];
} else { // continue retrieve data from vnode
if (!tscHashRemainDataInSubqueryResultSet(pSql)) {
......@@ -1879,9 +1879,7 @@ void tscBuildResFromSubqueries(SSqlObj *pSql) {
static void transferNcharData(SSqlObj *pSql, int32_t columnIndex, TAOS_FIELD *pField) {
SSqlRes *pRes = &pSql->res;
if (pRes->tsrow[columnIndex] != NULL && isNull(pRes->tsrow[columnIndex], pField->type)) {
pRes->tsrow[columnIndex] = NULL;
} else if (pField->type == TSDB_DATA_TYPE_NCHAR) {
if (pRes->tsrow[columnIndex] != NULL && pField->type == TSDB_DATA_TYPE_NCHAR) {
// convert unicode to native code in a temporary buffer extra one byte for terminated symbol
if (pRes->buffer[columnIndex] == NULL) {
pRes->buffer[columnIndex] = malloc(pField->bytes + TSDB_NCHAR_SIZE);
......@@ -1893,7 +1891,7 @@ static void transferNcharData(SSqlObj *pSql, int32_t columnIndex, TAOS_FIELD *pF
if (taosUcs4ToMbs(pRes->tsrow[columnIndex], pField->bytes - VARSTR_HEADER_SIZE, pRes->buffer[columnIndex])) {
pRes->tsrow[columnIndex] = pRes->buffer[columnIndex];
} else {
tscError("%p charset:%s to %s. val:%ls convert failed.", pSql, DEFAULT_UNICODE_ENCODEC, tsCharset, pRes->tsrow);
tscError("%p charset:%s to %s. val:%ls convert failed.", pSql, DEFAULT_UNICODE_ENCODEC, tsCharset, pRes->tsrow[columnIndex]);
pRes->tsrow[columnIndex] = NULL;
......@@ -280,7 +280,7 @@ void tscClearInterpInfo(SQueryInfo* pQueryInfo) {
pQueryInfo->interpoType = TSDB_INTERPO_NONE;
pQueryInfo->fillType = TSDB_FILL_NONE;
......@@ -1779,7 +1779,7 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void
tscTagCondCopy(&pNewQueryInfo->tagCond, &pQueryInfo->tagCond);
if (pQueryInfo->interpoType != TSDB_INTERPO_NONE) {
if (pQueryInfo->fillType != TSDB_FILL_NONE) {
pNewQueryInfo->defaultVal = malloc(pQueryInfo->fieldsInfo.numOfOutput * sizeof(int64_t));
memcpy(pNewQueryInfo->defaultVal, pQueryInfo->defaultVal, pQueryInfo->fieldsInfo.numOfOutput * sizeof(int64_t));
......@@ -1989,7 +1989,7 @@ int32_t tscInvalidSQLErrMsg(char* msg, const char* additionalInfo, const char* s
bool tscHasReachLimitation(SQueryInfo* pQueryInfo, SSqlRes* pRes) {
assert(pQueryInfo != NULL && pQueryInfo->clauseLimit != 0);
return (pQueryInfo->clauseLimit > 0 && pRes->numOfTotalInCurrentClause >= pQueryInfo->clauseLimit);
return (pQueryInfo->clauseLimit > 0 && pRes->numOfClauseTotal >= pQueryInfo->clauseLimit);
char* tscGetErrorMsgPayload(SSqlCmd* pCmd) { return pCmd->payload; }
......@@ -2037,7 +2037,7 @@ void tscTryQueryNextVnode(SSqlObj* pSql, __async_cb_func_t fp) {
int32_t totalVgroups = pTableMetaInfo->vgroupList->numOfVgroups;
while (++pTableMetaInfo->vgroupIndex < totalVgroups) {
tscTrace("%p current vnode:%d exhausted, try next:%d. total vnode:%d. current numOfRes:%d", pSql,
pTableMetaInfo->vgroupIndex - 1, pTableMetaInfo->vgroupIndex, totalVgroups, pRes->numOfTotalInCurrentClause);
pTableMetaInfo->vgroupIndex - 1, pTableMetaInfo->vgroupIndex, totalVgroups, pRes->numOfClauseTotal);
* update the limit and offset value for the query on the next vnode,
......@@ -2045,11 +2045,11 @@ void tscTryQueryNextVnode(SSqlObj* pSql, __async_cb_func_t fp) {
* if the pRes->offset is larger than 0, the start returned position has not reached yet.
* Therefore, the pRes->numOfRows, as well as pRes->numOfTotalInCurrentClause, must be 0.
* Therefore, the pRes->numOfRows, as well as pRes->numOfClauseTotal, must be 0.
* The pRes->offset value will be updated by virtual node, during query execution.
if (pQueryInfo->clauseLimit >= 0) {
pQueryInfo->limit.limit = pQueryInfo->clauseLimit - pRes->numOfTotalInCurrentClause;
pQueryInfo->limit.limit = pQueryInfo->clauseLimit - pRes->numOfClauseTotal;
pQueryInfo->limit.offset = pRes->offset;
......@@ -2092,7 +2092,7 @@ void tscTryQueryNextClause(SSqlObj* pSql, void (*queryFp)()) {
pSql->cmd.command = pQueryInfo->command;
//backup the total number of result first
int64_t num = pRes->numOfTotal + pRes->numOfTotalInCurrentClause;
int64_t num = pRes->numOfTotal + pRes->numOfClauseTotal;
pRes->numOfTotal = num;
......@@ -2126,16 +2126,26 @@ void tscGetResultColumnChr(SSqlRes* pRes, SFieldInfo* pFieldInfo, int32_t column
int32_t realLen = varDataLen(pData);
assert(realLen <= bytes - VARSTR_HEADER_SIZE);
if (isNull(pData, type)) {
pRes->tsrow[columnIndex] = NULL;
} else {
pRes->tsrow[columnIndex] = pData + VARSTR_HEADER_SIZE;
if (realLen < pInfo->pSqlExpr->resBytes - VARSTR_HEADER_SIZE) { // todo refactor
*(char*) (pData + realLen + VARSTR_HEADER_SIZE) = 0;
pRes->tsrow[columnIndex] = pData + VARSTR_HEADER_SIZE;
pRes->length[columnIndex] = realLen;
} else {
assert(bytes == tDataTypeDesc[type].nSize);
pRes->tsrow[columnIndex] = pData;
if (isNull(pData, type)) {
pRes->tsrow[columnIndex] = NULL;
} else {
pRes->tsrow[columnIndex] = pData;
pRes->length[columnIndex] = bytes;
......@@ -402,7 +402,7 @@ void setNullN(char *val, int32_t type, int32_t bytes, int32_t numOfElems) {
*(uint64_t *)(val + i * tDataTypeDesc[type].nSize) = TSDB_DATA_DOUBLE_NULL;
case TSDB_DATA_TYPE_NCHAR: // todo : without length?
for (int32_t i = 0; i < numOfElems; ++i) {
*(uint32_t *)(val + i * bytes) = TSDB_DATA_NCHAR_NULL;
......@@ -32,7 +32,7 @@ extern "C" {
#define TSKEY int64_t
// ----------------- For variable data types such as TSDB_DATA_TYPE_BINARY and TSDB_DATA_TYPE_NCHAR
......@@ -140,19 +140,19 @@ enum _mgmt_table {
#define TSDB_FILL_NONE 0
#define TSDB_FILL_NULL 1
#define TSDB_FILL_PREV 4
......@@ -164,8 +164,8 @@ enum _mgmt_table {
#define TSDB_COL_NORMAL 0x0u
#define TSDB_COL_TAG 0x1u
#define TSDB_COL_JOIN 0x2u
#define TSDB_COL_TAG 0x1u
#define TSDB_COL_JOIN 0x2u
extern char *taosMsg[];
......@@ -440,7 +440,7 @@ typedef struct {
uint16_t queryType; // denote another query process
int16_t numOfOutput; // final output columns numbers
int16_t tagNameRelType; // relation of tag criteria and tbname criteria
int16_t interpoType; // interpolate type
int16_t fillType; // interpolate type
uint64_t defaultVal; // default value array list
int32_t tsOffset; // offset value in current msg body, NOTE: ts list is compressed
int32_t tsLen; // total length of ts comp block
......@@ -167,12 +167,6 @@ typedef struct {
SArray *pGroupList;
} STableGroupInfo;
typedef struct {
} SFields;
typedef struct SQueryRowCond {
int32_t rel;
......@@ -18,20 +18,20 @@
#include "os.h"
#include "hash.h"
#include "tsdb.h"
#include "qinterpolation.h"
#include "qfill.h"
#include "qresultBuf.h"
#include "qsqlparser.h"
#include "qtsbuf.h"
#include "taosdef.h"
#include "tarray.h"
#include "tref.h"
#include "tsdb.h"
#include "tsqlfunction.h"
#include "tarray.h"
typedef struct SData {
int32_t num;
char data[];
} SData;
//typedef struct tFilePage {
// int64_t num;
// char data[];
//} tFilePage;
struct SColumnFilterElem;
typedef bool (*__filter_func_t)(struct SColumnFilterElem* pFilter, char* val1, char* val2);
......@@ -129,7 +129,7 @@ typedef struct SQuery {
char slidingTimeUnit; // interval data type, used for daytime revise
int8_t precision;
int16_t numOfOutput;
int16_t interpoType;
int16_t fillType;
int16_t checkBuffer; // check if the buffer is full during scan each block
SLimitVal limit;
int32_t rowSize;
......@@ -139,11 +139,10 @@ typedef struct SQuery {
SColumnInfo* tagColList;
int32_t numOfFilterCols;
int64_t* defaultVal;
// TSKEY lastKey;
uint32_t status; // query status
SResultRec rec;
int32_t pos;
SData** sdata;
tFilePage** sdata;
STableQueryInfo* current;
SSingleColumnFilterInfo* pFilterInfo;
} SQuery;
......@@ -151,12 +150,11 @@ typedef struct SQuery {
typedef struct SQueryRuntimeEnv {
SResultInfo* resultInfo; // todo refactor to merge with SWindowResInfo
SQuery* pQuery;
SData** pInterpoBuf;
SQLFunctionCtx* pCtx;
int16_t numOfRowsPerPage;
int16_t offset[TSDB_MAX_COLUMNS];
uint16_t scanFlag; // denotes reversed scan of data or not
SInterpolationInfo interpoInfo;
SFillInfo* pFillInfo;
SWindowResInfo windowResInfo;
STSBuf* pTSBuf;
STSCursor cur;
......@@ -68,7 +68,7 @@ typedef struct SExtFileInfo {
} SExtFileInfo;
typedef struct tFilePage {
uint64_t numOfElems;
uint64_t num;
char data[];
} tFilePage;
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
#ifdef __cplusplus
extern "C" {
#include "os.h"
#include "taosdef.h"
#include "qextbuffer.h"
typedef struct {
STColumn col; // column info
int16_t functionId; // sql function id
int16_t flag; // column flag: TAG COLUMN|NORMAL COLUMN
union {int64_t i; double d;} defaultVal;
} SFillColInfo;
typedef struct SFillInfo {
TSKEY start; // start timestamp
TSKEY endKey; // endKey for fill
int32_t order; // order [TSDB_ORDER_ASC|TSDB_ORDER_DESC]
int32_t fillType; // fill type
int32_t numOfRows; // number of rows in the input data block
int32_t rowIdx; // rowIdx
int32_t numOfTotal; // number of filled rows in one round
int32_t numOfCurrent; // number of filled rows in current results
int32_t numOfTags; // number of tags
int32_t numOfCols; // number of columns, including the tags columns
int32_t rowSize; // size of each row
char ** pTags; // tags value for current interpolation
int64_t slidingTime; // sliding value to determine the number of result for a given time window
char * prevValues; // previous row of data, to generate the interpolation results
char * nextValues; // next row of data
SFillColInfo* pFillCol; // column info for fill operations
char** pData; // original result data block involved in filling data
} SFillInfo;
typedef struct SPoint {
int64_t key;
void * val;
} SPoint;
int64_t taosGetIntervalStartTimestamp(int64_t startTime, int64_t slidingTime, char timeUnit, int16_t precision);
SFillInfo* taosInitFillInfo(int32_t order, TSKEY skey, int32_t numOfTags, int32_t capacity,
int32_t numOfCols, int64_t slidingTime, int32_t fillType, SFillColInfo* pFillCol);
void taosResetFillInfo(SFillInfo* pFillInfo, TSKEY startTimestamp);
void taosDestoryFillInfo(SFillInfo *pFillInfo);
void taosFillSetStartInfo(SFillInfo* pFillInfo, int32_t numOfRows, TSKEY endKey);
void taosFillCopyInputDataFromFilePage(SFillInfo* pFillInfo, tFilePage** pInput);
void taosFillCopyInputDataFromOneFilePage(SFillInfo* pFillInfo, tFilePage* pInput);
TSKEY taosGetRevisedEndKey(TSKEY ekey, int32_t order, int64_t timeInterval, int8_t slidingTimeUnit, int8_t precision);
int32_t taosGetNumOfResultWithFill(SFillInfo* pFillInfo, int32_t numOfRows, int64_t ekey, int32_t maxNumOfRows);
int32_t taosNumOfRemainRows(SFillInfo *pFillInfo);
int32_t taosDoInterpoResult(SFillInfo* pFillInfo, tFilePage** data, int32_t numOfRows, int32_t outputRows, char** srcData);
int taosDoLinearInterpolation(int32_t type, SPoint *point1, SPoint *point2, SPoint *point);
void taosGenerateDataBlock(SFillInfo* pFillInfo, tFilePage** output, int64_t* outputRows, int32_t capacity);
#ifdef __cplusplus
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
#ifdef __cplusplus
extern "C" {
#include "os.h"
#include "taosdef.h"
#include "qextbuffer.h"
typedef struct SInterpolationInfo {
int64_t startTimestamp;
int32_t order; // order [asc/desc]
int32_t numOfRawDataInRows; // number of points in pQuery->sdata
int32_t rowIdx; // rowIdx in pQuery->sdata
int32_t numOfTotalInterpo; // number of interpolated rows in one round
int32_t numOfCurrentInterpo; // number of interpolated rows in current results
char * prevValues; // previous row of data
char * nextValues; // next row of data
int32_t numOfTags;
char ** pTags; // tags value for current interoplation
} SInterpolationInfo;
typedef struct SPoint {
int64_t key;
void * val;
} SPoint;
int64_t taosGetIntervalStartTimestamp(int64_t startTime, int64_t timeRange, char intervalTimeUnit, int16_t precision);
void taosInitInterpoInfo(SInterpolationInfo *pInterpoInfo, int32_t order, int64_t startTimeStamp, int32_t numOfTags,
int32_t rowSize);
void taosDestoryInterpoInfo(SInterpolationInfo *pInterpoInfo);
void taosInterpoSetStartInfo(SInterpolationInfo *pInterpoInfo, int32_t numOfRawDataInRows, int32_t type);
TSKEY taosGetRevisedEndKey(TSKEY ekey, int32_t order, int32_t timeInterval, int8_t intervalTimeUnit, int8_t precision);
* @param pInterpoInfo
* @param pPrimaryKeyArray
* @param numOfRows
* @param nInterval
* @param ekey
* @param maxNumOfRows
* @return
int32_t taosGetNumOfResultWithInterpo(SInterpolationInfo *pInterpoInfo, int64_t *pPrimaryKeyArray, int32_t numOfRows,
int64_t nInterval, int64_t ekey, int32_t maxNumOfRows);
int32_t taosGetNumOfResWithoutLimit(SInterpolationInfo *pInterpoInfo, int64_t *pPrimaryKeyArray,
int32_t numOfRawDataInRows, int64_t nInterval, int64_t ekey);
* @param pInterpoInfo
* @return
bool taosHasRemainsDataForInterpolation(SInterpolationInfo *pInterpoInfo);
int32_t taosNumOfRemainPoints(SInterpolationInfo *pInterpoInfo);
int32_t taosDoInterpoResult(SInterpolationInfo *pInterpoInfo, int16_t interpoType, tFilePage **data,
int32_t numOfRawDataInRows, int32_t outputRows, int64_t nInterval,
const int64_t *pPrimaryKeyArray, SColumnModel *pModel, char **srcData, int64_t *defaultVal,
const int32_t *functionIDs, int32_t bufSize);
int taosDoLinearInterpolation(int32_t type, SPoint *point1, SPoint *point2, SPoint *point);
#ifdef __cplusplus
......@@ -13,8 +13,8 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
#ifdef __cplusplus
extern "C" {
......@@ -24,22 +24,23 @@ extern "C" {
extern int32_t qDebugFlag;
#define qTrace(...) \
if (qDebugFlag & DEBUG_TRACE) { \
taosPrintLog("DND QRY ", qDebugFlag, __VA_ARGS__); \
#define qTrace(...) \
if (qDebugFlag & DEBUG_TRACE) { \
taosPrintLog("QRY ", qDebugFlag, __VA_ARGS__); \
#define qError(...) \
if (qDebugFlag & DEBUG_ERROR) { \
#define qError(...) \
if (qDebugFlag & DEBUG_ERROR) { \
taosPrintLog("ERROR QRY ", qDebugFlag, __VA_ARGS__); \
#define qWarn(...) \
if (qDebugFlag & DEBUG_WARN) { \
taosPrintLog("WARN QRY ", qDebugFlag, __VA_ARGS__); \
#define qWarn(...) \
if (qDebugFlag & DEBUG_WARN) { \
taosPrintLog("WARN QRY ", qDebugFlag, __VA_ARGS__); \
#ifdef __cplusplus
......@@ -32,7 +32,7 @@ typedef struct SLoserTreeNode {
typedef struct SLoserTreeInfo {
int32_t numOfEntries;
int32_t totalEntries;
__merge_compare_fn_t comparaFn;
__merge_compare_fn_t comparFn;
void * param;
SLoserTreeNode *pNode;
......@@ -16,10 +16,10 @@
#include "os.h"
#include "qExecutor.h"
#include "taosmsg.h"
#include "tsqlfunction.h"
#include "queryExecutor.h"
#include "tcompare.h"
#include "tsqlfunction.h"
bool less_i8(SColumnFilterElem *pFilter, char *minval, char *maxval) {
return (*(int8_t *)minval < pFilter->filterInfo.upperBndi);
......@@ -20,11 +20,11 @@
#include "qextbuffer.h"
#include "ttime.h"
#include "qinterpolation.h"
#include "qfill.h"
#include "ttime.h"
#include "queryExecutor.h"
#include "queryUtil.h"
#include "qExecutor.h"
#include "qUtil.h"
int32_t initWindowResInfo(SWindowResInfo *pWindowResInfo, SQueryRuntimeEnv *pRuntimeEnv, int32_t size,
int32_t threshold, int16_t type) {
......@@ -37,7 +37,8 @@ int32_t initWindowResInfo(SWindowResInfo *pWindowResInfo, SQueryRuntimeEnv *pRun
pWindowResInfo->hashList = taosHashInit(threshold, fn, false);
pWindowResInfo->curIndex = -1;
pWindowResInfo->size = 0;
pWindowResInfo->size = 0;
pWindowResInfo->prevSKey = TSKEY_INITIAL_VAL;
// use the pointer arraylist
pWindowResInfo->pResult = calloc(threshold, sizeof(SWindowResult));
......@@ -96,8 +97,8 @@ void resetTimeWindowInfo(SQueryRuntimeEnv *pRuntimeEnv, SWindowResInfo *pWindowR
_hash_fn_t fn = taosGetDefaultHashFunction(pWindowResInfo->type);
pWindowResInfo->hashList = taosHashInit(pWindowResInfo->capacity, fn, false);
pWindowResInfo->startTime = 0;
pWindowResInfo->prevSKey = 0;
pWindowResInfo->startTime = TSKEY_INITIAL_VAL;
pWindowResInfo->prevSKey = TSKEY_INITIAL_VAL;
void clearFirstNTimeWindow(SQueryRuntimeEnv *pRuntimeEnv, int32_t num) {
......@@ -476,44 +476,6 @@ typedef struct {
SEndPoint* end;
} SQueryCond;
//static void setInitialValueForRangeQueryCondition(tSKipListQueryCond *q, int8_t type) {
// q->lowerBndRelOptr = TSDB_RELATION_GREATER;
// q->upperBndRelOptr = TSDB_RELATION_LESS;
// switch (type) {
// q->upperBnd.nType = TSDB_DATA_TYPE_BIGINT;
// q->lowerBnd.nType = TSDB_DATA_TYPE_BIGINT;
// q->upperBnd.i64Key = INT64_MAX;
// q->lowerBnd.i64Key = INT64_MIN;
// break;
// };
// q->upperBnd.nType = TSDB_DATA_TYPE_DOUBLE;
// q->lowerBnd.nType = TSDB_DATA_TYPE_DOUBLE;
// q->upperBnd.dKey = DBL_MAX;
// q->lowerBnd.dKey = -DBL_MIN;
// break;
// };
// q->upperBnd.nType = type;
// q->upperBnd.pz = NULL;
// q->upperBnd.nLen = -1;
// q->lowerBnd.nType = type;
// q->lowerBnd.pz = NULL;
// q->lowerBnd.nLen = -1;
// }
// }
// todo check for malloc failure
static int32_t setQueryCond(tQueryInfo *queryColInfo, SQueryCond* pCond) {
int32_t optr = queryColInfo->optr;
......@@ -788,7 +750,6 @@ static void exprTreeTraverseImpl(tExprNode *pExpr, SArray *pResult, SExprTravers
taosArrayCopy(pResult, array);
static void tSQLBinaryTraverseOnSkipList(tExprNode *pExpr, SArray *pResult, SSkipList *pSkipList, SExprTraverseSupp *param ) {
SSkipListIterator* iter = tSkipListCreateIter(pSkipList);
......@@ -834,8 +795,6 @@ static void tQueryIndexlessColumn(SSkipList* pSkipList, tQueryInfo* pQueryInfo,
// post-root order traverse syntax tree
void tExprTreeTraverse(tExprNode *pExpr, SSkipList *pSkipList, SArray *result, SExprTraverseSupp *param) {
if (pExpr == NULL) {
......@@ -1100,7 +1059,6 @@ static char* exception_strdup(const char* str) {
return p;
static tExprNode* exprTreeFromBinaryImpl(SBufferReader* br) {
int32_t anchor = CLEANUP_GET_ANCHOR();
......@@ -136,7 +136,7 @@ static bool tExtMemBufferAlloc(tExtMemBuffer *pMemBuffer) {
item->pNext = NULL;
item->item.numOfElems = 0;
item->item.num = 0;
if (pMemBuffer->pTail != NULL) {
pMemBuffer->pTail->pNext = item;
......@@ -167,13 +167,13 @@ int16_t tExtMemBufferPut(tExtMemBuffer *pMemBuffer, void *data, int32_t numOfRow
pLast = pMemBuffer->pTail;
if (pLast->item.numOfElems + numOfRows <= pMemBuffer->numOfElemsPerPage) { // enough space for records
if (pLast->item.num + numOfRows <= pMemBuffer->numOfElemsPerPage) { // enough space for records
tColModelAppend(pMemBuffer->pColumnModel, &pLast->item, data, 0, numOfRows, numOfRows);
pMemBuffer->numOfElemsInBuffer += numOfRows;
pMemBuffer->numOfTotalElems += numOfRows;
} else {
int32_t numOfRemainEntries = pMemBuffer->numOfElemsPerPage - pLast->item.numOfElems;
int32_t numOfRemainEntries = pMemBuffer->numOfElemsPerPage - pLast->item.num;
tColModelAppend(pMemBuffer->pColumnModel, &pLast->item, data, 0, numOfRemainEntries, numOfRows);
pMemBuffer->numOfElemsInBuffer += numOfRemainEntries;
......@@ -271,7 +271,7 @@ bool tExtMemBufferFlush(tExtMemBuffer *pMemBuffer) {
ret = false;
pMemBuffer->fileMeta.numOfElemsInFile += first->item.numOfElems;
pMemBuffer->fileMeta.numOfElemsInFile += first->item.num;
pMemBuffer->fileMeta.nFileSize += 1;
tFilePagesItem *ptmp = first;
......@@ -985,16 +985,16 @@ void tColModelDisplayEx(SColumnModel *pModel, void *pData, int32_t numOfRows, in
void tColModelCompact(SColumnModel *pModel, tFilePage *inputBuffer, int32_t maxElemsCapacity) {
if (inputBuffer->numOfElems == 0 || maxElemsCapacity == inputBuffer->numOfElems) {
if (inputBuffer->num == 0 || maxElemsCapacity == inputBuffer->num) {
/* start from the second column */
for (int32_t i = 1; i < pModel->numOfCols; ++i) {
SSchemaEx* pSchemaEx = &pModel->pFields[i];
memmove(inputBuffer->data + pSchemaEx->offset * inputBuffer->numOfElems,
memmove(inputBuffer->data + pSchemaEx->offset * inputBuffer->num,
inputBuffer->data + pSchemaEx->offset * maxElemsCapacity,
pSchemaEx->field.bytes * inputBuffer->numOfElems);
pSchemaEx->field.bytes * inputBuffer->num);
......@@ -1009,13 +1009,13 @@ int16_t getColumnModelOffset(SColumnModel *pColumnModel, int32_t index) {
void tColModelErase(SColumnModel *pModel, tFilePage *inputBuffer, int32_t blockCapacity, int32_t s, int32_t e) {
if (inputBuffer->numOfElems == 0 || (e - s + 1) <= 0) {
if (inputBuffer->num == 0 || (e - s + 1) <= 0) {
int32_t removed = e - s + 1;
int32_t remain = inputBuffer->numOfElems - removed;
int32_t secPart = inputBuffer->numOfElems - e - 1;
int32_t remain = inputBuffer->num - removed;
int32_t secPart = inputBuffer->num - e - 1;
/* start from the second column */
for (int32_t i = 0; i < pModel->numOfCols; ++i) {
......@@ -1028,7 +1028,7 @@ void tColModelErase(SColumnModel *pModel, tFilePage *inputBuffer, int32_t blockC
memmove(startPos, endPos, pSchema->bytes * secPart);
inputBuffer->numOfElems = remain;
inputBuffer->num = remain;
......@@ -1040,16 +1040,16 @@ void tColModelErase(SColumnModel *pModel, tFilePage *inputBuffer, int32_t blockC
void tColModelAppend(SColumnModel *dstModel, tFilePage *dstPage, void *srcData, int32_t start, int32_t numOfRows,
int32_t srcCapacity) {
assert(dstPage->numOfElems + numOfRows <= dstModel->capacity);
assert(dstPage->num + numOfRows <= dstModel->capacity);
for (int32_t col = 0; col < dstModel->numOfCols; ++col) {
char *dst = COLMODEL_GET_VAL(dstPage->data, dstModel, dstModel->capacity, dstPage->numOfElems, col);
char *dst = COLMODEL_GET_VAL(dstPage->data, dstModel, dstModel->capacity, dstPage->num, col);
char *src = COLMODEL_GET_VAL((char *)srcData, dstModel, srcCapacity, start, col);
memmove(dst, src, dstModel->pFields[col].field.bytes * numOfRows);
dstPage->numOfElems += numOfRows;
dstPage->num += numOfRows;
tOrderDescriptor *tOrderDesCreate(const int32_t *orderColIdx, int32_t numOfOrderCols, SColumnModel *pModel,
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
#include "qfill.h"
#include "os.h"
#include "qextbuffer.h"
#include "taosdef.h"
#include "taosmsg.h"
#include "tsqlfunction.h"
#define FILL_IS_ASC_FILL(_f) ((_f)->order == TSDB_ORDER_ASC)
int64_t taosGetIntervalStartTimestamp(int64_t startTime, int64_t slidingTime, char timeUnit, int16_t precision) {
if (slidingTime == 0) {
return startTime;
if (timeUnit == 'a' || timeUnit == 'm' || timeUnit == 's' || timeUnit == 'h') {
return (startTime / slidingTime) * slidingTime;
} else {
* here we revised the start time of day according to the local time zone,
* but in case of DST, the start time of one day need to be dynamically decided.
* TODO dynamically decide the start time of a day
// todo refactor to extract function that is available for Linux/Windows/Mac platform
#if defined(WINDOWS) && _MSC_VER >= 1900
// see https://docs.microsoft.com/en-us/cpp/c-runtime-library/daylight-dstbias-timezone-and-tzname?view=vs-2019
int64_t timezone = _timezone;
int32_t daylight = _daylight;
char** tzname = _tzname;
int64_t revStartime = (startTime / slidingTime) * slidingTime + timezone * t;
int64_t revEndtime = revStartime + slidingTime - 1;
if (revEndtime < startTime) {
revStartime += slidingTime;
return revStartime;
SFillInfo* taosInitFillInfo(int32_t order, TSKEY skey, int32_t numOfTags, int32_t capacity, int32_t numOfCols,
int64_t slidingTime, int32_t fillType, SFillColInfo* pFillCol) {
if (fillType == TSDB_FILL_NONE) {
return NULL;
SFillInfo* pFillInfo = calloc(1, sizeof(SFillInfo));
taosResetFillInfo(pFillInfo, skey);
pFillInfo->order = order;
pFillInfo->fillType = fillType;
pFillInfo->pFillCol = pFillCol;
pFillInfo->numOfTags = numOfTags;
pFillInfo->numOfCols = numOfCols;
pFillInfo->slidingTime = slidingTime;
pFillInfo->pData = malloc(POINTER_BYTES * numOfCols);
int32_t rowsize = 0;
for (int32_t i = 0; i < numOfCols; ++i) {
int32_t bytes = pFillInfo->pFillCol[i].col.bytes;
pFillInfo->pData[i] = calloc(1, sizeof(tFilePage) + bytes * capacity);
rowsize += bytes;
if (numOfTags > 0) {
pFillInfo->pTags = calloc(1, pFillInfo->numOfTags * POINTER_BYTES + rowsize);
pFillInfo->rowSize = rowsize;
return pFillInfo;
void taosResetFillInfo(SFillInfo* pFillInfo, TSKEY startTimestamp) {
pFillInfo->start = startTimestamp;
pFillInfo->rowIdx = -1;
pFillInfo->numOfRows = 0;
pFillInfo->numOfCurrent = 0;
pFillInfo->numOfTotal = 0;
void taosDestoryFillInfo(SFillInfo* pFillInfo) {
if (pFillInfo == NULL) {
void taosFillSetStartInfo(SFillInfo* pFillInfo, int32_t numOfRows, TSKEY endKey) {
if (pFillInfo->fillType == TSDB_FILL_NONE) {
pFillInfo->rowIdx = 0;
pFillInfo->numOfRows = numOfRows;
pFillInfo->endKey = endKey;
void taosFillCopyInputDataFromFilePage(SFillInfo* pFillInfo, tFilePage** pInput) {
// copy the data into source data buffer
for (int32_t i = 0; i < pFillInfo->numOfCols; ++i) {
memcpy(pFillInfo->pData[i], pInput[i]->data, pFillInfo->numOfRows * pFillInfo->pFillCol[i].col.bytes);
void taosFillCopyInputDataFromOneFilePage(SFillInfo* pFillInfo, tFilePage* pInput) {
assert(pFillInfo->numOfRows == pInput->num);
for(int32_t i = 0; i < pFillInfo->numOfCols; ++i) {
SFillColInfo* pCol = &pFillInfo->pFillCol[i];
char* s = pInput->data + pCol->col.offset * pInput->num;
memcpy(pFillInfo->pData[i], s, pInput->num * pCol->col.bytes);
if (pCol->flag == TSDB_COL_TAG) { // copy the tag value
memcpy(pFillInfo->pTags[i], pFillInfo->pData[i], pCol->col.bytes);
TSKEY taosGetRevisedEndKey(TSKEY ekey, int32_t order, int64_t timeInterval, int8_t slidingTimeUnit, int8_t precision) {
if (order == TSDB_ORDER_ASC) {
return ekey;
} else {
return taosGetIntervalStartTimestamp(ekey, timeInterval, slidingTimeUnit, precision);
static int32_t taosGetTotalNumOfFilledRes(SFillInfo* pFillInfo, const TSKEY* tsArray, int32_t remain,
int64_t nInterval, int64_t ekey) {
if (remain > 0) { // still fill gap within current data block, not generating data after the result set.
TSKEY lastKey = tsArray[pFillInfo->numOfRows - 1];
int32_t total = (int32_t)(labs(lastKey - pFillInfo->start) / nInterval) + 1;
assert(total >= remain);
return total;
} else { // reach the end of data
if ((ekey < pFillInfo->start && FILL_IS_ASC_FILL(pFillInfo)) ||
(ekey > pFillInfo->start && !FILL_IS_ASC_FILL(pFillInfo))) {
return 0;
} else {
return (int32_t)(labs(ekey - pFillInfo->start) / nInterval) + 1;
int32_t taosGetNumOfResultWithFill(SFillInfo* pFillInfo, int32_t numOfRows, int64_t ekey, int32_t maxNumOfRows) {
int32_t numOfRes = taosGetTotalNumOfFilledRes(pFillInfo, (int64_t*) pFillInfo->pData[0], numOfRows,
pFillInfo->slidingTime, ekey);
return (numOfRes > maxNumOfRows) ? maxNumOfRows : numOfRes;
int32_t taosNumOfRemainRows(SFillInfo* pFillInfo) {
if (pFillInfo->rowIdx == -1 || pFillInfo->numOfRows == 0) {
return 0;
return FILL_IS_ASC_FILL(pFillInfo) ? (pFillInfo->numOfRows - pFillInfo->rowIdx)
: pFillInfo->rowIdx + 1;
// todo: refactor
static double linearInterpolationImpl(double v1, double v2, double k1, double k2, double k) {
return v1 + (v2 - v1) * (k - k1) / (k2 - k1);
int taosDoLinearInterpolation(int32_t type, SPoint* point1, SPoint* point2, SPoint* point) {
switch (type) {
*(int32_t*)point->val = linearInterpolationImpl(*(int32_t*)point1->val, *(int32_t*)point2->val, point1->key,
point2->key, point->key);
*(float*)point->val =
linearInterpolationImpl(*(float*)point1->val, *(float*)point2->val, point1->key, point2->key, point->key);
*(double*)point->val =
linearInterpolationImpl(*(double*)point1->val, *(double*)point2->val, point1->key, point2->key, point->key);
*(int64_t*)point->val = linearInterpolationImpl(*(int64_t*)point1->val, *(int64_t*)point2->val, point1->key,
point2->key, point->key);
*(int16_t*)point->val = linearInterpolationImpl(*(int16_t*)point1->val, *(int16_t*)point2->val, point1->key,
point2->key, point->key);
*(int8_t*)point->val =
linearInterpolationImpl(*(int8_t*)point1->val, *(int8_t*)point2->val, point1->key, point2->key, point->key);
default: {
// TODO: Deal with interpolation with bool and strings and timestamp
return -1;
return 0;
static void setTagsValue(SFillInfo* pColInfo, tFilePage** data, char** pTags, int32_t start, int32_t num) {
for (int32_t j = 0, i = start; i < pColInfo->numOfCols + pColInfo->numOfTags; ++i, ++j) {
SFillColInfo* pCol = &pColInfo->pFillCol[i];
char* val1 = elePtrAt(data[i]->data, pCol->col.bytes, num);
assignVal(val1, pTags[j], pCol->col.bytes, pCol->col.type);
static void doInterpoResultImpl(SFillInfo* pFillInfo, tFilePage** data, int32_t* num, char** srcData,
int64_t ts, char** pTags, bool outOfBound) {
char** prevValues = &pFillInfo->prevValues;
char** nextValues = &pFillInfo->nextValues;
SPoint point1, point2, point;
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pFillInfo->order);
char* val = elePtrAt(data[0]->data, TSDB_KEYSIZE, *num);
*(TSKEY*) val = pFillInfo->start;
int32_t numOfValCols = pFillInfo->numOfCols - pFillInfo->numOfTags;
// set the other values
if (pFillInfo->fillType == TSDB_FILL_PREV) {
char* pInterpolationData = FILL_IS_ASC_FILL(pFillInfo) ? *prevValues : *nextValues;
if (pInterpolationData != NULL) {
for (int32_t i = 1; i < numOfValCols; ++i) {
SFillColInfo* pCol = &pFillInfo->pFillCol[i];
char* val1 = elePtrAt(data[i]->data, pCol->col.bytes, *num);
if (isNull(pInterpolationData + pCol->col.offset, pCol->col.type)) {
setNull(val1, pCol->col.type, pCol->col.bytes);
} else {
assignVal(val1, pInterpolationData + pCol->col.offset, pCol->col.bytes, pCol->col.type);
} else { // no prev value yet, set the value for NULL
for (int32_t i = 1; i < numOfValCols; ++i) {
SFillColInfo* pCol = &pFillInfo->pFillCol[i];
char* val1 = elePtrAt(data[i]->data, pCol->col.bytes, *num);
setNull(val1, pCol->col.type, pCol->col.bytes);
setTagsValue(pFillInfo, data, pTags, numOfValCols, *num);
} else if (pFillInfo->fillType == TSDB_FILL_LINEAR) {
// TODO : linear interpolation supports NULL value
if (*prevValues != NULL && !outOfBound) {
for (int32_t i = 1; i < numOfValCols; ++i) {
SFillColInfo* pCol = &pFillInfo->pFillCol[i];
int16_t type = pCol->col.type;
int16_t bytes = pCol->col.bytes;
char *val1 = elePtrAt(data[i]->data, pCol->col.bytes, *num);
setNull(val1, pCol->col.type, bytes);
point1 = (SPoint){.key = *(TSKEY*)(*prevValues), .val = *prevValues + pCol->col.offset};
point2 = (SPoint){.key = ts, .val = srcData[i] + pFillInfo->rowIdx * bytes};
point = (SPoint){.key = pFillInfo->start, .val = val1};
taosDoLinearInterpolation(type, &point1, &point2, &point);
setTagsValue(pFillInfo, data, pTags, numOfValCols, *num);
} else {
for (int32_t i = 1; i < numOfValCols; ++i) {
SFillColInfo* pCol = &pFillInfo->pFillCol[i];
char* val1 = elePtrAt(data[i]->data, pCol->col.bytes, *num);
setNull(val1, pCol->col.type, pCol->col.bytes);
setTagsValue(pFillInfo, data, pTags, numOfValCols, *num);
} else { /* default value interpolation */
for (int32_t i = 1; i < numOfValCols; ++i) {
SFillColInfo* pCol = &pFillInfo->pFillCol[i];
char* val1 = elePtrAt(data[i]->data, pCol->col.bytes, *num);
assignVal(val1, (char*)&pCol->defaultVal.i, pCol->col.bytes, pCol->col.type);
setTagsValue(pFillInfo, data, pTags, numOfValCols, *num);
pFillInfo->start += (pFillInfo->slidingTime * step);
(*num) += 1;
static void initBeforeAfterDataBuf(SFillInfo* pFillInfo, char** nextValues) {
if (*nextValues != NULL) {
*nextValues = calloc(1, pFillInfo->rowSize);
for (int i = 1; i < pFillInfo->numOfCols; i++) {
SFillColInfo* pCol = &pFillInfo->pFillCol[i];
setNull(*nextValues + pCol->col.offset, pCol->col.type, pCol->col.bytes);
int32_t taosDoInterpoResult(SFillInfo* pFillInfo, tFilePage** data, int32_t numOfRows, int32_t outputRows, char** srcData) {
int32_t num = 0;
pFillInfo->numOfCurrent = 0;
char** prevValues = &pFillInfo->prevValues;
char** nextValues = &pFillInfo->nextValues;
int32_t numOfTags = pFillInfo->numOfTags;
char** pTags = pFillInfo->pTags;
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pFillInfo->order);
if (numOfRows == 0) {
* we need to rebuild whole result set
* NOTE:we need to keep the last saved data, to generated the filled data
while (num < outputRows) {
doInterpoResultImpl(pFillInfo, data, &num, srcData, pFillInfo->start, pTags, true);
pFillInfo->numOfTotal += pFillInfo->numOfCurrent;
return outputRows;
} else {
while (1) {
int64_t ts = ((int64_t*)pFillInfo->pData[0])[pFillInfo->rowIdx];
if ((pFillInfo->start < ts && FILL_IS_ASC_FILL(pFillInfo)) ||
(pFillInfo->start > ts && !FILL_IS_ASC_FILL(pFillInfo))) {
/* set the next value for interpolation */
initBeforeAfterDataBuf(pFillInfo, nextValues);
int32_t offset = pFillInfo->rowIdx;
for (int32_t i = 0; i < pFillInfo->numOfCols - numOfTags; ++i) {
SFillColInfo* pCol = &pFillInfo->pFillCol[i];
memcpy(*nextValues + pCol->col.offset, srcData[i] + offset * pCol->col.bytes, pCol->col.bytes);
if (((pFillInfo->start < ts && FILL_IS_ASC_FILL(pFillInfo)) ||
(pFillInfo->start > ts && !FILL_IS_ASC_FILL(pFillInfo))) && num < outputRows) {
while (((pFillInfo->start < ts && FILL_IS_ASC_FILL(pFillInfo)) ||
(pFillInfo->start > ts && !FILL_IS_ASC_FILL(pFillInfo))) && num < outputRows) {
doInterpoResultImpl(pFillInfo, data, &num, srcData, pFillInfo->start, pTags, false);
/* output buffer is full, abort */
if ((num == outputRows && FILL_IS_ASC_FILL(pFillInfo)) ||
(num < 0 && !FILL_IS_ASC_FILL(pFillInfo))) {
pFillInfo->numOfTotal += pFillInfo->numOfCurrent;
return outputRows;
} else {
assert(pFillInfo->start == ts);
initBeforeAfterDataBuf(pFillInfo, prevValues);
// assign rows to dst buffer
int32_t i = 0;
for (; i < pFillInfo->numOfCols - numOfTags; ++i) {
SFillColInfo* pCol = &pFillInfo->pFillCol[i];
char* val1 = elePtrAt(data[i]->data, pCol->col.bytes, num);
char* src = elePtrAt(srcData[i], pCol->col.bytes, pFillInfo->rowIdx);
if (i == 0 ||
(pCol->functionId != TSDB_FUNC_COUNT && !isNull(src, pCol->col.type)) ||
(pCol->functionId == TSDB_FUNC_COUNT && GET_INT64_VAL(src) != 0)) {
assignVal(val1, src, pCol->col.bytes, pCol->col.type);
memcpy(*prevValues + pCol->col.offset, src, pCol->col.bytes);
} else { // i > 0 and data is null , do interpolation
if (pFillInfo->fillType == TSDB_FILL_PREV) {
assignVal(val1, *prevValues + pCol->col.offset, pCol->col.bytes, pCol->col.type);
} else if (pFillInfo->fillType == TSDB_FILL_LINEAR) {
assignVal(val1, src, pCol->col.bytes, pCol->col.type);
memcpy(*prevValues + pCol->col.offset, src, pCol->col.bytes);
} else {
assignVal(val1, (char*) &pCol->defaultVal.i, pCol->col.bytes, pCol->col.type);
// set the tag value for final result
setTagsValue(pFillInfo, data, pTags, pFillInfo->numOfCols - numOfTags, num);
pFillInfo->start += (pFillInfo->slidingTime * step);
pFillInfo->rowIdx += 1;
num += 1;
if ((pFillInfo->rowIdx >= pFillInfo->numOfRows && FILL_IS_ASC_FILL(pFillInfo)) ||
(pFillInfo->rowIdx < 0 && !FILL_IS_ASC_FILL(pFillInfo)) || num >= outputRows) {
if (pFillInfo->rowIdx >= pFillInfo->numOfRows || pFillInfo->rowIdx < 0) {
pFillInfo->rowIdx = -1;
pFillInfo->numOfRows = 0;
/* the raw data block is exhausted, next value does not exists */
pFillInfo->numOfTotal += pFillInfo->numOfCurrent;
return num;
void taosGenerateDataBlock(SFillInfo* pFillInfo, tFilePage** output, int64_t* outputRows, int32_t capacity) {
int32_t remain = taosNumOfRemainRows(pFillInfo); // todo use iterator?
// TSKEY ekey = taosGetRevisedEndKey(pQuery->window.ekey, pQuery->order.order, pQuery->slidingTime,
// pQuery->slidingTimeUnit, pQuery->precision);
// if (QUERY_IS_ASC_QUERY(pQuery)) {
// assert(ekey >= pQuery->window.ekey);
// } else {
// assert(ekey <= pQuery->window.ekey);
// }
int32_t rows = taosGetNumOfResultWithFill(pFillInfo, remain, pFillInfo->endKey, capacity);
int32_t numOfRes = taosDoInterpoResult(pFillInfo, output, remain, rows, pFillInfo->pData);
*outputRows = rows;
assert(numOfRes == rows);
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
#include "qinterpolation.h"
#include "os.h"
#include "qextbuffer.h"
#include "taosdef.h"
#include "taosmsg.h"
#include "tsqlfunction.h"
#define INTERPOL_IS_ASC_INTERPOL(interp) ((interp)->order == TSDB_ORDER_ASC)
int64_t taosGetIntervalStartTimestamp(int64_t startTime, int64_t timeRange, char intervalTimeUnit, int16_t precision) {
if (timeRange == 0) {
return startTime;
if (intervalTimeUnit == 'a' || intervalTimeUnit == 'm' || intervalTimeUnit == 's' || intervalTimeUnit == 'h') {
return (startTime / timeRange) * timeRange;
} else {
* here we revised the start time of day according to the local time zone,
* but in case of DST, the start time of one day need to be dynamically decided.
* TODO dynamically decide the start time of a day
#if defined(WINDOWS) && _MSC_VER >= 1900
// see https://docs.microsoft.com/en-us/cpp/c-runtime-library/daylight-dstbias-timezone-and-tzname?view=vs-2019
int64_t timezone = _timezone;
int32_t daylight = _daylight;
char** tzname = _tzname;
int64_t revStartime = (startTime / timeRange) * timeRange + timezone * t;
int64_t revEndtime = revStartime + timeRange - 1;
if (revEndtime < startTime) {
revStartime += timeRange;
return revStartime;
void taosInitInterpoInfo(SInterpolationInfo* pInterpoInfo, int32_t order, int64_t startTimestamp,
int32_t numOfGroupbyTags, int32_t rowSize) {
pInterpoInfo->startTimestamp = startTimestamp;
pInterpoInfo->rowIdx = -1;
pInterpoInfo->numOfRawDataInRows = 0;
pInterpoInfo->numOfCurrentInterpo = 0;
pInterpoInfo->numOfTotalInterpo = 0;
pInterpoInfo->order = order;
pInterpoInfo->numOfTags = numOfGroupbyTags;
if (pInterpoInfo->pTags == NULL && numOfGroupbyTags > 0) {
pInterpoInfo->pTags = calloc(1, numOfGroupbyTags * POINTER_BYTES + rowSize);
// set the previous value to be null
// the SInterpolationInfo itself will not be released
void taosDestoryInterpoInfo(SInterpolationInfo* pInterpoInfo) {
if (pInterpoInfo == NULL) {
void taosInterpoSetStartInfo(SInterpolationInfo* pInterpoInfo, int32_t numOfRawDataInRows, int32_t type) {
if (type == TSDB_INTERPO_NONE) {
pInterpoInfo->rowIdx = 0;
pInterpoInfo->numOfRawDataInRows = numOfRawDataInRows;
TSKEY taosGetRevisedEndKey(TSKEY ekey, int32_t order, int32_t timeInterval, int8_t intervalTimeUnit, int8_t precision) {
if (order == TSDB_ORDER_ASC) {
return ekey;
} else {
return taosGetIntervalStartTimestamp(ekey, timeInterval, intervalTimeUnit, precision);
int32_t taosGetNumOfResultWithInterpo(SInterpolationInfo* pInterpoInfo, TSKEY* pPrimaryKeyArray,
int32_t numOfRawDataInRows, int64_t nInterval, int64_t ekey,
int32_t maxNumOfRows) {
int32_t numOfRes = taosGetNumOfResWithoutLimit(pInterpoInfo, pPrimaryKeyArray, numOfRawDataInRows, nInterval, ekey);
return (numOfRes > maxNumOfRows) ? maxNumOfRows : numOfRes;
int32_t taosGetNumOfResWithoutLimit(SInterpolationInfo* pInterpoInfo, int64_t* pPrimaryKeyArray,
int32_t numOfAvailRawData, int64_t nInterval, int64_t ekey) {
if (numOfAvailRawData > 0) {
int32_t finalNumOfResult = 0;
// get last timestamp, calculate the result size
int64_t lastKey = pPrimaryKeyArray[pInterpoInfo->numOfRawDataInRows - 1];
finalNumOfResult = (int32_t)(labs(lastKey - pInterpoInfo->startTimestamp) / nInterval) + 1;
assert(finalNumOfResult >= numOfAvailRawData);
return finalNumOfResult;
} else {
/* reach the end of data */
if ((ekey < pInterpoInfo->startTimestamp && INTERPOL_IS_ASC_INTERPOL(pInterpoInfo)) ||
(ekey > pInterpoInfo->startTimestamp && !INTERPOL_IS_ASC_INTERPOL(pInterpoInfo))) {
return 0;
} else {
return (int32_t)(labs(ekey - pInterpoInfo->startTimestamp) / nInterval) + 1;
bool taosHasRemainsDataForInterpolation(SInterpolationInfo* pInterpoInfo) {
return taosNumOfRemainPoints(pInterpoInfo) > 0;
int32_t taosNumOfRemainPoints(SInterpolationInfo* pInterpoInfo) {
if (pInterpoInfo->rowIdx == -1 || pInterpoInfo->numOfRawDataInRows == 0) {
return 0;
return INTERPOL_IS_ASC_INTERPOL(pInterpoInfo) ? (pInterpoInfo->numOfRawDataInRows - pInterpoInfo->rowIdx)
: pInterpoInfo->rowIdx + 1;
static double doLinearInterpolationImpl(double v1, double v2, double k1, double k2, double k) {
return v1 + (v2 - v1) * (k - k1) / (k2 - k1);
int taosDoLinearInterpolation(int32_t type, SPoint* point1, SPoint* point2, SPoint* point) {
switch (type) {
*(int32_t*)point->val = doLinearInterpolationImpl(*(int32_t*)point1->val, *(int32_t*)point2->val, point1->key,
point2->key, point->key);
*(float*)point->val =
doLinearInterpolationImpl(*(float*)point1->val, *(float*)point2->val, point1->key, point2->key, point->key);
*(double*)point->val =
doLinearInterpolationImpl(*(double*)point1->val, *(double*)point2->val, point1->key, point2->key, point->key);
*(int64_t*)point->val = doLinearInterpolationImpl(*(int64_t*)point1->val, *(int64_t*)point2->val, point1->key,
point2->key, point->key);
*(int16_t*)point->val = doLinearInterpolationImpl(*(int16_t*)point1->val, *(int16_t*)point2->val, point1->key,
point2->key, point->key);
*(int8_t*)point->val =
doLinearInterpolationImpl(*(int8_t*)point1->val, *(int8_t*)point2->val, point1->key, point2->key, point->key);
default: {
// TODO: Deal with interpolation with bool and strings and timestamp
return -1;
return 0;
static char* getPos(char* data, int32_t bytes, int32_t index) { return data + index * bytes; }
static void setTagsValueInInterpolation(tFilePage** data, char** pTags, SColumnModel* pModel, int32_t order,
int32_t start, int32_t capacity, int32_t num) {
for (int32_t j = 0, i = start; i < pModel->numOfCols; ++i, ++j) {
SSchema* pSchema = getColumnModelSchema(pModel, i);
char* val1 = getPos(data[i]->data, pSchema->bytes, num);
assignVal(val1, pTags[j], pSchema->bytes, pSchema->type);
static void doInterpoResultImpl(SInterpolationInfo* pInterpoInfo, int16_t interpoType, tFilePage** data,
SColumnModel* pModel, int32_t* num, char** srcData, int64_t nInterval,
int64_t* defaultVal, int64_t currentTimestamp, int32_t capacity, int32_t numOfTags,
char** pTags, bool outOfBound) {
char** prevValues = &pInterpoInfo->prevValues;
char** nextValues = &pInterpoInfo->nextValues;
SPoint point1, point2, point;
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pInterpoInfo->order);
char* val = getPos(data[0]->data, TSDB_KEYSIZE, *num);
*(TSKEY*)val = pInterpoInfo->startTimestamp;
int32_t numOfValCols = pModel->numOfCols - numOfTags;
// set the other values
if (interpoType == TSDB_INTERPO_PREV) {
char* pInterpolationData = INTERPOL_IS_ASC_INTERPOL(pInterpoInfo) ? *prevValues : *nextValues;
if (pInterpolationData != NULL) {
for (int32_t i = 1; i < numOfValCols; ++i) {
SSchema* pSchema = getColumnModelSchema(pModel, i);
int16_t offset = getColumnModelOffset(pModel, i);
char* val1 = getPos(data[i]->data, pSchema->bytes, *num);
if (isNull(pInterpolationData + offset, pSchema->type)) {
setNull(val1, pSchema->type, pSchema->bytes);
} else {
assignVal(val1, pInterpolationData + offset, pSchema->bytes, pSchema->type);
} else { /* no prev value yet, set the value for null */
for (int32_t i = 1; i < numOfValCols; ++i) {
SSchema* pSchema = getColumnModelSchema(pModel, i);
char* val1 = getPos(data[i]->data, pSchema->bytes, *num);
setNull(val1, pSchema->type, pSchema->bytes);
setTagsValueInInterpolation(data, pTags, pModel, pInterpoInfo->order, numOfValCols, capacity, *num);
} else if (interpoType == TSDB_INTERPO_LINEAR) {
// TODO : linear interpolation supports NULL value
if (*prevValues != NULL && !outOfBound) {
for (int32_t i = 1; i < numOfValCols; ++i) {
SSchema* pSchema = getColumnModelSchema(pModel, i);
int16_t offset = getColumnModelOffset(pModel, i);
int16_t type = pSchema->type;
char* val1 = getPos(data[i]->data, pSchema->bytes, *num);
setNull(val1, type, pSchema->bytes);
point1 = (SPoint){.key = *(TSKEY*)(*prevValues), .val = *prevValues + offset};
point2 = (SPoint){.key = currentTimestamp, .val = srcData[i] + pInterpoInfo->rowIdx * pSchema->bytes};
point = (SPoint){.key = pInterpoInfo->startTimestamp, .val = val1};
taosDoLinearInterpolation(type, &point1, &point2, &point);
setTagsValueInInterpolation(data, pTags, pModel, pInterpoInfo->order, numOfValCols, capacity, *num);
} else {
for (int32_t i = 1; i < numOfValCols; ++i) {
SSchema* pSchema = getColumnModelSchema(pModel, i);
char* val1 = getPos(data[i]->data, pSchema->bytes, *num);
setNull(val1, pSchema->type, pSchema->bytes);
setTagsValueInInterpolation(data, pTags, pModel, pInterpoInfo->order, numOfValCols, capacity, *num);
} else { /* default value interpolation */
for (int32_t i = 1; i < numOfValCols; ++i) {
SSchema* pSchema = getColumnModelSchema(pModel, i);
char* val1 = getPos(data[i]->data, pSchema->bytes, *num);
assignVal(val1, (char*)&defaultVal[i], pSchema->bytes, pSchema->type);
setTagsValueInInterpolation(data, pTags, pModel, pInterpoInfo->order, numOfValCols, capacity, *num);
pInterpoInfo->startTimestamp += (nInterval * step);
(*num) += 1;
static void initBeforeAfterDataBuf(SColumnModel* pModel, char** nextValues) {
if (*nextValues != NULL) {
*nextValues = calloc(1, pModel->rowSize);
for (int i = 1; i < pModel->numOfCols; i++) {
int16_t offset = getColumnModelOffset(pModel, i);
SSchema* pSchema = getColumnModelSchema(pModel, i);
setNull(*nextValues + offset, pSchema->type, pSchema->bytes);
int32_t taosDoInterpoResult(SInterpolationInfo* pInterpoInfo, int16_t interpoType, tFilePage** data,
int32_t numOfRawDataInRows, int32_t outputRows, int64_t nInterval,
const int64_t* pPrimaryKeyArray, SColumnModel* pModel, char** srcData, int64_t* defaultVal,
const int32_t* functionIDs, int32_t bufSize) {
int32_t num = 0;
pInterpoInfo->numOfCurrentInterpo = 0;
char** prevValues = &pInterpoInfo->prevValues;
char** nextValues = &pInterpoInfo->nextValues;
int32_t numOfTags = pInterpoInfo->numOfTags;
char** pTags = pInterpoInfo->pTags;
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pInterpoInfo->order);
if (numOfRawDataInRows == 0) {
* we need to rebuild whole data
* NOTE:we need to keep the last saved data, to satisfy the interpolation
while (num < outputRows) {
doInterpoResultImpl(pInterpoInfo, interpoType, data, pModel, &num, srcData, nInterval, defaultVal,
pInterpoInfo->startTimestamp, bufSize, numOfTags, pTags, true);
pInterpoInfo->numOfTotalInterpo += pInterpoInfo->numOfCurrentInterpo;
return outputRows;
} else {
while (1) {
int64_t currentTimestamp = pPrimaryKeyArray[pInterpoInfo->rowIdx];
if ((pInterpoInfo->startTimestamp < currentTimestamp && INTERPOL_IS_ASC_INTERPOL(pInterpoInfo)) ||
(pInterpoInfo->startTimestamp > currentTimestamp && !INTERPOL_IS_ASC_INTERPOL(pInterpoInfo))) {
/* set the next value for interpolation */
initBeforeAfterDataBuf(pModel, nextValues);
int32_t offset = pInterpoInfo->rowIdx;
for (int32_t tlen = 0, i = 0; i < pModel->numOfCols - numOfTags; ++i) {
SSchema* pSchema = getColumnModelSchema(pModel, i);
memcpy(*nextValues + tlen, srcData[i] + offset * pSchema->bytes, pSchema->bytes);
tlen += pSchema->bytes;
if (((pInterpoInfo->startTimestamp < currentTimestamp && INTERPOL_IS_ASC_INTERPOL(pInterpoInfo)) ||
(pInterpoInfo->startTimestamp > currentTimestamp && !INTERPOL_IS_ASC_INTERPOL(pInterpoInfo))) &&
num < outputRows) {
while (((pInterpoInfo->startTimestamp < currentTimestamp && INTERPOL_IS_ASC_INTERPOL(pInterpoInfo)) ||
(pInterpoInfo->startTimestamp > currentTimestamp && !INTERPOL_IS_ASC_INTERPOL(pInterpoInfo))) &&
num < outputRows) {
doInterpoResultImpl(pInterpoInfo, interpoType, data, pModel, &num, srcData, nInterval, defaultVal,
currentTimestamp, bufSize, numOfTags, pTags, false);
/* output buffer is full, abort */
if ((num == outputRows && INTERPOL_IS_ASC_INTERPOL(pInterpoInfo)) ||
(num < 0 && !INTERPOL_IS_ASC_INTERPOL(pInterpoInfo))) {
pInterpoInfo->numOfTotalInterpo += pInterpoInfo->numOfCurrentInterpo;
return outputRows;
} else {
assert(pInterpoInfo->startTimestamp == currentTimestamp);
initBeforeAfterDataBuf(pModel, prevValues);
// assign rows to dst buffer
int32_t i = 0;
for (int32_t tlen = 0; i < pModel->numOfCols - numOfTags; ++i) {
int16_t offset = getColumnModelOffset(pModel, i);
SSchema* pSchema = getColumnModelSchema(pModel, i);
char* val1 = getPos(data[i]->data, pSchema->bytes, num);
char* src = srcData[i] + pInterpoInfo->rowIdx * pSchema->bytes;
if (i == 0 ||
(functionIDs[i] != TSDB_FUNC_COUNT && !isNull(src, pSchema->type)) ||
(functionIDs[i] == TSDB_FUNC_COUNT && *(int64_t*)(src) != 0)) {
assignVal(val1, src, pSchema->bytes, pSchema->type);
memcpy(*prevValues + tlen, src, pSchema->bytes);
} else { // i > 0 and data is null , do interpolation
if (interpoType == TSDB_INTERPO_PREV) {
assignVal(val1, *prevValues + offset, pSchema->bytes, pSchema->type);
} else if (interpoType == TSDB_INTERPO_LINEAR) {
assignVal(val1, src, pSchema->bytes, pSchema->type);
memcpy(*prevValues + tlen, src, pSchema->bytes);
} else {
assignVal(val1, (char*)&defaultVal[i], pSchema->bytes, pSchema->type);
tlen += pSchema->bytes;
/* set the tag value for final result */
setTagsValueInInterpolation(data, pTags, pModel, pInterpoInfo->order, pModel->numOfCols - numOfTags, bufSize,
pInterpoInfo->startTimestamp += (nInterval * step);
pInterpoInfo->rowIdx += 1;
num += 1;
if ((pInterpoInfo->rowIdx >= pInterpoInfo->numOfRawDataInRows && INTERPOL_IS_ASC_INTERPOL(pInterpoInfo)) ||
(pInterpoInfo->rowIdx < 0 && !INTERPOL_IS_ASC_INTERPOL(pInterpoInfo)) || num >= outputRows) {
if (pInterpoInfo->rowIdx >= pInterpoInfo->numOfRawDataInRows || pInterpoInfo->rowIdx < 0) {
pInterpoInfo->rowIdx = -1;
pInterpoInfo->numOfRawDataInRows = 0;
/* the raw data block is exhausted, next value does not exists */
pInterpoInfo->numOfTotalInterpo += pInterpoInfo->numOfCurrentInterpo;
return num;
......@@ -64,26 +64,26 @@ static tFilePage *loadIntoBucketFromDisk(tMemBucket *pMemBucket, int32_t segIdx,
for (uint32_t j = 0; j < pFlushInfo->numOfPages; ++j) {
ret = fread(pPage, pMemBuffer->pageSize, 1, pMemBuffer->file);
assert(pPage->numOfElems > 0);
assert(pPage->num > 0);
tColModelAppend(pDesc->pColumnModel, buffer, pPage->data, 0, pPage->numOfElems, pPage->numOfElems);
printf("id: %d count: %" PRIu64 "\n", j, buffer->numOfElems);
tColModelAppend(pDesc->pColumnModel, buffer, pPage->data, 0, pPage->num, pPage->num);
printf("id: %d count: %" PRIu64 "\n", j, buffer->num);
assert(buffer->numOfElems == pMemBuffer->fileMeta.numOfElemsInFile);
assert(buffer->num == pMemBuffer->fileMeta.numOfElemsInFile);
// load data in pMemBuffer to buffer
tFilePagesItem *pListItem = pMemBuffer->pHead;
while (pListItem != NULL) {
tColModelAppend(pDesc->pColumnModel, buffer, pListItem->item.data, 0, pListItem->item.numOfElems,
tColModelAppend(pDesc->pColumnModel, buffer, pListItem->item.data, 0, pListItem->item.num,
pListItem = pListItem->pNext;
tColDataQSort(pDesc, buffer->numOfElems, 0, buffer->numOfElems - 1, buffer->data, TSDB_ORDER_ASC);
tColDataQSort(pDesc, buffer->num, 0, buffer->num - 1, buffer->data, TSDB_ORDER_ASC);
pDesc->pColumnModel->capacity = oldCapacity; // restore value
return buffer;
......@@ -881,7 +881,7 @@ double getPercentileImpl(tMemBucket *pMemBucket, int32_t count, double fraction)
for (uint32_t jx = 0; jx < pFlushInfo->numOfPages; ++jx) {
ret = fread(pPage, pMemBuffer->pageSize, 1, pMemBuffer->file);
tMemBucketPut(pMemBucket, pPage->data, pPage->numOfElems);
tMemBucketPut(pMemBucket, pPage->data, pPage->num);
......@@ -54,7 +54,7 @@ uint32_t tLoserTreeCreate(SLoserTreeInfo** pTree, int32_t numOfEntries, void* pa
(*pTree)->numOfEntries = numOfEntries;
(*pTree)->totalEntries = totalEntries;
(*pTree)->param = param;
(*pTree)->comparaFn = compareFn;
(*pTree)->comparFn = compareFn;
// set initial value for loser tree
......@@ -95,7 +95,7 @@ void tLoserTreeAdjust(SLoserTreeInfo* pTree, int32_t idx) {
int32_t ret = pTree->comparaFn(&pTree->pNode[parentId], &kLeaf, pTree->param);
int32_t ret = pTree->comparFn(&pTree->pNode[parentId], &kLeaf, pTree->param);
if (ret < 0) {
SLoserTreeNode t = pTree->pNode[parentId];
pTree->pNode[parentId] = kLeaf;
......@@ -572,9 +572,20 @@ static int tsdbRemoveTableFromIndex(STsdbMeta *pMeta, STable *pTable) {
STColumn* pCol = &pSchema->columns[DEFAULT_TAG_INDEX_COLUMN];
char* key = tdGetRowDataOfCol(pTable->tagVal, pCol->type, TD_DATA_ROW_HEAD_SIZE + pCol->offset);
bool ret = tSkipListRemove(pSTable->pIndex, key);
SArray* res = tSkipListGet(pSTable->pIndex, key);
size_t size = taosArrayGetSize(res);
assert(size > 0);
for(int32_t i = 0; i < size; ++i) {
SSkipListNode* pNode = taosArrayGetP(res, i);
STableIndexElem* pElem = (STableIndexElem*) SL_GET_NODE_DATA(pNode);
if (pElem->pTable == pTable) { // this is the exact what we need
tSkipListRemoveNode(pSTable->pIndex, pNode);
return 0;
......@@ -26,6 +26,8 @@ extern "C" {
#define TD_GE (TD_EQ | TD_GT)
#define TD_LE (TD_EQ | TD_LT)
#define elePtrAt(base, size, idx) (void *)((char *)(base) + (size) * (idx))
typedef int32_t (*__ext_compar_fn_t)(const void *p1, const void *p2, const void *param);
......@@ -23,8 +23,6 @@
memcpy((__right), (__buf), (__size));\
} while (0);
#define elePtrAt(base, size, idx) (void *)((char *)(base) + (size) * (idx))
static void median(void *src, size_t size, size_t s, size_t e, const void *param, __ext_compar_fn_t comparFn, void* buf) {
int32_t mid = ((e - s) >> 1u) + s;
......@@ -573,6 +573,7 @@ bool tSkipListIterNext(SSkipListIterator *iter) {
iter->step += 1;
return (iter->order == TSDB_ORDER_ASC)? (iter->cur != pSkipList->pTail) : (iter->cur != pSkipList->pHead);
......@@ -74,9 +74,8 @@ int main(int argc, char *argv[]) {
printf("success to connect to server\n");
doQuery(taos, "create database if not exists test");
doQuery(taos, "create database if not exists test");
// doQuery(taos, "use test");
// doQuery(taos, "select sum(k)*max(k), sum(k), max(k) from tm99");
doQuery(taos, "use test");
doQuery(taos, "select count(*) from m1 where ts>='2020-1-1 1:1:1' and ts<='2020-1-1 1:1:59' interval(500a) fill(value, 99)");
// doQuery(taos, "create table t1(ts timestamp, k binary(12), f nchar(2))");
// for(int32_t i = 0; i< 100000; ++i) {
......@@ -45,10 +45,6 @@ sql show stables
if $rows != 0 then
return -1
print data00 = $data00
if $data00 != NULL then
return -1
print case_insensitivity test passed
# case2: illegal_metric_name test
......@@ -44,10 +44,6 @@ sql show tables
if $rows != 0 then
return -1
print data00 = $data00
if $data00 != NULL then
return -1
print case_insensitivity test passed
# case2: illegal_table_name test
......@@ -111,18 +111,7 @@ endi
if $data09 != nchar0 then
return -1
if $data11 != NULL then
return -1
if $data12 != NULL then
return -1
if $data13 != NULL then
return -1
if $data14 != NULL then
return -1
## TBASE-329
sql select * from $tb where c1 < 9 order by ts desc limit 1 offset 1
......@@ -537,7 +526,8 @@ endi
if $data14 != 8.000000000 then
return -1
if $data21 != NULL then
if $data21 != null then
print expect null, actual: $data21
return -1
......@@ -554,7 +544,7 @@ endi
if $data21 != 9 then
return -1
if $data31 != NULL then
if $data31 != null then
return -1
......@@ -574,7 +564,7 @@ endi
if $data31 != 9 then
return -1
if $data41 != NULL then
if $data41 != null then
return -1
sql select sum(c1), sum(c2), sum(c3), sum(c4), sum(c5), sum(c6) from $tb where ts >= $ts0 and ts <= $tsu interval(30m) limit 5 offset 1
......@@ -590,7 +580,7 @@ endi
if $data21 != 9 then
return -1
if $data31 != NULL then
if $data31 != null then
return -1
......@@ -607,7 +597,7 @@ endi
if $data21 != 7.000000000 then
return -1
if $data31 != NULL then
if $data31 != null then
return -1
sql select avg(c1), avg(c2), avg(c3), avg(c4), avg(c5), avg(c6) from $tb where ts >= $ts0 and ts <= $tsu interval(30m) limit 3 offset 1
......@@ -623,7 +613,7 @@ endi
if $data21 != 9.000000000 then
return -1
if $data31 != NULL then
if $data31 != null then
return -1
......@@ -84,43 +84,43 @@ endi
#### case 1: tag NULL, or 'NULL'
sql create table mt2 (ts timestamp, col1 int, col3 float, col5 binary(8), col6 bool, col9 nchar(8)) tags (tag1 binary(8), tag2 nchar(8), tag3 int, tag5 bool)
sql create table st2 using mt2 tags (NULL, 'NULL', 102, 'true')
sql describe st2
if $rows != 10 then
sql select tag1, tag2, tag3, tag5 from st2
if $rows != 1 then
return -1
if $data63 != NULL then
print ==1== expect: NULL, actually: $data63
if $data00 != NULL then
print ==1== expect: NULL, actually: $data00
return -1
if $data73 != NULL then
print ==2== expect: NULL, actually: $data73
if $data01 != NULL then
print ==2== expect: NULL, actually: $data01
return -1
if $data83 != 102 then
print ==3== expect: NULL, actually: $data83
if $data02 != 102 then
print ==3== expect: NULL, actually: $data02
return -1
if $data93 != true then
print ==4== expect: NULL, actually: $data93
if $data03 != 1 then
print ==4== expect: 1, actually: $data03
return -1
sql create table st3 using mt2 tags (NULL, 'ABC', 103, 'FALSE')
sql describe st3
if $rows != 10 then
sql select tag1, tag2, tag3, tag5 from st3
if $rows != 1 then
return -1
if $data63 != NULL then
print ==5== expect: NULL, actually: $data63
if $data00 != NULL then
print ==5== expect: NULL, actually: $data00
return -1
if $data73 != ABC then
if $data01 != ABC then
return -1
if $data83 != 103 then
if $data02 != 103 then
return -1
if $data93 != false then
if $data03 != 0 then
return -1
......@@ -128,39 +128,39 @@ endi
sql_error create table stx using mt2 tags ('NULL', '123aBc', 104, '123')
sql_error create table sty using mt2 tags ('NULL', '123aBc', 104, 'xtz')
sql create table st4 using mt2 tags ('NULL', '123aBc', 104, 'NULL')
sql describe st4
if $rows != 10 then
sql select tag1,tag2,tag3,tag5 from st4
if $rows != 1 then
return -1
if $data63 != NULL then
if $data00 != NULL then
return -1
if $data73 != 123aBc then
if $data01 != 123aBc then
return -1
if $data83 != 104 then
if $data02 != 104 then
return -1
if $data93 != NULL then
print ==6== expect: NULL, actually: $data93
if $data03 != NULL then
print ==6== expect: NULL, actually: $data03
return -1
sql create table st5 using mt2 tags ('NULL', '123aBc', 105, NULL)
sql describe st5
if $rows != 10 then
sql select tag1,tag2,tag3,tag5 from st5
if $rows != 1 then
return -1
if $data63 != NULL then
if $data00 != NULL then
return -1
if $data73 != 123aBc then
if $data01 != 123aBc then
return -1
if $data83 != 105 then
if $data02 != 105 then
return -1
if $data93 != NULL then
if $data03 != NULL then
return -1
......@@ -177,28 +177,29 @@ sql_error insert into st34 using mt3 tags ('NULL', '123aBc', 105, NULL) values
#### case 3: set tag value
sql create table mt4 (ts timestamp, c1 int) tags (tag_binary binary(16), tag_nchar nchar(16), tag_int int, tag_bool bool, tag_float float, tag_double double)
sql create table st41 using mt4 tags ("beijing", 'nchar_tag', 100, false, 9.12345, 7.123456789)
sql describe st41
if $rows != 8 then
sql select tag_binary, tag_nchar, tag_int, tag_bool, tag_float, tag_double st41
if $rows != 1 then
return -1
if $data23 != beijing then
if $data00 != beijing then
return -1
if $data33 != nchar_tag then
if $data01 != nchar_tag then
return -1
if $data43 != 100 then
if $data02 != 100 then
return -1
if $data53 != false then
if $data03 != false then
return -1
if $data63 != 9.123450 then
if $dat04 != 9.123450 then
return -1
if $data73 != 7.123457 then
if $data05 != 7.123457 then
return -1
################# binary
sql alter table st41 set tag tag_binary = "shanghai"
sql describe st41
......@@ -23,7 +23,7 @@ $stb = $stbPrefix . $i
sql drop database $db -x step1
sql create database $db cache 2048 tables 200
sql create database $db cache 16 maxtables 200
print ====== create tables
sql use $db
sql create table $stb (ts timestamp, c1 int, c2 bigint, c3 float, c4 double, c5 smallint, c6 tinyint, c7 bool, c8 binary(10), c9 nchar(10)) tags(t1 int)
......@@ -15,7 +15,7 @@ $db = $dbPrefix
$stb = $stbPrefix
sql drop database if exists $db
sql create database $db rows 200 maxTables 4
sql create database $db maxrows 200 maxTables 4
print ====== create tables
sql use $db
sql create table $stb (ts timestamp, c1 int, c2 bigint, c3 float, c4 double, c5 bool, c6 binary(10), c7 nchar(10)) tags(t1 int)
......@@ -130,6 +130,7 @@ if $data03 != 1 then
return -1
if $data04 != 0.000000000 then
print expect: 0.00000000 , actual: $data04
return -1
if $data05 != 1 then
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
想要评论请 注册