提交 1de610a4 编写于 作者: H hjxilinx

refactor extbuffer model.

上级 d55dadd0
...@@ -68,7 +68,7 @@ typedef struct SLocalReducer { ...@@ -68,7 +68,7 @@ typedef struct SLocalReducer {
bool hasPrevRow; // cannot be released bool hasPrevRow; // cannot be released
bool hasUnprocessedRow; bool hasUnprocessedRow;
tOrderDescriptor * pDesc; tOrderDescriptor * pDesc;
tColModel * resColModel; SColumnModel * resColModel;
tExtMemBuffer ** pExtMemBuffer; // disk-based buffer tExtMemBuffer ** pExtMemBuffer; // disk-based buffer
SInterpolationInfo interpolationInfo; // interpolation support structure SInterpolationInfo interpolationInfo; // interpolation support structure
char * pFinalRes; // result data after interpo char * pFinalRes; // result data after interpo
...@@ -92,7 +92,7 @@ typedef struct SSubqueryState { ...@@ -92,7 +92,7 @@ typedef struct SSubqueryState {
typedef struct SRetrieveSupport { typedef struct SRetrieveSupport {
tExtMemBuffer ** pExtMemBuffer; // for build loser tree tExtMemBuffer ** pExtMemBuffer; // for build loser tree
tOrderDescriptor *pOrderDescriptor; tOrderDescriptor *pOrderDescriptor;
tColModel * pFinalColModel; // colModel for final result SColumnModel * pFinalColModel; // colModel for final result
SSubqueryState * pState; SSubqueryState * pState;
int32_t subqueryIndex; // index of current vnode in vnode list int32_t subqueryIndex; // index of current vnode in vnode list
SSqlObj * pParentSqlObj; SSqlObj * pParentSqlObj;
...@@ -102,9 +102,9 @@ typedef struct SRetrieveSupport { ...@@ -102,9 +102,9 @@ typedef struct SRetrieveSupport {
} SRetrieveSupport; } SRetrieveSupport;
int32_t tscLocalReducerEnvCreate(SSqlObj *pSql, tExtMemBuffer ***pMemBuffer, tOrderDescriptor **pDesc, int32_t tscLocalReducerEnvCreate(SSqlObj *pSql, tExtMemBuffer ***pMemBuffer, tOrderDescriptor **pDesc,
tColModel **pFinalModel, uint32_t nBufferSize); SColumnModel **pFinalModel, uint32_t nBufferSize);
void tscLocalReducerEnvDestroy(tExtMemBuffer **pMemBuffer, tOrderDescriptor *pDesc, tColModel *pFinalModel, void tscLocalReducerEnvDestroy(tExtMemBuffer **pMemBuffer, tOrderDescriptor *pDesc, SColumnModel *pFinalModel,
int32_t numOfVnodes); int32_t numOfVnodes);
int32_t saveToBuffer(tExtMemBuffer *pMemoryBuf, tOrderDescriptor *pDesc, tFilePage *pPage, void *data, int32_t saveToBuffer(tExtMemBuffer *pMemoryBuf, tOrderDescriptor *pDesc, tFilePage *pPage, void *data,
...@@ -116,7 +116,7 @@ int32_t tscFlushTmpBuffer(tExtMemBuffer *pMemoryBuf, tOrderDescriptor *pDesc, tF ...@@ -116,7 +116,7 @@ int32_t tscFlushTmpBuffer(tExtMemBuffer *pMemoryBuf, tOrderDescriptor *pDesc, tF
* create local reducer to launch the second-stage reduce process at client site * create local reducer to launch the second-stage reduce process at client site
*/ */
void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrderDescriptor *pDesc, void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrderDescriptor *pDesc,
tColModel *finalModel, SSqlCmd *pSqlCmd, SSqlRes *pRes); SColumnModel *finalModel, SSqlCmd *pSqlCmd, SSqlRes *pRes);
void tscDestroyLocalReducer(SSqlObj *pSql); void tscDestroyLocalReducer(SSqlObj *pSql);
......
...@@ -27,6 +27,7 @@ ...@@ -27,6 +27,7 @@
#include "ttime.h" #include "ttime.h"
#include "ttypes.h" #include "ttypes.h"
#include "tutil.h" #include "tutil.h"
#include "tpercentile.h"
#define GET_INPUT_CHAR(x) (((char *)((x)->aInputElemBuf)) + ((x)->startOffset) * ((x)->inputBytes)) #define GET_INPUT_CHAR(x) (((char *)((x)->aInputElemBuf)) + ((x)->startOffset) * ((x)->inputBytes))
#define GET_INPUT_CHAR_INDEX(x, y) (GET_INPUT_CHAR(x) + (y) * (x)->inputBytes) #define GET_INPUT_CHAR_INDEX(x, y) (GET_INPUT_CHAR(x) + (y) * (x)->inputBytes)
...@@ -2416,7 +2417,7 @@ static bool percentile_function_setup(SQLFunctionCtx *pCtx) { ...@@ -2416,7 +2417,7 @@ static bool percentile_function_setup(SQLFunctionCtx *pCtx) {
SResultInfo *pResInfo = GET_RES_INFO(pCtx); SResultInfo *pResInfo = GET_RES_INFO(pCtx);
SSchema field[1] = {{pCtx->inputType, "dummyCol", 0, pCtx->inputBytes}}; SSchema field[1] = {{pCtx->inputType, "dummyCol", 0, pCtx->inputBytes}};
tColModel *pModel = tColModelCreate(field, 1, 1000); SColumnModel *pModel = createColumnModel(field, 1, 1000);
int32_t orderIdx = 0; int32_t orderIdx = 0;
// tOrderDesc object // tOrderDesc object
......
...@@ -321,7 +321,7 @@ int32_t tsParseOneColumnData(SSchema *pSchema, SSQLToken *pToken, char *payload, ...@@ -321,7 +321,7 @@ int32_t tsParseOneColumnData(SSchema *pSchema, SSQLToken *pToken, char *payload,
if (pToken->type == TK_NULL) { if (pToken->type == TK_NULL) {
*(uint32_t *)payload = TSDB_DATA_NCHAR_NULL; *(uint32_t *)payload = TSDB_DATA_NCHAR_NULL;
} else { } else {
// if the converted output len is over than pSchema->bytes, return error: 'Argument list too long' // if the converted output len is over than pColumnModel->bytes, return error: 'Argument list too long'
if (!taosMbsToUcs4(pToken->z, pToken->n, payload, pSchema->bytes)) { if (!taosMbsToUcs4(pToken->z, pToken->n, payload, pSchema->bytes)) {
char buf[512] = {0}; char buf[512] = {0};
snprintf(buf, 512, "%s", strerror(errno)); snprintf(buf, 512, "%s", strerror(errno));
......
...@@ -5453,8 +5453,6 @@ int32_t doCheckForQuery(SSqlObj* pSql, SQuerySQL* pQuerySql, int32_t index) { ...@@ -5453,8 +5453,6 @@ int32_t doCheckForQuery(SSqlObj* pSql, SQuerySQL* pQuerySql, int32_t index) {
const char* msg0 = "invalid table name"; const char* msg0 = "invalid table name";
const char* msg1 = "table name too long"; const char* msg1 = "table name too long";
const char* msg2 = "point interpolation query needs timestamp"; const char* msg2 = "point interpolation query needs timestamp";
const char* msg3 = "sliding value too small";
const char* msg4 = "sliding value no larger than the interval value";
const char* msg5 = "fill only available for interval query"; const char* msg5 = "fill only available for interval query";
const char* msg6 = "start(end) time of query range required or time range too large"; const char* msg6 = "start(end) time of query range required or time range too large";
const char* msg7 = "illegal number of tables in from clause"; const char* msg7 = "illegal number of tables in from clause";
...@@ -5587,30 +5585,6 @@ int32_t doCheckForQuery(SSqlObj* pSql, SQuerySQL* pQuerySql, int32_t index) { ...@@ -5587,30 +5585,6 @@ int32_t doCheckForQuery(SSqlObj* pSql, SQuerySQL* pQuerySql, int32_t index) {
if (!hasTimestampForPointInterpQuery(pQueryInfo)) { if (!hasTimestampForPointInterpQuery(pQueryInfo)) {
return invalidSqlErrMsg(pQueryInfo->msg, msg2); return invalidSqlErrMsg(pQueryInfo->msg, msg2);
} }
// // set sliding value, the query time range needs to be decide in the first place
// SSQLToken* pSliding = &pQuerySql->sliding;
// if (pSliding->n != 0) {
// if (!tscEmbedded && pCmd->inStream == 0 && hasDefaultQueryTimeRange(pQueryInfo)) { // sliding only allowed in stream
// const char* msg = "time range expected for sliding window query";
// return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg);
// }
//
// getTimestampInUsFromStr(pSliding->z, pSliding->n, &pQueryInfo->nSlidingTime);
// if (pMeterMetaInfo->pMeterMeta->precision == TSDB_TIME_PRECISION_MILLI) {
// pQueryInfo->nSlidingTime /= 1000;
// }
//
// if (pQueryInfo->nSlidingTime < tsMinSlidingTime) {
// return invalidSqlErrMsg(pQueryInfo->msg, msg3);
// }
//
// if (pQueryInfo->nSlidingTime > pQueryInfo->nAggTimeInterval) {
// return invalidSqlErrMsg(pQueryInfo->msg, msg4);
// }
// } else {
// pQueryInfo->nSlidingTime = -1;
// }
// in case of join query, time range is required. // in case of join query, time range is required.
if (QUERY_IS_JOIN_QUERY(pQueryInfo->type)) { if (QUERY_IS_JOIN_QUERY(pQueryInfo->type)) {
......
此差异已折叠。
...@@ -901,7 +901,7 @@ int tscLaunchSTableSubqueries(SSqlObj *pSql) { ...@@ -901,7 +901,7 @@ int tscLaunchSTableSubqueries(SSqlObj *pSql) {
tExtMemBuffer ** pMemoryBuf = NULL; tExtMemBuffer ** pMemoryBuf = NULL;
tOrderDescriptor *pDesc = NULL; tOrderDescriptor *pDesc = NULL;
tColModel * pModel = NULL; SColumnModel * pModel = NULL;
pRes->qhandle = 1; // hack the qhandle check pRes->qhandle = 1; // hack the qhandle check
...@@ -1194,7 +1194,7 @@ void tscRetrieveFromVnodeCallBack(void *param, TAOS_RES *tres, int numOfRows) { ...@@ -1194,7 +1194,7 @@ void tscRetrieveFromVnodeCallBack(void *param, TAOS_RES *tres, int numOfRows) {
SSrcColumnInfo colInfo[256] = {0}; SSrcColumnInfo colInfo[256] = {0};
tscGetSrcColumnInfo(colInfo, pQueryInfo); tscGetSrcColumnInfo(colInfo, pQueryInfo);
tColModelDisplayEx(pDesc->pSchema, pRes->data, pRes->numOfRows, pRes->numOfRows, colInfo); tColModelDisplayEx(pDesc->pColumnModel, pRes->data, pRes->numOfRows, pRes->numOfRows, colInfo);
#endif #endif
if (tsTotalTmpDirGB != 0 && tsAvailTmpDirGB < tsMinimalTmpDirGB) { if (tsTotalTmpDirGB != 0 && tsAvailTmpDirGB < tsMinimalTmpDirGB) {
tscError("%p sub:%p client disk space remain %.3f GB, need at least %.3f GB, stop query", pPObj, pSql, tscError("%p sub:%p client disk space remain %.3f GB, need at least %.3f GB, stop query", pPObj, pSql,
...@@ -1214,17 +1214,17 @@ void tscRetrieveFromVnodeCallBack(void *param, TAOS_RES *tres, int numOfRows) { ...@@ -1214,17 +1214,17 @@ void tscRetrieveFromVnodeCallBack(void *param, TAOS_RES *tres, int numOfRows) {
} else { // all data has been retrieved to client } else { // all data has been retrieved to client
/* data in from current vnode is stored in cache and disk */ /* data in from current vnode is stored in cache and disk */
uint32_t numOfRowsFromVnode = trsupport->pExtMemBuffer[idx]->numOfAllElems + trsupport->localBuffer->numOfElems; uint32_t numOfRowsFromVnode = trsupport->pExtMemBuffer[idx]->numOfTotalElems + trsupport->localBuffer->numOfElems;
tscTrace("%p sub:%p all data retrieved from ip:%u,vid:%d, numOfRows:%d, orderOfSub:%d", pPObj, pSql, pSvd->ip, tscTrace("%p sub:%p all data retrieved from ip:%u,vid:%d, numOfRows:%d, orderOfSub:%d", pPObj, pSql, pSvd->ip,
pSvd->vnode, numOfRowsFromVnode, idx); pSvd->vnode, numOfRowsFromVnode, idx);
tColModelCompact(pDesc->pSchema, trsupport->localBuffer, pDesc->pSchema->maxCapacity); tColModelCompact(pDesc->pColumnModel, trsupport->localBuffer, pDesc->pColumnModel->capacity);
#ifdef _DEBUG_VIEW #ifdef _DEBUG_VIEW
printf("%" PRIu64 " rows data flushed to disk:\n", trsupport->localBuffer->numOfElems); printf("%" PRIu64 " rows data flushed to disk:\n", trsupport->localBuffer->numOfElems);
SSrcColumnInfo colInfo[256] = {0}; SSrcColumnInfo colInfo[256] = {0};
tscGetSrcColumnInfo(colInfo, pQueryInfo); tscGetSrcColumnInfo(colInfo, pQueryInfo);
tColModelDisplayEx(pDesc->pSchema, trsupport->localBuffer->data, trsupport->localBuffer->numOfElems, tColModelDisplayEx(pDesc->pColumnModel, trsupport->localBuffer->data, trsupport->localBuffer->numOfElems,
trsupport->localBuffer->numOfElems, colInfo); trsupport->localBuffer->numOfElems, colInfo);
#endif #endif
...@@ -1256,7 +1256,7 @@ void tscRetrieveFromVnodeCallBack(void *param, TAOS_RES *tres, int numOfRows) { ...@@ -1256,7 +1256,7 @@ void tscRetrieveFromVnodeCallBack(void *param, TAOS_RES *tres, int numOfRows) {
} }
// all sub-queries are returned, start to local merge process // all sub-queries are returned, start to local merge process
pDesc->pSchema->maxCapacity = trsupport->pExtMemBuffer[idx]->numOfElemsPerPage; pDesc->pColumnModel->capacity = trsupport->pExtMemBuffer[idx]->numOfElemsPerPage;
tscTrace("%p retrieve from %d vnodes completed.final NumOfRows:%d,start to build loser tree", pPObj, tscTrace("%p retrieve from %d vnodes completed.final NumOfRows:%d,start to build loser tree", pPObj,
pState->numOfTotal, pState->numOfRetrievedRows); pState->numOfTotal, pState->numOfRetrievedRows);
...@@ -1516,7 +1516,7 @@ void tscUpdateVnodeInQueryMsg(SSqlObj *pSql, char *buf) { ...@@ -1516,7 +1516,7 @@ void tscUpdateVnodeInQueryMsg(SSqlObj *pSql, char *buf) {
char * pStart = buf + tsRpcHeadSize; char * pStart = buf + tsRpcHeadSize;
SQueryMeterMsg *pQueryMsg = (SQueryMeterMsg *)pStart; SQueryMeterMsg *pQueryMsg = (SQueryMeterMsg *)pStart;
if (UTIL_METER_IS_NOMRAL_METER(pMeterMetaInfo)) { // pSchema == NULL, query on meter if (UTIL_METER_IS_NOMRAL_METER(pMeterMetaInfo)) { // pColumnModel == NULL, query on meter
SMeterMeta *pMeterMeta = pMeterMetaInfo->pMeterMeta; SMeterMeta *pMeterMeta = pMeterMetaInfo->pMeterMeta;
pQueryMsg->vnode = htons(pMeterMeta->vpeerDesc[pSql->index].vnode); pQueryMsg->vnode = htons(pMeterMeta->vpeerDesc[pSql->index].vnode);
} else { // query on metric } else { // query on metric
......
...@@ -19,19 +19,16 @@ ...@@ -19,19 +19,16 @@
extern "C" { extern "C" {
#endif #endif
#include <stdio.h> #include "os.h"
#include <stdlib.h>
#include <string.h>
#include "tutil.h"
#include "taosmsg.h" #include "taosmsg.h"
#include "tutil.h"
#define DEFAULT_PAGE_SIZE 16384 // 16k larger than the SHistoInfo #define DEFAULT_PAGE_SIZE 16384 // 16k larger than the SHistoInfo
#define MIN_BUFFER_SIZE (1 << 19) #define MIN_BUFFER_SIZE (1 << 19)
#define MAX_TMPFILE_PATH_LENGTH PATH_MAX #define MAX_TMPFILE_PATH_LENGTH PATH_MAX
#define INITIAL_ALLOCATION_BUFFER_SIZE 64 #define INITIAL_ALLOCATION_BUFFER_SIZE 64
// forward declare // forward declaration
struct tTagSchema; struct tTagSchema;
typedef enum EXT_BUFFER_FLUSH_MODEL { typedef enum EXT_BUFFER_FLUSH_MODEL {
...@@ -61,12 +58,12 @@ typedef struct tFlushoutData { ...@@ -61,12 +58,12 @@ typedef struct tFlushoutData {
tFlushoutInfo *pFlushoutInfo; tFlushoutInfo *pFlushoutInfo;
} tFlushoutData; } tFlushoutData;
typedef struct tFileMeta { typedef struct SFileInfo {
uint32_t nFileSize; // in pages uint32_t nFileSize; // in pages
uint32_t nPageSize; uint32_t pageSize;
uint32_t numOfElemsInFile; uint32_t numOfElemsInFile;
tFlushoutData flushoutData; tFlushoutData flushoutData;
} tFileMeta; } SFileInfo;
typedef struct tFilePage { typedef struct tFilePage {
uint64_t numOfElems; uint64_t numOfElems;
...@@ -78,65 +75,73 @@ typedef struct tFilePagesItem { ...@@ -78,65 +75,73 @@ typedef struct tFilePagesItem {
tFilePage item; tFilePage item;
} tFilePagesItem; } tFilePagesItem;
typedef struct tColModel { typedef struct SSchemaEx {
int32_t maxCapacity; struct SSchema field;
int32_t numOfCols; int16_t offset;
int16_t * colOffset; } SSchemaEx;
struct SSchema *pFields;
} tColModel; typedef struct SColumnModel {
int32_t capacity;
int32_t numOfCols;
int16_t rowSize;
SSchemaEx *pFields;
} SColumnModel;
typedef struct tOrderIdx { typedef struct SColumnOrderInfo {
int32_t numOfOrderedCols; int32_t numOfCols;
int16_t pData[]; int16_t pData[];
} tOrderIdx; } SColumnOrderInfo;
typedef struct tOrderDescriptor { typedef struct tOrderDescriptor {
union { SColumnModel * pColumnModel;
struct tTagSchema *pTagSchema; int32_t tsOrder; // timestamp order type if exists
tColModel * pSchema; SColumnOrderInfo orderIdx;
};
int32_t tsOrder; // timestamp order type if exists
tOrderIdx orderIdx;
} tOrderDescriptor; } tOrderDescriptor;
typedef struct tExtMemBuffer { typedef struct tExtMemBuffer {
int32_t nMaxSizeInPages; int32_t inMemCapacity;
int32_t nElemSize; int32_t nElemSize;
int32_t nPageSize; int32_t pageSize;
int32_t numOfTotalElems;
int32_t numOfAllElems;
int32_t numOfElemsInBuffer; int32_t numOfElemsInBuffer;
int32_t numOfElemsPerPage; int32_t numOfElemsPerPage;
int16_t numOfInMemPages;
int16_t numOfPagesInMem;
tFilePagesItem *pHead; tFilePagesItem *pHead;
tFilePagesItem *pTail; tFilePagesItem *pTail;
tFileMeta fileMeta; char * path;
FILE * file;
char dataFilePath[MAX_TMPFILE_PATH_LENGTH]; SFileInfo fileMeta;
FILE *dataFile;
tColModel *pColModel;
SColumnModel * pColumnModel;
EXT_BUFFER_FLUSH_MODEL flushModel; EXT_BUFFER_FLUSH_MODEL flushModel;
} tExtMemBuffer; } tExtMemBuffer;
/**
*
* @param fileNamePattern
* @param dstPath
*/
void getTmpfilePath(const char *fileNamePattern, char *dstPath); void getTmpfilePath(const char *fileNamePattern, char *dstPath);
/* /**
* create ext-memory buffer *
* @param inMemSize
* @param elemSize
* @param pModel
* @return
*/ */
void tExtMemBufferCreate(tExtMemBuffer **pMemBuffer, int32_t numOfBufferSize, int32_t elemSize, tExtMemBuffer *createExtMemBuffer(int32_t inMemSize, int32_t elemSize, SColumnModel *pModel);
const char *tmpDataFilePath, tColModel *pModel);
/* /**
* destroy ext-memory buffer *
* @param pMemBuffer
* @return
*/ */
void tExtMemBufferDestroy(tExtMemBuffer **pMemBuffer); void *destoryExtMemBuffer(tExtMemBuffer *pMemBuffer);
/* /**
* @param pMemBuffer * @param pMemBuffer
* @param data input data pointer * @param data input data pointer
* @param numOfRows number of rows in data * @param numOfRows number of rows in data
...@@ -145,12 +150,15 @@ void tExtMemBufferDestroy(tExtMemBuffer **pMemBuffer); ...@@ -145,12 +150,15 @@ void tExtMemBufferDestroy(tExtMemBuffer **pMemBuffer);
*/ */
int16_t tExtMemBufferPut(tExtMemBuffer *pMemBuffer, void *data, int32_t numOfRows); int16_t tExtMemBufferPut(tExtMemBuffer *pMemBuffer, void *data, int32_t numOfRows);
/* /**
* flush all data into disk and release all in-memory buffer *
* @param pMemBuffer
* @return
*/ */
bool tExtMemBufferFlush(tExtMemBuffer *pMemBuffer); bool tExtMemBufferFlush(tExtMemBuffer *pMemBuffer);
/* /**
*
* remove all data that has been put into buffer, including in buffer or * remove all data that has been put into buffer, including in buffer or
* ext-buffer(disk) * ext-buffer(disk)
*/ */
...@@ -163,11 +171,44 @@ void tExtMemBufferClear(tExtMemBuffer *pMemBuffer); ...@@ -163,11 +171,44 @@ void tExtMemBufferClear(tExtMemBuffer *pMemBuffer);
*/ */
bool tExtMemBufferLoadData(tExtMemBuffer *pMemBuffer, tFilePage *pFilePage, int32_t flushIdx, int32_t pageIdx); bool tExtMemBufferLoadData(tExtMemBuffer *pMemBuffer, tFilePage *pFilePage, int32_t flushIdx, int32_t pageIdx);
/**
*
* @param pMemBuffer
* @return
*/
bool tExtMemBufferIsAllDataInMem(tExtMemBuffer *pMemBuffer); bool tExtMemBufferIsAllDataInMem(tExtMemBuffer *pMemBuffer);
tColModel *tColModelCreate(SSchema *field, int32_t numOfCols, int32_t maxCapacity); /**
*
* @param fields
* @param numOfCols
* @param blockCapacity
* @return
*/
SColumnModel *createColumnModel(SSchema *fields, int32_t numOfCols, int32_t blockCapacity);
/**
*
* @param pSrc
* @return
*/
SColumnModel *cloneColumnModel(SColumnModel *pSrc);
/**
*
* @param pModel
*/
void destroyColumnModel(SColumnModel *pModel);
/*
* compress data into consecutive block without hole in data
*/
void tColModelCompact(SColumnModel *pModel, tFilePage *inputBuffer, int32_t maxElemsCapacity);
void tColModelErase(SColumnModel *pModel, tFilePage *inputBuffer, int32_t maxCapacity, int32_t s, int32_t e);
SSchema *getColumnModelSchema(SColumnModel *pColumnModel, int32_t index);
void tColModelDestroy(tColModel *pModel); int16_t getColumnModelOffset(SColumnModel *pColumnModel, int32_t index);
typedef struct SSrcColumnInfo { typedef struct SSrcColumnInfo {
int32_t functionId; int32_t functionId;
...@@ -177,68 +218,18 @@ typedef struct SSrcColumnInfo { ...@@ -177,68 +218,18 @@ typedef struct SSrcColumnInfo {
/* /*
* display data in column format model for debug purpose only * display data in column format model for debug purpose only
*/ */
void tColModelDisplay(tColModel *pModel, void *pData, int32_t numOfRows, int32_t maxCount); void tColModelDisplay(SColumnModel *pModel, void *pData, int32_t numOfRows, int32_t maxCount);
void tColModelDisplayEx(tColModel *pModel, void *pData, int32_t numOfRows, int32_t maxCount, SSrcColumnInfo *pInfo); void tColModelDisplayEx(SColumnModel *pModel, void *pData, int32_t numOfRows, int32_t maxCount, SSrcColumnInfo *pInfo);
/* tOrderDescriptor *tOrderDesCreate(const int32_t *orderColIdx, int32_t numOfOrderCols, SColumnModel *pModel,
* compress data into consecutive block without hole in data int32_t tsOrderType);
*/
void tColModelCompact(tColModel *pModel, tFilePage *inputBuffer, int32_t maxElemsCapacity);
void tColModelErase(tColModel *pModel, tFilePage *inputBuffer, int32_t maxCapacity, int32_t s, int32_t e);
tOrderDescriptor *tOrderDesCreate(int32_t *orderColIdx, int32_t numOfOrderCols, tColModel *pModel, int32_t tsOrderType);
void tOrderDescDestroy(tOrderDescriptor *pDesc); void tOrderDescDestroy(tOrderDescriptor *pDesc);
void tColModelAppend(tColModel *dstModel, tFilePage *dstPage, void *srcData, int32_t srcStartRows, void tColModelAppend(SColumnModel *dstModel, tFilePage *dstPage, void *srcData, int32_t srcStartRows,
int32_t numOfRowsToWrite, int32_t srcCapacity); int32_t numOfRowsToWrite, int32_t srcCapacity);
///////////////////////////////////////////////////////////////////////////////////////////////////////
typedef struct MinMaxEntry {
union {
double dMinVal;
int32_t iMinVal;
int64_t i64MinVal;
};
union {
double dMaxVal;
int32_t iMaxVal;
int64_t i64MaxVal;
};
} MinMaxEntry;
typedef struct tMemBucketSegment {
int32_t numOfSlots;
MinMaxEntry * pBoundingEntries;
tExtMemBuffer **pBuffer;
} tMemBucketSegment;
typedef struct tMemBucket {
int16_t numOfSegs;
int16_t nTotalSlots;
int16_t nSlotsOfSeg;
int16_t dataType;
int16_t nElemSize;
int32_t numOfElems;
int32_t nTotalBufferSize;
int32_t maxElemsCapacity;
int16_t nPageSize;
int16_t numOfTotalPages;
int16_t numOfAvailPages; /* remain available buffer pages */
tMemBucketSegment *pSegs;
tOrderDescriptor * pOrderDesc;
MinMaxEntry nRange;
void (*HashFunc)(struct tMemBucket *pBucket, void *value, int16_t *segIdx, int16_t *slotIdx);
} tMemBucket;
typedef int (*__col_compar_fn_t)(tOrderDescriptor *, int32_t numOfRows, int32_t idx1, int32_t idx2, char *data); typedef int (*__col_compar_fn_t)(tOrderDescriptor *, int32_t numOfRows, int32_t idx1, int32_t idx2, char *data);
void tColDataQSort(tOrderDescriptor *, int32_t numOfRows, int32_t start, int32_t end, char *data, int32_t orderType); void tColDataQSort(tOrderDescriptor *, int32_t numOfRows, int32_t start, int32_t end, char *data, int32_t orderType);
...@@ -253,19 +244,6 @@ int32_t compare_a(tOrderDescriptor *, int32_t numOfRow1, int32_t s1, char *data1 ...@@ -253,19 +244,6 @@ int32_t compare_a(tOrderDescriptor *, int32_t numOfRow1, int32_t s1, char *data1
int32_t compare_d(tOrderDescriptor *, int32_t numOfRow1, int32_t s1, char *data1, int32_t numOfRow2, int32_t s2, int32_t compare_d(tOrderDescriptor *, int32_t numOfRow1, int32_t s1, char *data1, int32_t numOfRow2, int32_t s2,
char *data2); char *data2);
tMemBucket* tMemBucketCreate(int32_t totalSlots, int32_t nBufferSize, int16_t nElemSize,
int16_t dataType, tOrderDescriptor *pDesc);
void tMemBucketDestroy(tMemBucket *pBucket);
void tMemBucketPut(tMemBucket *pBucket, void *data, int32_t numOfRows);
double getPercentile(tMemBucket *pMemBucket, double percent);
void tBucketIntHash(tMemBucket *pBucket, void *value, int16_t *segIdx, int16_t *slotIdx);
void tBucketDoubleHash(tMemBucket *pBucket, void *value, int16_t *segIdx, int16_t *slotIdx);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -20,8 +20,6 @@ ...@@ -20,8 +20,6 @@
extern "C" { extern "C" {
#endif #endif
#include "tskiplist.h"
#define USE_ARRAYLIST #define USE_ARRAYLIST
#define MAX_HISTOGRAM_BIN 500 #define MAX_HISTOGRAM_BIN 500
......
...@@ -78,7 +78,7 @@ int32_t taosNumOfRemainPoints(SInterpolationInfo *pInterpoInfo); ...@@ -78,7 +78,7 @@ int32_t taosNumOfRemainPoints(SInterpolationInfo *pInterpoInfo);
*/ */
int32_t taosDoInterpoResult(SInterpolationInfo *pInterpoInfo, int16_t interpoType, tFilePage **data, int32_t taosDoInterpoResult(SInterpolationInfo *pInterpoInfo, int16_t interpoType, tFilePage **data,
int32_t numOfRawDataInRows, int32_t outputRows, int64_t nInterval, int32_t numOfRawDataInRows, int32_t outputRows, int64_t nInterval,
const int64_t *pPrimaryKeyArray, tColModel *pModel, char **srcData, int64_t *defaultVal, const int64_t *pPrimaryKeyArray, SColumnModel *pModel, char **srcData, int64_t *defaultVal,
const int32_t *functionIDs, int32_t bufSize); const int32_t *functionIDs, int32_t bufSize);
int taosDoLinearInterpolation(int32_t type, SPoint *point1, SPoint *point2, SPoint *point); int taosDoLinearInterpolation(int32_t type, SPoint *point1, SPoint *point2, SPoint *point);
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_TPERCENTILE_H
#define TDENGINE_TPERCENTILE_H
#include "textbuffer.h"
typedef struct MinMaxEntry {
union {
double dMinVal;
int32_t iMinVal;
int64_t i64MinVal;
};
union {
double dMaxVal;
int32_t iMaxVal;
int64_t i64MaxVal;
};
} MinMaxEntry;
typedef struct tMemBucketSegment {
int32_t numOfSlots;
MinMaxEntry * pBoundingEntries;
tExtMemBuffer **pBuffer;
} tMemBucketSegment;
typedef struct tMemBucket {
int16_t numOfSegs;
int16_t nTotalSlots;
int16_t nSlotsOfSeg;
int16_t dataType;
int16_t nElemSize;
int32_t numOfElems;
int32_t nTotalBufferSize;
int32_t maxElemsCapacity;
int16_t pageSize;
int16_t numOfTotalPages;
int16_t numOfAvailPages; /* remain available buffer pages */
tMemBucketSegment *pSegs;
tOrderDescriptor * pOrderDesc;
MinMaxEntry nRange;
void (*HashFunc)(struct tMemBucket *pBucket, void *value, int16_t *segIdx, int16_t *slotIdx);
} tMemBucket;
tMemBucket *tMemBucketCreate(int32_t totalSlots, int32_t nBufferSize, int16_t nElemSize, int16_t dataType,
tOrderDescriptor *pDesc);
void tMemBucketDestroy(tMemBucket *pBucket);
void tMemBucketPut(tMemBucket *pBucket, void *data, int32_t numOfRows);
double getPercentile(tMemBucket *pMemBucket, double percent);
void tBucketIntHash(tMemBucket *pBucket, void *value, int16_t *segIdx, int16_t *slotIdx);
void tBucketDoubleHash(tMemBucket *pBucket, void *value, int16_t *segIdx, int16_t *slotIdx);
#endif // TDENGINE_TPERCENTILE_H
...@@ -33,11 +33,13 @@ extern "C" { ...@@ -33,11 +33,13 @@ extern "C" {
* 1. we implement a quick sort algorithm, may remove it later. * 1. we implement a quick sort algorithm, may remove it later.
*/ */
typedef struct tTagSchema { //typedef struct tTagSchema {
struct SSchema *pSchema; // struct SSchema *pSchema;
int32_t numOfCols; // int32_t numOfCols;
int32_t colOffset[]; // int32_t colOffset[];
} tTagSchema; //} tTagSchema;
typedef SColumnModel tTagSchema;
typedef struct tSidSet { typedef struct tSidSet {
int32_t numOfSids; int32_t numOfSids;
...@@ -45,8 +47,8 @@ typedef struct tSidSet { ...@@ -45,8 +47,8 @@ typedef struct tSidSet {
SMeterSidExtInfo **pSids; SMeterSidExtInfo **pSids;
int32_t * starterPos; // position of each subgroup, generated according to int32_t * starterPos; // position of each subgroup, generated according to
tTagSchema *pTagSchema; SColumnModel *pColumnModel;
tOrderIdx orderIdx; SColumnOrderInfo orderIdx;
} tSidSet; } tSidSet;
typedef int32_t (*__ext_compar_fn_t)(const void *p1, const void *p2, void *param); typedef int32_t (*__ext_compar_fn_t)(const void *p1, const void *p2, void *param);
...@@ -54,7 +56,7 @@ typedef int32_t (*__ext_compar_fn_t)(const void *p1, const void *p2, void *param ...@@ -54,7 +56,7 @@ typedef int32_t (*__ext_compar_fn_t)(const void *p1, const void *p2, void *param
tSidSet *tSidSetCreate(struct SMeterSidExtInfo **pMeterSidExtInfo, int32_t numOfMeters, SSchema *pSchema, tSidSet *tSidSetCreate(struct SMeterSidExtInfo **pMeterSidExtInfo, int32_t numOfMeters, SSchema *pSchema,
int32_t numOfTags, SColIndexEx *colList, int32_t numOfOrderCols); int32_t numOfTags, SColIndexEx *colList, int32_t numOfOrderCols);
tTagSchema *tCreateTagSchema(SSchema *pSchema, int32_t numOfTagCols); //tTagSchema *tCreateTagSchema(SSchema *pSchema, int32_t numOfTagCols);
int32_t *calculateSubGroup(void **pSids, int32_t numOfMeters, int32_t *numOfSubset, tOrderDescriptor *pOrderDesc, int32_t *calculateSubGroup(void **pSids, int32_t numOfMeters, int32_t *numOfSubset, tOrderDescriptor *pOrderDesc,
__ext_compar_fn_t compareFn); __ext_compar_fn_t compareFn);
......
...@@ -210,7 +210,7 @@ char *mgmtBuildCreateMeterIe(STabObj *pMeter, char *pMsg, int vnode) { ...@@ -210,7 +210,7 @@ char *mgmtBuildCreateMeterIe(STabObj *pMeter, char *pMsg, int vnode) {
for (int i = 0; i < pMeter->numOfColumns; ++i) { for (int i = 0; i < pMeter->numOfColumns; ++i) {
pCreateMeter->schema[i].type = pSchema[i].type; pCreateMeter->schema[i].type = pSchema[i].type;
/* strcpy(pCreateMeter->schema[i].name, pSchema[i].name); */ /* strcpy(pCreateMeter->schema[i].name, pColumnModel[i].name); */
pCreateMeter->schema[i].bytes = htons(pSchema[i].bytes); pCreateMeter->schema[i].bytes = htons(pSchema[i].bytes);
pCreateMeter->schema[i].colId = htons(pSchema[i].colId); pCreateMeter->schema[i].colId = htons(pSchema[i].colId);
} }
......
...@@ -70,7 +70,7 @@ static int32_t tabObjResultComparator(const void* p1, const void* p2, void* para ...@@ -70,7 +70,7 @@ static int32_t tabObjResultComparator(const void* p1, const void* p2, void* para
STabObj* pNode1 = (STabObj*)p1; STabObj* pNode1 = (STabObj*)p1;
STabObj* pNode2 = (STabObj*)p2; STabObj* pNode2 = (STabObj*)p2;
for (int32_t i = 0; i < pOrderDesc->orderIdx.numOfOrderedCols; ++i) { for (int32_t i = 0; i < pOrderDesc->orderIdx.numOfCols; ++i) {
int32_t colIdx = pOrderDesc->orderIdx.pData[i]; int32_t colIdx = pOrderDesc->orderIdx.pData[i];
char* f1 = NULL; char* f1 = NULL;
...@@ -86,7 +86,9 @@ static int32_t tabObjResultComparator(const void* p1, const void* p2, void* para ...@@ -86,7 +86,9 @@ static int32_t tabObjResultComparator(const void* p1, const void* p2, void* para
} else { } else {
f1 = mgmtMeterGetTag(pNode1, colIdx, NULL); f1 = mgmtMeterGetTag(pNode1, colIdx, NULL);
f2 = mgmtMeterGetTag(pNode2, colIdx, &schema); f2 = mgmtMeterGetTag(pNode2, colIdx, &schema);
assert(schema.type == pOrderDesc->pTagSchema->pSchema[colIdx].type);
SSchema* pSchema = getColumnModelSchema(pOrderDesc->pColumnModel, colIdx);
assert(schema.type == pSchema->type);
} }
int32_t ret = doCompare(f1, f2, schema.type, schema.bytes); int32_t ret = doCompare(f1, f2, schema.type, schema.bytes);
...@@ -109,7 +111,7 @@ static int32_t tabObjResultComparator(const void* p1, const void* p2, void* para ...@@ -109,7 +111,7 @@ static int32_t tabObjResultComparator(const void* p1, const void* p2, void* para
* @param pOrderIndexInfo * @param pOrderIndexInfo
* @param numOfTags * @param numOfTags
*/ */
static void mgmtUpdateOrderTagColIndex(SMetricMetaMsg* pMetricMetaMsg, int32_t tableIndex, tOrderIdx* pOrderIndexInfo, static void mgmtUpdateOrderTagColIndex(SMetricMetaMsg* pMetricMetaMsg, int32_t tableIndex, SColumnOrderInfo* pOrderIndexInfo,
int32_t numOfTags) { int32_t numOfTags) {
SMetricMetaElemMsg* pElem = (SMetricMetaElemMsg*)((char*)pMetricMetaMsg + pMetricMetaMsg->metaElem[tableIndex]); SMetricMetaElemMsg* pElem = (SMetricMetaElemMsg*)((char*)pMetricMetaMsg + pMetricMetaMsg->metaElem[tableIndex]);
SColIndexEx* groupColumnList = (SColIndexEx*)((char*)pMetricMetaMsg + pElem->groupbyTagColumnList); SColIndexEx* groupColumnList = (SColIndexEx*)((char*)pMetricMetaMsg + pElem->groupbyTagColumnList);
...@@ -123,7 +125,7 @@ static void mgmtUpdateOrderTagColIndex(SMetricMetaMsg* pMetricMetaMsg, int32_t t ...@@ -123,7 +125,7 @@ static void mgmtUpdateOrderTagColIndex(SMetricMetaMsg* pMetricMetaMsg, int32_t t
} }
} }
pOrderIndexInfo->numOfOrderedCols = numOfGroupbyTags; pOrderIndexInfo->numOfCols = numOfGroupbyTags;
} }
// todo merge sort function with losertree used // todo merge sort function with losertree used
...@@ -143,14 +145,14 @@ void mgmtReorganizeMetersInMetricMeta(SMetricMetaMsg* pMetricMetaMsg, int32_t ta ...@@ -143,14 +145,14 @@ void mgmtReorganizeMetersInMetricMeta(SMetricMetaMsg* pMetricMetaMsg, int32_t ta
*/ */
tOrderDescriptor* descriptor = tOrderDescriptor* descriptor =
(tOrderDescriptor*)calloc(1, sizeof(tOrderDescriptor) + sizeof(int32_t) * pElem->numOfGroupCols); (tOrderDescriptor*)calloc(1, sizeof(tOrderDescriptor) + sizeof(int32_t) * pElem->numOfGroupCols);
descriptor->pTagSchema = tCreateTagSchema(pTagSchema, pMetric->numOfTags); descriptor->pColumnModel = createColumnModel(pTagSchema, pMetric->numOfTags, 1);
descriptor->orderIdx.numOfOrderedCols = pElem->numOfGroupCols; descriptor->orderIdx.numOfCols = pElem->numOfGroupCols;
int32_t* startPos = NULL; int32_t* startPos = NULL;
int32_t numOfSubset = 1; int32_t numOfSubset = 1;
mgmtUpdateOrderTagColIndex(pMetricMetaMsg, tableIndex, &descriptor->orderIdx, pMetric->numOfTags); mgmtUpdateOrderTagColIndex(pMetricMetaMsg, tableIndex, &descriptor->orderIdx, pMetric->numOfTags);
if (descriptor->orderIdx.numOfOrderedCols > 0) { if (descriptor->orderIdx.numOfCols > 0) {
tQSortEx(pRes->pRes, POINTER_BYTES, 0, pRes->num - 1, descriptor, tabObjResultComparator); tQSortEx(pRes->pRes, POINTER_BYTES, 0, pRes->num - 1, descriptor, tabObjResultComparator);
startPos = calculateSubGroup(pRes->pRes, pRes->num, &numOfSubset, descriptor, tabObjResultComparator); startPos = calculateSubGroup(pRes->pRes, pRes->num, &numOfSubset, descriptor, tabObjResultComparator);
} else { } else {
...@@ -166,7 +168,7 @@ void mgmtReorganizeMetersInMetricMeta(SMetricMetaMsg* pMetricMetaMsg, int32_t ta ...@@ -166,7 +168,7 @@ void mgmtReorganizeMetersInMetricMeta(SMetricMetaMsg* pMetricMetaMsg, int32_t ta
*/ */
qsort(pRes->pRes, (size_t)pRes->num, POINTER_BYTES, tabObjVGIDComparator); qsort(pRes->pRes, (size_t)pRes->num, POINTER_BYTES, tabObjVGIDComparator);
free(descriptor->pTagSchema); free(descriptor->pColumnModel);
free(descriptor); free(descriptor);
free(startPos); free(startPos);
} }
...@@ -291,15 +293,15 @@ static void orderResult(SMetricMetaMsg* pMetricMetaMsg, tQueryResultset* pRes, i ...@@ -291,15 +293,15 @@ static void orderResult(SMetricMetaMsg* pMetricMetaMsg, tQueryResultset* pRes, i
STabObj* pMetric = mgmtGetMeter(pElem->meterId); STabObj* pMetric = mgmtGetMeter(pElem->meterId);
SSchema* pTagSchema = (SSchema*)(pMetric->schema + pMetric->numOfColumns * sizeof(SSchema)); SSchema* pTagSchema = (SSchema*)(pMetric->schema + pMetric->numOfColumns * sizeof(SSchema));
descriptor->pTagSchema = tCreateTagSchema(pTagSchema, pMetric->numOfTags); descriptor->pColumnModel = createColumnModel(pTagSchema, pMetric->numOfTags, 1);
descriptor->orderIdx.pData[0] = colIndex; descriptor->orderIdx.pData[0] = colIndex;
descriptor->orderIdx.numOfOrderedCols = 1; descriptor->orderIdx.numOfCols = 1;
// sort results list // sort results list
tQSortEx(pRes->pRes, POINTER_BYTES, 0, pRes->num - 1, descriptor, tabObjResultComparator); tQSortEx(pRes->pRes, POINTER_BYTES, 0, pRes->num - 1, descriptor, tabObjResultComparator);
free(descriptor->pTagSchema); free(descriptor->pColumnModel);
free(descriptor); free(descriptor);
} }
......
...@@ -127,7 +127,7 @@ int vnodeCreateHeadDataFile(int vnode, int fileId, char *headName, char *dataNam ...@@ -127,7 +127,7 @@ int vnodeCreateHeadDataFile(int vnode, int fileId, char *headName, char *dataNam
if (symlink(dDataName, dataName) != 0) return -1; if (symlink(dDataName, dataName) != 0) return -1;
if (symlink(dLastName, lastName) != 0) return -1; if (symlink(dLastName, lastName) != 0) return -1;
dPrint("vid:%d, fileId:%d, empty header file:%s dataFile:%s lastFile:%s on disk:%s is created ", dPrint("vid:%d, fileId:%d, empty header file:%s file:%s lastFile:%s on disk:%s is created ",
vnode, fileId, headName, dataName, lastName, path); vnode, fileId, headName, dataName, lastName, path);
return 0; return 0;
......
...@@ -589,7 +589,7 @@ static void setExecParams(SQuery *pQuery, SQLFunctionCtx *pCtx, int64_t StartQue ...@@ -589,7 +589,7 @@ static void setExecParams(SQuery *pQuery, SQLFunctionCtx *pCtx, int64_t StartQue
char *primaryColumnData, int32_t size, int32_t functionId, SField *pField, bool hasNull, char *primaryColumnData, int32_t size, int32_t functionId, SField *pField, bool hasNull,
int32_t blockStatus, void *param, int32_t scanFlag); int32_t blockStatus, void *param, int32_t scanFlag);
void createGroupResultBuf(SQuery *pQuery, SOutputRes *pOneResult, bool isMetricQuery); void createGroupResultBuf(SQuery *pQuery, SOutputRes *pOneResult, bool isSTableQuery);
static void destroyGroupResultBuf(SOutputRes *pOneOutputRes, int32_t nOutputCols); static void destroyGroupResultBuf(SOutputRes *pOneOutputRes, int32_t nOutputCols);
static int32_t binarySearchForBlockImpl(SCompBlock *pBlock, int32_t numOfBlocks, TSKEY skey, int32_t order) { static int32_t binarySearchForBlockImpl(SCompBlock *pBlock, int32_t numOfBlocks, TSKEY skey, int32_t order) {
...@@ -2499,7 +2499,7 @@ static void setCtxTagColumnInfo(SQuery *pQuery, SQueryRuntimeEnv *pRuntimeEnv) { ...@@ -2499,7 +2499,7 @@ static void setCtxTagColumnInfo(SQuery *pQuery, SQueryRuntimeEnv *pRuntimeEnv) {
} }
static int32_t setupQueryRuntimeEnv(SMeterObj *pMeterObj, SQuery *pQuery, SQueryRuntimeEnv *pRuntimeEnv, static int32_t setupQueryRuntimeEnv(SMeterObj *pMeterObj, SQuery *pQuery, SQueryRuntimeEnv *pRuntimeEnv,
SSchema *pTagsSchema, int16_t order, bool isMetricQuery) { tTagSchema *pTagsSchema, int16_t order, bool isSTableQuery) {
dTrace("QInfo:%p setup runtime env", GET_QINFO_ADDR(pQuery)); dTrace("QInfo:%p setup runtime env", GET_QINFO_ADDR(pQuery));
pRuntimeEnv->pMeterObj = pMeterObj; pRuntimeEnv->pMeterObj = pMeterObj;
...@@ -2520,8 +2520,10 @@ static int32_t setupQueryRuntimeEnv(SMeterObj *pMeterObj, SQuery *pQuery, SQuery ...@@ -2520,8 +2520,10 @@ static int32_t setupQueryRuntimeEnv(SMeterObj *pMeterObj, SQuery *pQuery, SQuery
SQLFunctionCtx *pCtx = &pRuntimeEnv->pCtx[i]; SQLFunctionCtx *pCtx = &pRuntimeEnv->pCtx[i];
if (TSDB_COL_IS_TAG(pSqlFuncMsg->colInfo.flag)) { // process tag column info if (TSDB_COL_IS_TAG(pSqlFuncMsg->colInfo.flag)) { // process tag column info
pCtx->inputType = pTagsSchema[pColIndexEx->colIdx].type; SSchema* pSchema = getColumnModelSchema(pTagsSchema, pColIndexEx->colIdx);
pCtx->inputBytes = pTagsSchema[pColIndexEx->colIdx].bytes;
pCtx->inputType = pSchema->type;
pCtx->inputBytes = pSchema->bytes;
} else { } else {
pCtx->inputType = GET_COLUMN_TYPE(pQuery, i); pCtx->inputType = GET_COLUMN_TYPE(pQuery, i);
pCtx->inputBytes = GET_COLUMN_BYTES(pQuery, i); pCtx->inputBytes = GET_COLUMN_BYTES(pQuery, i);
...@@ -2567,11 +2569,11 @@ static int32_t setupQueryRuntimeEnv(SMeterObj *pMeterObj, SQuery *pQuery, SQuery ...@@ -2567,11 +2569,11 @@ static int32_t setupQueryRuntimeEnv(SMeterObj *pMeterObj, SQuery *pQuery, SQuery
// set the intermediate result output buffer // set the intermediate result output buffer
SResultInfo *pResInfo = &pRuntimeEnv->resultInfo[i]; SResultInfo *pResInfo = &pRuntimeEnv->resultInfo[i];
setResultInfoBuf(pResInfo, pQuery->pSelectExpr[i].interResBytes, isMetricQuery); setResultInfoBuf(pResInfo, pQuery->pSelectExpr[i].interResBytes, isSTableQuery);
} }
// if it is group by normal column, do not set output buffer, the output buffer is pResult // if it is group by normal column, do not set output buffer, the output buffer is pResult
if (!isGroupbyNormalCol(pQuery->pGroupbyExpr) && !isMetricQuery) { if (!isGroupbyNormalCol(pQuery->pGroupbyExpr) && !isSTableQuery) {
resetCtxOutputBuf(pRuntimeEnv); resetCtxOutputBuf(pRuntimeEnv);
} }
...@@ -4120,7 +4122,7 @@ static void allocMemForInterpo(SMeterQuerySupportObj *pSupporter, SQuery *pQuery ...@@ -4120,7 +4122,7 @@ static void allocMemForInterpo(SMeterQuerySupportObj *pSupporter, SQuery *pQuery
} }
} }
static int32_t allocateOutputBufForGroup(SMeterQuerySupportObj *pSupporter, SQuery *pQuery, bool isMetricQuery) { static int32_t allocateOutputBufForGroup(SMeterQuerySupportObj *pSupporter, SQuery *pQuery, bool isSTableQuery) {
int32_t slot = 0; int32_t slot = 0;
if (isGroupbyNormalCol(pQuery->pGroupbyExpr) || (pQuery->nAggTimeInterval > 0 && pQuery->slidingTime > 0)) { if (isGroupbyNormalCol(pQuery->pGroupbyExpr) || (pQuery->nAggTimeInterval > 0 && pQuery->slidingTime > 0)) {
...@@ -4142,14 +4144,14 @@ static int32_t allocateOutputBufForGroup(SMeterQuerySupportObj *pSupporter, SQue ...@@ -4142,14 +4144,14 @@ static int32_t allocateOutputBufForGroup(SMeterQuerySupportObj *pSupporter, SQue
* for single table top/bottom query, the output for group by normal column, the output rows is * for single table top/bottom query, the output for group by normal column, the output rows is
* equals to the maximum rows, instead of 1. * equals to the maximum rows, instead of 1.
*/ */
if (!isMetricQuery && isTopBottomQuery(pQuery)) { if (!isSTableQuery && isTopBottomQuery(pQuery)) {
assert(pQuery->numOfOutputCols > 1); assert(pQuery->numOfOutputCols > 1);
SSqlFunctionExpr *pExpr = &pQuery->pSelectExpr[1]; SSqlFunctionExpr *pExpr = &pQuery->pSelectExpr[1];
pOneRes->nAlloc = pExpr->pBase.arg[0].argValue.i64; pOneRes->nAlloc = pExpr->pBase.arg[0].argValue.i64;
} }
createGroupResultBuf(pQuery, pOneRes, isMetricQuery); createGroupResultBuf(pQuery, pOneRes, isSTableQuery);
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -4498,12 +4500,12 @@ int32_t vnodeMultiMeterQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void *param) ...@@ -4498,12 +4500,12 @@ int32_t vnodeMultiMeterQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void *param)
pQuery->lastKey = pQuery->skey; pQuery->lastKey = pQuery->skey;
// create runtime environment // create runtime environment
SSchema *pTagSchema = NULL; // SSchema *pColumnModel = NULL;
tTagSchema *pTagSchemaInfo = pSupporter->pSidSet->pTagSchema; tTagSchema *pTagSchemaInfo = pSupporter->pSidSet->pColumnModel;
if (pTagSchemaInfo != NULL) { // if (pTagSchemaInfo != NULL) {
pTagSchema = pTagSchemaInfo->pSchema; // pColumnModel = pTagSchemaInfo->pSchema;
} // }
// get one queried meter // get one queried meter
SMeterObj *pMeter = getMeterObj(pSupporter->pMetersHashTable, pSupporter->pSidSet->pSids[0]->sid); SMeterObj *pMeter = getMeterObj(pSupporter->pMetersHashTable, pSupporter->pSidSet->pSids[0]->sid);
...@@ -4517,7 +4519,7 @@ int32_t vnodeMultiMeterQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void *param) ...@@ -4517,7 +4519,7 @@ int32_t vnodeMultiMeterQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void *param)
tsBufSetTraverseOrder(pRuntimeEnv->pTSBuf, order); tsBufSetTraverseOrder(pRuntimeEnv->pTSBuf, order);
} }
int32_t ret = setupQueryRuntimeEnv(pMeter, pQuery, &pSupporter->runtimeEnv, pTagSchema, TSQL_SO_ASC, true); int32_t ret = setupQueryRuntimeEnv(pMeter, pQuery, &pSupporter->runtimeEnv, pTagSchemaInfo, TSQL_SO_ASC, true);
if (ret != TSDB_CODE_SUCCESS) { if (ret != TSDB_CODE_SUCCESS) {
return ret; return ret;
} }
...@@ -5065,10 +5067,10 @@ static void doSetTagValueInParam(tTagSchema *pTagSchema, int32_t tagColIdx, SMet ...@@ -5065,10 +5067,10 @@ static void doSetTagValueInParam(tTagSchema *pTagSchema, int32_t tagColIdx, SMet
tVariant *param) { tVariant *param) {
assert(tagColIdx >= 0); assert(tagColIdx >= 0);
int32_t *fieldValueOffset = pTagSchema->colOffset; int16_t offset = getColumnModelOffset(pTagSchema, tagColIdx);
void * pStr = (char *)pMeterSidInfo->tags + fieldValueOffset[tagColIdx]; void * pStr = (char *)pMeterSidInfo->tags + offset;
SSchema *pCol = &pTagSchema->pSchema[tagColIdx]; SSchema *pCol = getColumnModelSchema(pTagSchema, tagColIdx);
tVariantDestroy(param); tVariantDestroy(param);
...@@ -5081,7 +5083,7 @@ static void doSetTagValueInParam(tTagSchema *pTagSchema, int32_t tagColIdx, SMet ...@@ -5081,7 +5083,7 @@ static void doSetTagValueInParam(tTagSchema *pTagSchema, int32_t tagColIdx, SMet
void vnodeSetTagValueInParam(tSidSet *pSidSet, SQueryRuntimeEnv *pRuntimeEnv, SMeterSidExtInfo *pMeterSidInfo) { void vnodeSetTagValueInParam(tSidSet *pSidSet, SQueryRuntimeEnv *pRuntimeEnv, SMeterSidExtInfo *pMeterSidInfo) {
SQuery * pQuery = pRuntimeEnv->pQuery; SQuery * pQuery = pRuntimeEnv->pQuery;
tTagSchema *pTagSchema = pSidSet->pTagSchema; tTagSchema *pTagSchema = pSidSet->pColumnModel;
SSqlFuncExprMsg *pFuncMsg = &pQuery->pSelectExpr[0].pBase; SSqlFuncExprMsg *pFuncMsg = &pQuery->pSelectExpr[0].pBase;
if (pQuery->numOfOutputCols == 1 && pFuncMsg->functionId == TSDB_FUNC_TS_COMP) { if (pQuery->numOfOutputCols == 1 && pFuncMsg->functionId == TSDB_FUNC_TS_COMP) {
...@@ -5691,7 +5693,7 @@ void enableFunctForMasterScan(SQueryRuntimeEnv *pRuntimeEnv, int32_t order) { ...@@ -5691,7 +5693,7 @@ void enableFunctForMasterScan(SQueryRuntimeEnv *pRuntimeEnv, int32_t order) {
pQuery->order.order = (pQuery->order.order ^ 1); pQuery->order.order = (pQuery->order.order ^ 1);
} }
void createGroupResultBuf(SQuery *pQuery, SOutputRes *pOneResult, bool isMetricQuery) { void createGroupResultBuf(SQuery *pQuery, SOutputRes *pOneResult, bool isSTableQuery) {
int32_t numOfOutput = pQuery->numOfOutputCols; int32_t numOfOutput = pQuery->numOfOutputCols;
pOneResult->resultInfo = calloc((size_t)numOfOutput, sizeof(SResultInfo)); pOneResult->resultInfo = calloc((size_t)numOfOutput, sizeof(SResultInfo));
...@@ -5704,7 +5706,7 @@ void createGroupResultBuf(SQuery *pQuery, SOutputRes *pOneResult, bool isMetricQ ...@@ -5704,7 +5706,7 @@ void createGroupResultBuf(SQuery *pQuery, SOutputRes *pOneResult, bool isMetricQ
pOneResult->result[i] = malloc(sizeof(tFilePage) + size * pOneResult->nAlloc); pOneResult->result[i] = malloc(sizeof(tFilePage) + size * pOneResult->nAlloc);
pOneResult->result[i]->numOfElems = 0; pOneResult->result[i]->numOfElems = 0;
setResultInfoBuf(pResInfo, (int32_t)size, isMetricQuery); setResultInfoBuf(pResInfo, (int32_t)size, isSTableQuery);
} }
} }
...@@ -7580,7 +7582,7 @@ int32_t saveResult(SMeterQuerySupportObj *pSupporter, SMeterQueryInfo *pMeterQue ...@@ -7580,7 +7582,7 @@ int32_t saveResult(SMeterQuerySupportObj *pSupporter, SMeterQueryInfo *pMeterQue
sc[1].bytes = 8; sc[1].bytes = 8;
UNUSED(sc); UNUSED(sc);
tColModel *cm = tColModelCreate(sc, pQuery->numOfOutputCols, pRuntimeEnv->numOfRowsPerPage); SColumnModel *cm = createColumnModel(sc, pQuery->numOfOutputCols, pRuntimeEnv->numOfRowsPerPage);
// if (outputPage->numOfElems + numOfResult >= pRuntimeEnv->numOfRowsPerPage) // if (outputPage->numOfElems + numOfResult >= pRuntimeEnv->numOfRowsPerPage)
tColModelDisplay(cm, outputPage->data, outputPage->numOfElems, pRuntimeEnv->numOfRowsPerPage); tColModelDisplay(cm, outputPage->data, outputPage->numOfElems, pRuntimeEnv->numOfRowsPerPage);
...@@ -7717,7 +7719,7 @@ static void applyIntervalQueryOnBlock(SMeterQuerySupportObj *pSupporter, SMeterD ...@@ -7717,7 +7719,7 @@ static void applyIntervalQueryOnBlock(SMeterQuerySupportObj *pSupporter, SMeterD
saveIntervalQueryRange(pRuntimeEnv, pMeterQueryInfo); saveIntervalQueryRange(pRuntimeEnv, pMeterQueryInfo);
} else { } else {
doApplyIntervalQueryOnBlock(pSupporter, pMeterQueryInfo, pBlockInfo, pPrimaryKey, pFields, searchFn); doApplyIntervalQueryOnBlock_rv(pSupporter, pMeterQueryInfo, pBlockInfo, pPrimaryKey, pFields, searchFn);
} }
} }
...@@ -7802,7 +7804,7 @@ static int32_t resultInterpolate(SQInfo *pQInfo, tFilePage **data, tFilePage **p ...@@ -7802,7 +7804,7 @@ static int32_t resultInterpolate(SQInfo *pQInfo, tFilePage **data, tFilePage **p
pSchema[i].type = pQuery->pSelectExpr[i].resType; pSchema[i].type = pQuery->pSelectExpr[i].resType;
} }
tColModel *pModel = tColModelCreate(pSchema, pQuery->numOfOutputCols, pQuery->pointsToRead); SColumnModel *pModel = createColumnModel(pSchema, pQuery->numOfOutputCols, pQuery->pointsToRead);
char * srcData[TSDB_MAX_COLUMNS] = {0}; char * srcData[TSDB_MAX_COLUMNS] = {0};
int32_t functions[TSDB_MAX_COLUMNS] = {0}; int32_t functions[TSDB_MAX_COLUMNS] = {0};
...@@ -7816,7 +7818,7 @@ static int32_t resultInterpolate(SQInfo *pQInfo, tFilePage **data, tFilePage **p ...@@ -7816,7 +7818,7 @@ static int32_t resultInterpolate(SQInfo *pQInfo, tFilePage **data, tFilePage **p
pQuery->nAggTimeInterval, (int64_t *)pDataSrc[0]->data, pModel, srcData, pQuery->nAggTimeInterval, (int64_t *)pDataSrc[0]->data, pModel, srcData,
pQuery->defaultVal, functions, pRuntimeEnv->pMeterObj->pointsPerFileBlock); pQuery->defaultVal, functions, pRuntimeEnv->pMeterObj->pointsPerFileBlock);
tColModelDestroy(pModel); destroyColumnModel(pModel);
free(pSchema); free(pSchema);
return numOfRes; return numOfRes;
......
...@@ -442,22 +442,22 @@ void vnodeExecuteRetrieveReq(SSchedMsg *pSched) { ...@@ -442,22 +442,22 @@ void vnodeExecuteRetrieveReq(SSchedMsg *pSched) {
if (code == TSDB_CODE_SUCCESS) { if (code == TSDB_CODE_SUCCESS) {
size = vnodeGetResultSize((void *)(pRetrieve->qhandle), &numOfRows); size = vnodeGetResultSize((void *)(pRetrieve->qhandle), &numOfRows);
// buffer size for progress information, including meter count,
// and for each meter, including 'uid' and 'TSKEY'.
int progressSize = 0;
if (pQInfo->pMeterQuerySupporter != NULL)
progressSize = pQInfo->pMeterQuerySupporter->numOfMeters * (sizeof(int64_t) + sizeof(TSKEY)) + sizeof(int32_t);
else if (pQInfo->pObj != NULL)
progressSize = sizeof(int64_t) + sizeof(TSKEY) + sizeof(int32_t);
pStart = taosBuildRspMsgWithSize(pObj->thandle, TSDB_MSG_TYPE_RETRIEVE_RSP, progressSize + size + 100);
if (pStart == NULL) {
taosSendSimpleRsp(pObj->thandle, TSDB_MSG_TYPE_RETRIEVE_RSP, TSDB_CODE_SERV_OUT_OF_MEMORY);
goto _exit;
}
} }
// buffer size for progress information, including meter count,
// and for each meter, including 'uid' and 'TSKEY'.
int progressSize = 0;
if (pQInfo->pMeterQuerySupporter != NULL)
progressSize = pQInfo->pMeterQuerySupporter->numOfMeters * (sizeof(int64_t) + sizeof(TSKEY)) + sizeof(int32_t);
else if (pQInfo->pObj != NULL)
progressSize = sizeof(int64_t) + sizeof(TSKEY) + sizeof(int32_t);
pStart = taosBuildRspMsgWithSize(pObj->thandle, TSDB_MSG_TYPE_RETRIEVE_RSP, progressSize + size + 100);
if (pStart == NULL) {
taosSendSimpleRsp(pObj->thandle, TSDB_MSG_TYPE_RETRIEVE_RSP, TSDB_CODE_SERV_OUT_OF_MEMORY);
goto _exit;
}
pMsg = pStart; pMsg = pStart;
*pMsg = code; *pMsg = code;
...@@ -485,26 +485,28 @@ void vnodeExecuteRetrieveReq(SSchedMsg *pSched) { ...@@ -485,26 +485,28 @@ void vnodeExecuteRetrieveReq(SSchedMsg *pSched) {
// write the progress information of each meter to response // write the progress information of each meter to response
// this is required by subscriptions // this is required by subscriptions
if (pQInfo->pMeterQuerySupporter != NULL && pQInfo->pMeterQuerySupporter->pMeterSidExtInfo != NULL) { if (numOfRows > 0 && code == TSDB_CODE_SUCCESS) {
*((int32_t*)pMsg) = htonl(pQInfo->pMeterQuerySupporter->numOfMeters); if (pQInfo->pMeterQuerySupporter != NULL && pQInfo->pMeterQuerySupporter->pMeterSidExtInfo != NULL) {
pMsg += sizeof(int32_t); *((int32_t *)pMsg) = htonl(pQInfo->pMeterQuerySupporter->numOfMeters);
for (int32_t i = 0; i < pQInfo->pMeterQuerySupporter->numOfMeters; i++) { pMsg += sizeof(int32_t);
*((int64_t*)pMsg) = htobe64(pQInfo->pMeterQuerySupporter->pMeterSidExtInfo[i]->uid); for (int32_t i = 0; i < pQInfo->pMeterQuerySupporter->numOfMeters; i++) {
*((int64_t *)pMsg) = htobe64(pQInfo->pMeterQuerySupporter->pMeterSidExtInfo[i]->uid);
pMsg += sizeof(int64_t);
*((TSKEY *)pMsg) = htobe64(pQInfo->pMeterQuerySupporter->pMeterSidExtInfo[i]->key);
pMsg += sizeof(TSKEY);
}
} else if (pQInfo->pObj != NULL) {
*((int32_t *)pMsg) = htonl(1);
pMsg += sizeof(int32_t);
*((int64_t *)pMsg) = htobe64(pQInfo->pObj->uid);
pMsg += sizeof(int64_t); pMsg += sizeof(int64_t);
*((TSKEY*)pMsg) = htobe64(pQInfo->pMeterQuerySupporter->pMeterSidExtInfo[i]->key); if (pQInfo->pointsRead > 0) {
*((TSKEY *)pMsg) = htobe64(pQInfo->query.lastKey + 1);
} else {
*((TSKEY *)pMsg) = htobe64(pQInfo->query.lastKey);
}
pMsg += sizeof(TSKEY); pMsg += sizeof(TSKEY);
} }
} else if (pQInfo->pObj != NULL) {
*((int32_t*)pMsg) = htonl(1);
pMsg += sizeof(int32_t);
*((int64_t*)pMsg) = htobe64(pQInfo->pObj->uid);
pMsg += sizeof(int64_t);
if (pQInfo->pointsRead > 0) {
*((TSKEY*)pMsg) = htobe64(pQInfo->query.lastKey + 1);
} else {
*((TSKEY*)pMsg) = htobe64(pQInfo->query.lastKey);
}
pMsg += sizeof(TSKEY);
} }
msgLen = pMsg - pStart; msgLen = pMsg - pStart;
......
...@@ -24,10 +24,10 @@ ...@@ -24,10 +24,10 @@
#include "tast.h" #include "tast.h"
#include "vnodeTagMgmt.h" #include "vnodeTagMgmt.h"
#define GET_TAG_VAL_POINTER(s, col, sc, t) ((t *)(&((s)->tags[(sc)->colOffset[(col)]]))) #define GET_TAG_VAL_POINTER(s, col, sc, t) ((t *)(&((s)->tags[getColumnModelOffset(sc, col)])))
#define GET_TAG_VAL(s, col, sc, t) (*GET_TAG_VAL_POINTER(s, col, sc, t)) #define GET_TAG_VAL(s, col, sc, t) (*GET_TAG_VAL_POINTER(s, col, sc, t))
static void tTagsPrints(SMeterSidExtInfo *pMeterInfo, tTagSchema *pSchema, tOrderIdx *pOrder); static void tTagsPrints(SMeterSidExtInfo *pMeterInfo, tTagSchema *pSchema, SColumnOrderInfo *pOrder);
static void tSidSetDisplay(tSidSet *pSets); static void tSidSetDisplay(tSidSet *pSets);
...@@ -65,7 +65,7 @@ int32_t meterSidComparator(const void *p1, const void *p2, void *param) { ...@@ -65,7 +65,7 @@ int32_t meterSidComparator(const void *p1, const void *p2, void *param) {
SMeterSidExtInfo *s1 = (SMeterSidExtInfo *)p1; SMeterSidExtInfo *s1 = (SMeterSidExtInfo *)p1;
SMeterSidExtInfo *s2 = (SMeterSidExtInfo *)p2; SMeterSidExtInfo *s2 = (SMeterSidExtInfo *)p2;
for (int32_t i = 0; i < pOrderDesc->orderIdx.numOfOrderedCols; ++i) { for (int32_t i = 0; i < pOrderDesc->orderIdx.numOfCols; ++i) {
int32_t colIdx = pOrderDesc->orderIdx.pData[i]; int32_t colIdx = pOrderDesc->orderIdx.pData[i];
char * f1 = NULL; char * f1 = NULL;
...@@ -79,9 +79,9 @@ int32_t meterSidComparator(const void *p1, const void *p2, void *param) { ...@@ -79,9 +79,9 @@ int32_t meterSidComparator(const void *p1, const void *p2, void *param) {
type = TSDB_DATA_TYPE_BINARY; type = TSDB_DATA_TYPE_BINARY;
bytes = TSDB_METER_NAME_LEN; bytes = TSDB_METER_NAME_LEN;
} else { } else {
f1 = GET_TAG_VAL_POINTER(s1, colIdx, pOrderDesc->pTagSchema, char); f1 = GET_TAG_VAL_POINTER(s1, colIdx, pOrderDesc->pColumnModel, char);
f2 = GET_TAG_VAL_POINTER(s2, colIdx, pOrderDesc->pTagSchema, char); f2 = GET_TAG_VAL_POINTER(s2, colIdx, pOrderDesc->pColumnModel, char);
SSchema *pSchema = &pOrderDesc->pTagSchema->pSchema[colIdx]; SSchema *pSchema = getColumnModelSchema(pOrderDesc->pColumnModel, colIdx);
type = pSchema->type; type = pSchema->type;
bytes = pSchema->bytes; bytes = pSchema->bytes;
} }
...@@ -116,9 +116,9 @@ static void median(void **pMeterSids, size_t size, int32_t s1, int32_t s2, tOrde ...@@ -116,9 +116,9 @@ static void median(void **pMeterSids, size_t size, int32_t s1, int32_t s2, tOrde
compareFn(pMeterSids[s1], pMeterSids[s2], pOrderDesc) <= 0); compareFn(pMeterSids[s1], pMeterSids[s2], pOrderDesc) <= 0);
#ifdef _DEBUG_VIEW #ifdef _DEBUG_VIEW
tTagsPrints(pMeterSids[s1], pOrderDesc->pTagSchema, &pOrderDesc->orderIdx); tTagsPrints(pMeterSids[s1], pOrderDesc->pColumnModel, &pOrderDesc->orderIdx);
tTagsPrints(pMeterSids[midIdx], pOrderDesc->pTagSchema, &pOrderDesc->orderIdx); tTagsPrints(pMeterSids[midIdx], pOrderDesc->pColumnModel, &pOrderDesc->orderIdx);
tTagsPrints(pMeterSids[s2], pOrderDesc->pTagSchema, &pOrderDesc->orderIdx); tTagsPrints(pMeterSids[s2], pOrderDesc->pColumnModel, &pOrderDesc->orderIdx);
#endif #endif
} }
...@@ -241,24 +241,24 @@ int32_t *calculateSubGroup(void **pSids, int32_t numOfMeters, int32_t *numOfSubs ...@@ -241,24 +241,24 @@ int32_t *calculateSubGroup(void **pSids, int32_t numOfMeters, int32_t *numOfSubs
return starterPos; return starterPos;
} }
tTagSchema *tCreateTagSchema(SSchema *pSchema, int32_t numOfTagCols) { //tTagSchema *tCreateTagSchema(SSchema *pSchema, int32_t numOfTagCols) {
if (numOfTagCols == 0 || pSchema == NULL) { // if (numOfTagCols == 0 || pSchema == NULL) {
return NULL; // return NULL;
} // }
//
tTagSchema *pTagSchema = // tTagSchema *pColumnModel =
(tTagSchema *)calloc(1, sizeof(tTagSchema) + numOfTagCols * sizeof(int32_t) + sizeof(SSchema) * numOfTagCols); // (tTagSchema *)calloc(1, sizeof(tTagSchema) + numOfTagCols * sizeof(int32_t) + sizeof(SSchema) * numOfTagCols);
//
pTagSchema->colOffset[0] = 0; // pColumnModel->colOffset[0] = 0;
pTagSchema->numOfCols = numOfTagCols; // pColumnModel->numOfCols = numOfTagCols;
for (int32_t i = 1; i < numOfTagCols; ++i) { // for (int32_t i = 1; i < numOfTagCols; ++i) {
pTagSchema->colOffset[i] = (pTagSchema->colOffset[i - 1] + pSchema[i - 1].bytes); // pColumnModel->colOffset[i] = (pColumnModel->colOffset[i - 1] + pSchema[i - 1].bytes);
} // }
//
pTagSchema->pSchema = (SSchema *)&(pTagSchema->colOffset[numOfTagCols]); // pColumnModel->pSchema = (SSchema *)&(pColumnModel->colOffset[numOfTagCols]);
memcpy(pTagSchema->pSchema, pSchema, sizeof(SSchema) * numOfTagCols); // memcpy(pColumnModel->pSchema, pSchema, sizeof(SSchema) * numOfTagCols);
return pTagSchema; // return pColumnModel;
} //}
tSidSet *tSidSetCreate(struct SMeterSidExtInfo **pMeterSidExtInfo, int32_t numOfMeters, SSchema *pSchema, tSidSet *tSidSetCreate(struct SMeterSidExtInfo **pMeterSidExtInfo, int32_t numOfMeters, SSchema *pSchema,
int32_t numOfTags, SColIndexEx *colList, int32_t numOfCols) { int32_t numOfTags, SColIndexEx *colList, int32_t numOfCols) {
...@@ -269,8 +269,8 @@ tSidSet *tSidSetCreate(struct SMeterSidExtInfo **pMeterSidExtInfo, int32_t numOf ...@@ -269,8 +269,8 @@ tSidSet *tSidSetCreate(struct SMeterSidExtInfo **pMeterSidExtInfo, int32_t numOf
pSidSet->numOfSids = numOfMeters; pSidSet->numOfSids = numOfMeters;
pSidSet->pSids = pMeterSidExtInfo; pSidSet->pSids = pMeterSidExtInfo;
pSidSet->pTagSchema = tCreateTagSchema(pSchema, numOfTags); pSidSet->pColumnModel = createColumnModel(pSchema, numOfTags, 1);
pSidSet->orderIdx.numOfOrderedCols = numOfCols; pSidSet->orderIdx.numOfCols = numOfCols;
/* /*
* in case of "group by tbname,normal_col", the normal_col is ignored * in case of "group by tbname,normal_col", the normal_col is ignored
...@@ -282,7 +282,7 @@ tSidSet *tSidSetCreate(struct SMeterSidExtInfo **pMeterSidExtInfo, int32_t numOf ...@@ -282,7 +282,7 @@ tSidSet *tSidSetCreate(struct SMeterSidExtInfo **pMeterSidExtInfo, int32_t numOf
} }
} }
pSidSet->orderIdx.numOfOrderedCols = numOfTagCols; pSidSet->orderIdx.numOfCols = numOfTagCols;
pSidSet->starterPos = NULL; pSidSet->starterPos = NULL;
return pSidSet; return pSidSet;
...@@ -291,19 +291,19 @@ tSidSet *tSidSetCreate(struct SMeterSidExtInfo **pMeterSidExtInfo, int32_t numOf ...@@ -291,19 +291,19 @@ tSidSet *tSidSetCreate(struct SMeterSidExtInfo **pMeterSidExtInfo, int32_t numOf
void tSidSetDestroy(tSidSet **pSets) { void tSidSetDestroy(tSidSet **pSets) {
if ((*pSets) != NULL) { if ((*pSets) != NULL) {
tfree((*pSets)->starterPos); tfree((*pSets)->starterPos);
tfree((*pSets)->pTagSchema)(*pSets)->pSids = NULL; tfree((*pSets)->pColumnModel)(*pSets)->pSids = NULL;
tfree(*pSets); tfree(*pSets);
} }
} }
void tTagsPrints(SMeterSidExtInfo *pMeterInfo, tTagSchema *pSchema, tOrderIdx *pOrder) { void tTagsPrints(SMeterSidExtInfo *pMeterInfo, tTagSchema *pSchema, SColumnOrderInfo *pOrder) {
if (pSchema == NULL) { if (pSchema == NULL) {
return; return;
} }
printf("sid: %-5d tags(", pMeterInfo->sid); printf("sid: %-5d tags(", pMeterInfo->sid);
for (int32_t i = 0; i < pOrder->numOfOrderedCols; ++i) { for (int32_t i = 0; i < pOrder->numOfCols; ++i) {
int32_t colIndex = pOrder->pData[i]; int32_t colIndex = pOrder->pData[i];
// it is the tbname column // it is the tbname column
...@@ -312,7 +312,9 @@ void tTagsPrints(SMeterSidExtInfo *pMeterInfo, tTagSchema *pSchema, tOrderIdx *p ...@@ -312,7 +312,9 @@ void tTagsPrints(SMeterSidExtInfo *pMeterInfo, tTagSchema *pSchema, tOrderIdx *p
continue; continue;
} }
switch (pSchema->pSchema[colIndex].type) { SSchema* s = getColumnModelSchema(pSchema, colIndex);
switch (s->type) {
case TSDB_DATA_TYPE_INT: case TSDB_DATA_TYPE_INT:
printf("%d, ", GET_TAG_VAL(pMeterInfo, colIndex, pSchema, int32_t)); printf("%d, ", GET_TAG_VAL(pMeterInfo, colIndex, pSchema, int32_t));
break; break;
...@@ -336,9 +338,9 @@ void tTagsPrints(SMeterSidExtInfo *pMeterInfo, tTagSchema *pSchema, tOrderIdx *p ...@@ -336,9 +338,9 @@ void tTagsPrints(SMeterSidExtInfo *pMeterInfo, tTagSchema *pSchema, tOrderIdx *p
break; break;
case TSDB_DATA_TYPE_NCHAR: { case TSDB_DATA_TYPE_NCHAR: {
char *data = GET_TAG_VAL_POINTER(pMeterInfo, colIndex, pSchema, char); char *data = GET_TAG_VAL_POINTER(pMeterInfo, colIndex, pSchema, char);
char buffer[512] = {0}; char buffer[512] = {0};
taosUcs4ToMbs(data, s->bytes, buffer);
taosUcs4ToMbs(data, pSchema->pSchema[colIndex].bytes, buffer);
printf("%s, ", buffer); printf("%s, ", buffer);
break; break;
} }
...@@ -370,16 +372,16 @@ static void UNUSED_FUNC tSidSetDisplay(tSidSet *pSets) { ...@@ -370,16 +372,16 @@ static void UNUSED_FUNC tSidSetDisplay(tSidSet *pSets) {
printf("the %d-th subgroup: \n", i + 1); printf("the %d-th subgroup: \n", i + 1);
for (int32_t j = s; j < e; ++j) { for (int32_t j = s; j < e; ++j) {
tTagsPrints(pSets->pSids[j], pSets->pTagSchema, &pSets->orderIdx); tTagsPrints(pSets->pSids[j], pSets->pColumnModel, &pSets->orderIdx);
} }
} }
} }
void tSidSetSort(tSidSet *pSets) { void tSidSetSort(tSidSet *pSets) {
pTrace("number of meters in sort: %d", pSets->numOfSids); pTrace("number of meters in sort: %d", pSets->numOfSids);
tOrderIdx *pOrderIdx = &pSets->orderIdx; SColumnOrderInfo *pOrderIdx = &pSets->orderIdx;
if (pOrderIdx->numOfOrderedCols == 0 || pSets->numOfSids <= 1 || pSets->pTagSchema == NULL) { // no group by tags clause if (pOrderIdx->numOfCols == 0 || pSets->numOfSids <= 1 || pSets->pColumnModel == NULL) { // no group by tags clause
pSets->numOfSubSet = 1; pSets->numOfSubSet = 1;
pSets->starterPos = (int32_t *)malloc(sizeof(int32_t) * (pSets->numOfSubSet + 1)); pSets->starterPos = (int32_t *)malloc(sizeof(int32_t) * (pSets->numOfSubSet + 1));
pSets->starterPos[0] = 0; pSets->starterPos[0] = 0;
...@@ -390,11 +392,11 @@ void tSidSetSort(tSidSet *pSets) { ...@@ -390,11 +392,11 @@ void tSidSetSort(tSidSet *pSets) {
#endif #endif
} else { } else {
tOrderDescriptor *descriptor = tOrderDescriptor *descriptor =
(tOrderDescriptor *)calloc(1, sizeof(tOrderDescriptor) + sizeof(int16_t) * pSets->orderIdx.numOfOrderedCols); (tOrderDescriptor *)calloc(1, sizeof(tOrderDescriptor) + sizeof(int16_t) * pSets->orderIdx.numOfCols);
descriptor->pTagSchema = pSets->pTagSchema; descriptor->pColumnModel = pSets->pColumnModel;
descriptor->orderIdx = pSets->orderIdx; descriptor->orderIdx = pSets->orderIdx;
memcpy(descriptor->orderIdx.pData, pOrderIdx->pData, sizeof(int16_t) * pSets->orderIdx.numOfOrderedCols); memcpy(descriptor->orderIdx.pData, pOrderIdx->pData, sizeof(int16_t) * pSets->orderIdx.numOfCols);
tQSortEx((void **)pSets->pSids, POINTER_BYTES, 0, pSets->numOfSids - 1, descriptor, meterSidComparator); tQSortEx((void **)pSets->pSids, POINTER_BYTES, 0, pSets->numOfSids - 1, descriptor, meterSidComparator);
pSets->starterPos = pSets->starterPos =
......
...@@ -247,7 +247,7 @@ SSqlFunctionExpr* vnodeCreateSqlFunctionExpr(SQueryMeterMsg* pQueryMsg, int32_t* ...@@ -247,7 +247,7 @@ SSqlFunctionExpr* vnodeCreateSqlFunctionExpr(SQueryMeterMsg* pQueryMsg, int32_t*
SColIndexEx* pColumnIndexExInfo = &pExprs[i].pBase.colInfo; SColIndexEx* pColumnIndexExInfo = &pExprs[i].pBase.colInfo;
// tag column schema is kept in pQueryMsg->pTagSchema // tag column schema is kept in pQueryMsg->pColumnModel
if (TSDB_COL_IS_TAG(pColumnIndexExInfo->flag)) { if (TSDB_COL_IS_TAG(pColumnIndexExInfo->flag)) {
if (pColumnIndexExInfo->colIdx >= pQueryMsg->numOfTagsCols) { if (pColumnIndexExInfo->colIdx >= pQueryMsg->numOfTagsCols) {
*code = TSDB_CODE_INVALID_QUERY_MSG; *code = TSDB_CODE_INVALID_QUERY_MSG;
......
此差异已折叠。
...@@ -205,16 +205,18 @@ static char* getPos(char* data, int32_t bytes, int32_t order, int32_t capacity, ...@@ -205,16 +205,18 @@ static char* getPos(char* data, int32_t bytes, int32_t order, int32_t capacity,
// } // }
} }
static void setTagsValueInInterpolation(tFilePage** data, char** pTags, tColModel* pModel, int32_t order, int32_t start, static void setTagsValueInInterpolation(tFilePage** data, char** pTags, SColumnModel* pModel, int32_t order, int32_t start,
int32_t capacity, int32_t num) { int32_t capacity, int32_t num) {
for (int32_t j = 0, i = start; i < pModel->numOfCols; ++i, ++j) { for (int32_t j = 0, i = start; i < pModel->numOfCols; ++i, ++j) {
char* val1 = getPos(data[i]->data, pModel->pFields[i].bytes, order, capacity, num); SSchema* pSchema = getColumnModelSchema(pModel, i);
assignVal(val1, pTags[j], pModel->pFields[i].bytes, pModel->pFields[i].type);
char* val1 = getPos(data[i]->data, pSchema->bytes, order, capacity, num);
assignVal(val1, pTags[j], pSchema->bytes, pSchema->type);
} }
} }
static void doInterpoResultImpl(SInterpolationInfo* pInterpoInfo, int16_t interpoType, tFilePage** data, static void doInterpoResultImpl(SInterpolationInfo* pInterpoInfo, int16_t interpoType, tFilePage** data,
tColModel* pModel, int32_t* num, char** srcData, int64_t nInterval, int64_t* defaultVal, SColumnModel* pModel, int32_t* num, char** srcData, int64_t nInterval, int64_t* defaultVal,
int64_t currentTimestamp, int32_t capacity, int32_t numOfTags, char** pTags, int64_t currentTimestamp, int32_t capacity, int32_t numOfTags, char** pTags,
bool outOfBound) { bool outOfBound) {
char** prevValues = &pInterpoInfo->prevValues; char** prevValues = &pInterpoInfo->prevValues;
...@@ -234,18 +236,23 @@ static void doInterpoResultImpl(SInterpolationInfo* pInterpoInfo, int16_t interp ...@@ -234,18 +236,23 @@ static void doInterpoResultImpl(SInterpolationInfo* pInterpoInfo, int16_t interp
char* pInterpolationData = INTERPOL_IS_ASC_INTERPOL(pInterpoInfo) ? *prevValues : *nextValues; char* pInterpolationData = INTERPOL_IS_ASC_INTERPOL(pInterpoInfo) ? *prevValues : *nextValues;
if (pInterpolationData != NULL) { if (pInterpolationData != NULL) {
for (int32_t i = 1; i < numOfValCols; ++i) { for (int32_t i = 1; i < numOfValCols; ++i) {
char* val1 = getPos(data[i]->data, pModel->pFields[i].bytes, pInterpoInfo->order, capacity, *num); SSchema* pSchema = getColumnModelSchema(pModel, i);
int16_t offset = getColumnModelOffset(pModel, i);
char* val1 = getPos(data[i]->data, pSchema->bytes, pInterpoInfo->order, capacity, *num);
if (isNull(pInterpolationData + pModel->colOffset[i], pModel->pFields[i].type)) { if (isNull(pInterpolationData + offset, pSchema->type)) {
setNull(val1, pModel->pFields[i].type, pModel->pFields[i].bytes); setNull(val1, pSchema->type, pSchema->bytes);
} else { } else {
assignVal(val1, pInterpolationData + pModel->colOffset[i], pModel->pFields[i].bytes, pModel->pFields[i].type); assignVal(val1, pInterpolationData + offset, pSchema->bytes, pSchema->type);
} }
} }
} else { /* no prev value yet, set the value for null */ } else { /* no prev value yet, set the value for null */
for (int32_t i = 1; i < numOfValCols; ++i) { for (int32_t i = 1; i < numOfValCols; ++i) {
char* val1 = getPos(data[i]->data, pModel->pFields[i].bytes, pInterpoInfo->order, capacity, *num); SSchema* pSchema = getColumnModelSchema(pModel, i);
setNull(val1, pModel->pFields[i].type, pModel->pFields[i].bytes);
char* val1 = getPos(data[i]->data, pSchema->bytes, pInterpoInfo->order, capacity, *num);
setNull(val1, pSchema->type, pSchema->bytes);
} }
} }
...@@ -254,34 +261,41 @@ static void doInterpoResultImpl(SInterpolationInfo* pInterpoInfo, int16_t interp ...@@ -254,34 +261,41 @@ static void doInterpoResultImpl(SInterpolationInfo* pInterpoInfo, int16_t interp
// TODO : linear interpolation supports NULL value // TODO : linear interpolation supports NULL value
if (*prevValues != NULL && !outOfBound) { if (*prevValues != NULL && !outOfBound) {
for (int32_t i = 1; i < numOfValCols; ++i) { for (int32_t i = 1; i < numOfValCols; ++i) {
int32_t type = pModel->pFields[i].type; SSchema* pSchema = getColumnModelSchema(pModel, i);
char* val1 = getPos(data[i]->data, pModel->pFields[i].bytes, pInterpoInfo->order, capacity, *num); int16_t offset = getColumnModelOffset(pModel, i);
int16_t type = pSchema->type;
char* val1 = getPos(data[i]->data, pSchema->bytes, pInterpoInfo->order, capacity, *num);
if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR || type == TSDB_DATA_TYPE_BOOL) { if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR || type == TSDB_DATA_TYPE_BOOL) {
setNull(val1, pModel->pFields[i].type, pModel->pFields[i].bytes); setNull(val1, type, pSchema->bytes);
continue; continue;
} }
point1 = (SPoint){.key = *(TSKEY*)(*prevValues), .val = *prevValues + pModel->colOffset[i]}; point1 = (SPoint){.key = *(TSKEY*)(*prevValues), .val = *prevValues + offset};
point2 = (SPoint){.key = currentTimestamp, .val = srcData[i] + pInterpoInfo->rowIdx * pModel->pFields[i].bytes}; point2 = (SPoint){.key = currentTimestamp, .val = srcData[i] + pInterpoInfo->rowIdx * pSchema->bytes};
point = (SPoint){.key = pInterpoInfo->startTimestamp, .val = val1}; point = (SPoint){.key = pInterpoInfo->startTimestamp, .val = val1};
taosDoLinearInterpolation(pModel->pFields[i].type, &point1, &point2, &point); taosDoLinearInterpolation(type, &point1, &point2, &point);
} }
setTagsValueInInterpolation(data, pTags, pModel, pInterpoInfo->order, numOfValCols, capacity, *num); setTagsValueInInterpolation(data, pTags, pModel, pInterpoInfo->order, numOfValCols, capacity, *num);
} else { } else {
for (int32_t i = 1; i < numOfValCols; ++i) { for (int32_t i = 1; i < numOfValCols; ++i) {
char* val1 = getPos(data[i]->data, pModel->pFields[i].bytes, pInterpoInfo->order, capacity, *num); SSchema* pSchema = getColumnModelSchema(pModel, i);
setNull(val1, pModel->pFields[i].type, pModel->pFields[i].bytes);
char* val1 = getPos(data[i]->data, pSchema->bytes, pInterpoInfo->order, capacity, *num);
setNull(val1, pSchema->type, pSchema->bytes);
} }
setTagsValueInInterpolation(data, pTags, pModel, pInterpoInfo->order, numOfValCols, capacity, *num); setTagsValueInInterpolation(data, pTags, pModel, pInterpoInfo->order, numOfValCols, capacity, *num);
} }
} else { /* default value interpolation */ } else { /* default value interpolation */
for (int32_t i = 1; i < numOfValCols; ++i) { for (int32_t i = 1; i < numOfValCols; ++i) {
char* val1 = getPos(data[i]->data, pModel->pFields[i].bytes, pInterpoInfo->order, capacity, *num); SSchema* pSchema = getColumnModelSchema(pModel, i);
assignVal(val1, (char*)&defaultVal[i], pModel->pFields[i].bytes, pModel->pFields[i].type);
char* val1 = getPos(data[i]->data, pSchema->bytes, pInterpoInfo->order, capacity, *num);
assignVal(val1, (char*)&defaultVal[i], pSchema->bytes, pSchema->type);
} }
setTagsValueInInterpolation(data, pTags, pModel, pInterpoInfo->order, numOfValCols, capacity, *num); setTagsValueInInterpolation(data, pTags, pModel, pInterpoInfo->order, numOfValCols, capacity, *num);
...@@ -295,7 +309,7 @@ static void doInterpoResultImpl(SInterpolationInfo* pInterpoInfo, int16_t interp ...@@ -295,7 +309,7 @@ static void doInterpoResultImpl(SInterpolationInfo* pInterpoInfo, int16_t interp
int32_t taosDoInterpoResult(SInterpolationInfo* pInterpoInfo, int16_t interpoType, tFilePage** data, int32_t taosDoInterpoResult(SInterpolationInfo* pInterpoInfo, int16_t interpoType, tFilePage** data,
int32_t numOfRawDataInRows, int32_t outputRows, int64_t nInterval, int32_t numOfRawDataInRows, int32_t outputRows, int64_t nInterval,
const int64_t* pPrimaryKeyArray, tColModel* pModel, char** srcData, int64_t* defaultVal, const int64_t* pPrimaryKeyArray, SColumnModel* pModel, char** srcData, int64_t* defaultVal,
const int32_t* functionIDs, int32_t bufSize) { const int32_t* functionIDs, int32_t bufSize) {
int32_t num = 0; int32_t num = 0;
pInterpoInfo->numOfCurrentInterpo = 0; pInterpoInfo->numOfCurrentInterpo = 0;
...@@ -328,17 +342,21 @@ int32_t taosDoInterpoResult(SInterpolationInfo* pInterpoInfo, int16_t interpoTyp ...@@ -328,17 +342,21 @@ int32_t taosDoInterpoResult(SInterpolationInfo* pInterpoInfo, int16_t interpoTyp
(pInterpoInfo->startTimestamp > currentTimestamp && !INTERPOL_IS_ASC_INTERPOL(pInterpoInfo))) { (pInterpoInfo->startTimestamp > currentTimestamp && !INTERPOL_IS_ASC_INTERPOL(pInterpoInfo))) {
/* set the next value for interpolation */ /* set the next value for interpolation */
if (*nextValues == NULL) { if (*nextValues == NULL) {
*nextValues = *nextValues = calloc(1, pModel->rowSize);
calloc(1, pModel->colOffset[pModel->numOfCols - 1] + pModel->pFields[pModel->numOfCols - 1].bytes);
for (int i = 1; i < pModel->numOfCols; i++) { for (int i = 1; i < pModel->numOfCols; i++) {
setNull(*nextValues + pModel->colOffset[i], pModel->pFields[i].type, pModel->pFields[i].bytes); int16_t offset = getColumnModelOffset(pModel, i);
SSchema* pSchema = getColumnModelSchema(pModel, i);
setNull(*nextValues + offset, pSchema->type, pSchema->bytes);
} }
} }
int32_t offset = pInterpoInfo->rowIdx; int32_t offset = pInterpoInfo->rowIdx;
for (int32_t tlen = 0, i = 0; i < pModel->numOfCols - numOfTags; ++i) { for (int32_t tlen = 0, i = 0; i < pModel->numOfCols - numOfTags; ++i) {
memcpy(*nextValues + tlen, srcData[i] + offset * pModel->pFields[i].bytes, pModel->pFields[i].bytes); SSchema* pSchema = getColumnModelSchema(pModel, i);
tlen += pModel->pFields[i].bytes;
memcpy(*nextValues + tlen, srcData[i] + offset * pSchema->bytes, pSchema->bytes);
tlen += pSchema->bytes;
} }
} }
...@@ -358,37 +376,41 @@ int32_t taosDoInterpoResult(SInterpolationInfo* pInterpoInfo, int16_t interpoTyp ...@@ -358,37 +376,41 @@ int32_t taosDoInterpoResult(SInterpolationInfo* pInterpoInfo, int16_t interpoTyp
if (pInterpoInfo->startTimestamp == currentTimestamp) { if (pInterpoInfo->startTimestamp == currentTimestamp) {
if (*prevValues == NULL) { if (*prevValues == NULL) {
*prevValues = *prevValues = calloc(1, pModel->rowSize);
calloc(1, pModel->colOffset[pModel->numOfCols - 1] + pModel->pFields[pModel->numOfCols - 1].bytes);
for (int i = 1; i < pModel->numOfCols; i++) { for (int i = 1; i < pModel->numOfCols; i++) {
setNull(*prevValues + pModel->colOffset[i], pModel->pFields[i].type, pModel->pFields[i].bytes); int16_t offset = getColumnModelOffset(pModel, i);
SSchema* pSchema = getColumnModelSchema(pModel, i);
setNull(*prevValues + offset, pSchema->type, pSchema->bytes);
} }
} }
// assign rows to dst buffer // assign rows to dst buffer
int32_t i = 0; int32_t i = 0;
for (int32_t tlen = 0; i < pModel->numOfCols - numOfTags; ++i) { for (int32_t tlen = 0; i < pModel->numOfCols - numOfTags; ++i) {
char* val1 = getPos(data[i]->data, pModel->pFields[i].bytes, pInterpoInfo->order, bufSize, num); int16_t offset = getColumnModelOffset(pModel, i);
SSchema* pSchema = getColumnModelSchema(pModel, i);
char* val1 = getPos(data[i]->data, pSchema->bytes, pInterpoInfo->order, bufSize, num);
if (i == 0 || if (i == 0 ||
(functionIDs[i] != TSDB_FUNC_COUNT && (functionIDs[i] != TSDB_FUNC_COUNT &&
!isNull(srcData[i] + pInterpoInfo->rowIdx * pModel->pFields[i].bytes, pModel->pFields[i].type)) || !isNull(srcData[i] + pInterpoInfo->rowIdx * pSchema->bytes, pSchema->type)) ||
(functionIDs[i] == TSDB_FUNC_COUNT && (functionIDs[i] == TSDB_FUNC_COUNT &&
*(int64_t*)(srcData[i] + pInterpoInfo->rowIdx * pModel->pFields[i].bytes) != 0)) { *(int64_t*)(srcData[i] + pInterpoInfo->rowIdx * pSchema->bytes) != 0)) {
assignVal(val1, srcData[i] + pInterpoInfo->rowIdx * pModel->pFields[i].bytes, pModel->pFields[i].bytes,
pModel->pFields[i].type); assignVal(val1, srcData[i] + pInterpoInfo->rowIdx * pSchema->bytes, pSchema->bytes, pSchema->type);
memcpy(*prevValues + tlen, srcData[i] + pInterpoInfo->rowIdx * pModel->pFields[i].bytes, memcpy(*prevValues + tlen, srcData[i] + pInterpoInfo->rowIdx * pSchema->bytes, pSchema->bytes);
pModel->pFields[i].bytes);
} else { // i > 0 and isNULL, do interpolation } else { // i > 0 and isNULL, do interpolation
if (interpoType == TSDB_INTERPO_PREV) { if (interpoType == TSDB_INTERPO_PREV) {
assignVal(val1, *prevValues + pModel->colOffset[i], pModel->pFields[i].bytes, pModel->pFields[i].type); assignVal(val1, *prevValues + offset, pSchema->bytes, pSchema->type);
} else if (interpoType == TSDB_INTERPO_LINEAR) { } else if (interpoType == TSDB_INTERPO_LINEAR) {
// TODO: // TODO:
} else { } else {
assignVal(val1, (char*)&defaultVal[i], pModel->pFields[i].bytes, pModel->pFields[i].type); assignVal(val1, (char*)&defaultVal[i], pSchema->bytes, pSchema->type);
} }
} }
tlen += pModel->pFields[i].bytes; tlen += pSchema->bytes;
} }
/* set the tag value for final result */ /* set the tag value for final result */
......
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册