提交 9ddff93b 编写于 作者: K kailixu

feat: support 64k for row/column

上级 06494ba5
...@@ -288,10 +288,10 @@ typedef struct { ...@@ -288,10 +288,10 @@ typedef struct {
int32_t index; int32_t index;
int32_t rowSize; int32_t rowSize;
int32_t numOfRows; int32_t numOfRows;
void * pIter; void *pIter;
void * pVgIter; void *pVgIter;
void ** ppShow; void **ppShow;
int16_t offset[TSDB_MAX_COLUMNS]; uint16_t offset[TSDB_MAX_COLUMNS];
int32_t bytes[TSDB_MAX_COLUMNS]; int32_t bytes[TSDB_MAX_COLUMNS];
int32_t numOfReads; int32_t numOfReads;
int8_t maxReplica; int8_t maxReplica;
......
...@@ -190,12 +190,12 @@ typedef struct { ...@@ -190,12 +190,12 @@ typedef struct {
} SMovingAvgInfo; } SMovingAvgInfo;
typedef struct { typedef struct {
int32_t totalPoints; int32_t totalPoints;
int32_t numSampled; int32_t numSampled;
int16_t colBytes; uint16_t colBytes;
char *values; char *values;
int64_t *timeStamps; int64_t *timeStamps;
char *taglists; char *taglists;
} SSampleFuncInfo; } SSampleFuncInfo;
typedef struct SElapsedInfo { typedef struct SElapsedInfo {
...@@ -5271,7 +5271,7 @@ static void mavg_function(SQLFunctionCtx *pCtx) { ...@@ -5271,7 +5271,7 @@ static void mavg_function(SQLFunctionCtx *pCtx) {
////////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////////
// Sample function with reservoir sampling algorithm // Sample function with reservoir sampling algorithm
static void assignResultSample(SQLFunctionCtx *pCtx, SSampleFuncInfo *pInfo, int32_t idx, int64_t ts, void *pData, uint16_t type, int16_t bytes, char *inputTags) { static void assignResultSample(SQLFunctionCtx *pCtx, SSampleFuncInfo *pInfo, int32_t idx, int64_t ts, void *pData, uint16_t type, uint16_t bytes, char *inputTags) {
assignVal(pInfo->values + idx*bytes, pData, bytes, type); assignVal(pInfo->values + idx*bytes, pData, bytes, type);
*(pInfo->timeStamps + idx) = ts; *(pInfo->timeStamps + idx) = ts;
...@@ -5296,7 +5296,7 @@ static void assignResultSample(SQLFunctionCtx *pCtx, SSampleFuncInfo *pInfo, int ...@@ -5296,7 +5296,7 @@ static void assignResultSample(SQLFunctionCtx *pCtx, SSampleFuncInfo *pInfo, int
} }
} }
static void do_reservoir_sample(SQLFunctionCtx *pCtx, SSampleFuncInfo *pInfo, int32_t samplesK, int64_t ts, void *pData, uint16_t type, int16_t bytes) { static void do_reservoir_sample(SQLFunctionCtx *pCtx, SSampleFuncInfo *pInfo, int32_t samplesK, int64_t ts, void *pData, uint16_t type, uint16_t bytes) {
pInfo->totalPoints++; pInfo->totalPoints++;
if (pInfo->numSampled < samplesK) { if (pInfo->numSampled < samplesK) {
assignResultSample(pCtx, pInfo, pInfo->numSampled, ts, pData, type, bytes, NULL); assignResultSample(pCtx, pInfo, pInfo->numSampled, ts, pData, type, bytes, NULL);
......
...@@ -338,9 +338,9 @@ static void sortGroupResByOrderList(SGroupResInfo* pGroupResInfo, SQueryRuntimeE ...@@ -338,9 +338,9 @@ static void sortGroupResByOrderList(SGroupResInfo* pGroupResInfo, SQueryRuntimeE
} }
// get dataOffset and index on pRuntimeEnv->pQueryAttr->pExpr1 // get dataOffset and index on pRuntimeEnv->pQueryAttr->pExpr1
int16_t dataOffset = 0; uint16_t dataOffset = 0;
int16_t type = 0; int16_t type = 0;
int32_t bytes = 0; int32_t bytes = 0;
for (int32_t j = 0; j < pDataBlock->info.numOfCols; ++j) { for (int32_t j = 0; j < pDataBlock->info.numOfCols; ++j) {
SColumnInfoData* pColInfoData = (SColumnInfoData*)taosArrayGet(pDataBlock->pDataBlock, j); SColumnInfoData* pColInfoData = (SColumnInfoData*)taosArrayGet(pDataBlock->pDataBlock, j);
if (pCtx[j].colId == pFirstGroupCol->colId) { if (pCtx[j].colId == pFirstGroupCol->colId) {
...@@ -2074,7 +2074,7 @@ static int32_t setGroupResultOutputBuf(SQueryRuntimeEnv* pRuntimeEnv, SOptrBasic ...@@ -2074,7 +2074,7 @@ static int32_t setGroupResultOutputBuf(SQueryRuntimeEnv* pRuntimeEnv, SOptrBasic
SQLFunctionCtx* pCtx = binfo->pCtx; SQLFunctionCtx* pCtx = binfo->pCtx;
// not assign result buffer yet, add new result buffer, TODO remove it // not assign result buffer yet, add new result buffer, TODO remove it
char* d = pData; char* d = pData;
uint16_t len = bytes; uint16_t len = bytes;
int64_t tid = 0; int64_t tid = 0;
...@@ -2283,7 +2283,7 @@ static SQLFunctionCtx* createSQLFunctionCtx(SQueryRuntimeEnv* pRuntimeEnv, SExpr ...@@ -2283,7 +2283,7 @@ static SQLFunctionCtx* createSQLFunctionCtx(SQueryRuntimeEnv* pRuntimeEnv, SExpr
pCtx->numOfParams = pSqlExpr->numOfParams; pCtx->numOfParams = pSqlExpr->numOfParams;
for (int32_t j = 0; j < pCtx->numOfParams; ++j) { for (int32_t j = 0; j < pCtx->numOfParams; ++j) {
int16_t type = pSqlExpr->param[j].nType; int16_t type = pSqlExpr->param[j].nType;
int16_t bytes = pSqlExpr->param[j].nLen; int32_t bytes = pSqlExpr->param[j].nLen;
if (pSqlExpr->functionId == TSDB_FUNC_STDDEV_DST || pSqlExpr->functionId == TSDB_FUNC_TS_COMP) { if (pSqlExpr->functionId == TSDB_FUNC_STDDEV_DST || pSqlExpr->functionId == TSDB_FUNC_TS_COMP) {
continue; continue;
} }
...@@ -3297,7 +3297,7 @@ void doCompactSDataBlock(SSDataBlock* pBlock, int32_t numOfRows, int8_t* p) { ...@@ -3297,7 +3297,7 @@ void doCompactSDataBlock(SSDataBlock* pBlock, int32_t numOfRows, int8_t* p) {
for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) { for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) {
SColumnInfoData* pColumnInfoData = taosArrayGet(pBlock->pDataBlock, i); SColumnInfoData* pColumnInfoData = taosArrayGet(pBlock->pDataBlock, i);
int16_t bytes = pColumnInfoData->info.bytes; int32_t bytes = pColumnInfoData->info.bytes;
memmove(((char*)pColumnInfoData->pData) + start * bytes, pColumnInfoData->pData + cstart * bytes, memmove(((char*)pColumnInfoData->pData) + start * bytes, pColumnInfoData->pData + cstart * bytes,
len * bytes); len * bytes);
} }
...@@ -3313,7 +3313,7 @@ void doCompactSDataBlock(SSDataBlock* pBlock, int32_t numOfRows, int8_t* p) { ...@@ -3313,7 +3313,7 @@ void doCompactSDataBlock(SSDataBlock* pBlock, int32_t numOfRows, int8_t* p) {
for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) { for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) {
SColumnInfoData* pColumnInfoData = taosArrayGet(pBlock->pDataBlock, i); SColumnInfoData* pColumnInfoData = taosArrayGet(pBlock->pDataBlock, i);
int16_t bytes = pColumnInfoData->info.bytes; int32_t bytes = pColumnInfoData->info.bytes;
memmove(pColumnInfoData->pData + start * bytes, pColumnInfoData->pData + cstart * bytes, len * bytes); memmove(pColumnInfoData->pData + start * bytes, pColumnInfoData->pData + cstart * bytes, len * bytes);
} }
...@@ -6681,7 +6681,7 @@ static SSDataBlock* doLimit(void* param, bool* newgroup) { ...@@ -6681,7 +6681,7 @@ static SSDataBlock* doLimit(void* param, bool* newgroup) {
for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) { for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) {
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i); SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
int16_t bytes = pColInfoData->info.bytes; int32_t bytes = pColInfoData->info.bytes;
memmove(pColInfoData->pData, pColInfoData->pData + skip * bytes, remain * bytes); memmove(pColInfoData->pData, pColInfoData->pData + skip * bytes, remain * bytes);
} }
...@@ -7384,7 +7384,7 @@ static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorI ...@@ -7384,7 +7384,7 @@ static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorI
SOptrBasicInfo* pBInfo = &pInfo->binfo; SOptrBasicInfo* pBInfo = &pInfo->binfo;
bool masterScan = IS_MASTER_SCAN(pRuntimeEnv); bool masterScan = IS_MASTER_SCAN(pRuntimeEnv);
int16_t bytes = pColInfoData->info.bytes; int32_t bytes = pColInfoData->info.bytes;
// int16_t type = pColInfoData->info.type; // int16_t type = pColInfoData->info.type;
SColumnInfoData* pTsColInfoData = taosArrayGet(pSDataBlock->pDataBlock, 0); SColumnInfoData* pTsColInfoData = taosArrayGet(pSDataBlock->pDataBlock, 0);
...@@ -8584,7 +8584,7 @@ static SSDataBlock* doTagScan(void* param, bool* newgroup) { ...@@ -8584,7 +8584,7 @@ static SSDataBlock* doTagScan(void* param, bool* newgroup) {
count = 0; count = 0;
uint16_t bytes = pExprInfo->base.resBytes; int32_t bytes = pExprInfo->base.resBytes;
int16_t type = pExprInfo->base.resType; int16_t type = pExprInfo->base.resType;
for (int32_t i = 0; i < pQueryAttr->numOfTags; ++i) { for (int32_t i = 0; i < pQueryAttr->numOfTags; ++i) {
...@@ -8659,7 +8659,7 @@ static SSDataBlock* doTagScan(void* param, bool* newgroup) { ...@@ -8659,7 +8659,7 @@ static SSDataBlock* doTagScan(void* param, bool* newgroup) {
char * data = NULL, *dst = NULL; char * data = NULL, *dst = NULL;
int16_t type = 0; int16_t type = 0;
uint16_t bytes = 0; int32_t bytes = 0;
for (int32_t j = 0; j < pOperator->numOfOutput; ++j) { for (int32_t j = 0; j < pOperator->numOfOutput; ++j) {
// not assign value in case of user defined constant output column // not assign value in case of user defined constant output column
if (TSDB_COL_IS_UD_COL(pExprInfo[j].base.colInfo.flag)) { if (TSDB_COL_IS_UD_COL(pExprInfo[j].base.colInfo.flag)) {
...@@ -9690,8 +9690,8 @@ int32_t createQueryFunc(SQueriedTableInfo* pTableInfo, int32_t numOfOutput, SExp ...@@ -9690,8 +9690,8 @@ int32_t createQueryFunc(SQueriedTableInfo* pTableInfo, int32_t numOfOutput, SExp
tVariantAssign(&pExprs[i].base.param[j], &pExprMsg[i]->param[j]); tVariantAssign(&pExprs[i].base.param[j], &pExprMsg[i]->param[j]);
} }
int16_t type = 0; int16_t type = 0;
int16_t bytes = 0; uint16_t bytes = 0;
// parse the arithmetic expression // parse the arithmetic expression
if (pExprs[i].base.functionId == TSDB_FUNC_SCALAR_EXPR) { if (pExprs[i].base.functionId == TSDB_FUNC_SCALAR_EXPR) {
...@@ -9834,7 +9834,7 @@ int32_t createIndirectQueryFuncExprFromMsg(SQueryTableMsg* pQueryMsg, int32_t nu ...@@ -9834,7 +9834,7 @@ int32_t createIndirectQueryFuncExprFromMsg(SQueryTableMsg* pQueryMsg, int32_t nu
pExprs[i].base.resType = 0; pExprs[i].base.resType = 0;
int16_t type = 0; int16_t type = 0;
int16_t bytes = 0; int32_t bytes = 0;
// parse the arithmetic expression // parse the arithmetic expression
if (pExprs[i].base.functionId == TSDB_FUNC_SCALAR_EXPR) { if (pExprs[i].base.functionId == TSDB_FUNC_SCALAR_EXPR) {
......
...@@ -267,8 +267,8 @@ tMemBucket *tMemBucketCreate(uint16_t nElemSize, int16_t dataType, double minval ...@@ -267,8 +267,8 @@ tMemBucket *tMemBucketCreate(uint16_t nElemSize, int16_t dataType, double minval
tMemBucketDestroy(pBucket); tMemBucketDestroy(pBucket);
return NULL; return NULL;
} }
qDebug("MemBucket:%p, elem size:%d", pBucket, pBucket->bytes); qDebug("MemBucket:%p, elem size:%" PRIu16, pBucket, pBucket->bytes);
return pBucket; return pBucket;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册