提交 97e7a41a 编写于 作者: D dapan1121

feature/qnode

上级 cda69861
...@@ -101,6 +101,7 @@ size_t blockDataNumOfRowsForSerialize(const SSDataBlock* pBlock, int32_t blockSi ...@@ -101,6 +101,7 @@ size_t blockDataNumOfRowsForSerialize(const SSDataBlock* pBlock, int32_t blockSi
int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullFirst); int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullFirst);
int32_t blockDataSort_rv(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullFirst); int32_t blockDataSort_rv(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullFirst);
int32_t blockDataEnsureColumnCapacity(SColumnInfoData* pColumn, uint32_t numOfRows);
int32_t blockDataEnsureCapacity(SSDataBlock* pDataBlock, uint32_t numOfRows); int32_t blockDataEnsureCapacity(SSDataBlock* pDataBlock, uint32_t numOfRows);
void blockDataClearup(SSDataBlock* pDataBlock, bool hasVarCol); void blockDataClearup(SSDataBlock* pDataBlock, bool hasVarCol);
void *blockDataDestroy(SSDataBlock *pBlock); void *blockDataDestroy(SSDataBlock *pBlock);
......
...@@ -120,14 +120,18 @@ int32_t colDataAppend(SColumnInfoData* pColumnInfoData, uint32_t currentRow, con ...@@ -120,14 +120,18 @@ int32_t colDataAppend(SColumnInfoData* pColumnInfoData, uint32_t currentRow, con
} else { } else {
char* p = pColumnInfoData->pData + pColumnInfoData->info.bytes * currentRow; char* p = pColumnInfoData->pData + pColumnInfoData->info.bytes * currentRow;
switch(type) { switch(type) {
case TSDB_DATA_TYPE_BOOL: {*(bool*) p = *(bool*) pData;break;}
case TSDB_DATA_TYPE_TINYINT: case TSDB_DATA_TYPE_TINYINT:
case TSDB_DATA_TYPE_UTINYINT: {*(int8_t*) p = *(int8_t*) pData;break;} case TSDB_DATA_TYPE_UTINYINT: {*(int8_t*) p = *(int8_t*) pData;break;}
case TSDB_DATA_TYPE_SMALLINT: case TSDB_DATA_TYPE_SMALLINT:
case TSDB_DATA_TYPE_USMALLINT: {*(int16_t*) p = *(int16_t*) pData;break;} case TSDB_DATA_TYPE_USMALLINT: {*(int16_t*) p = *(int16_t*) pData;break;}
case TSDB_DATA_TYPE_INT: case TSDB_DATA_TYPE_INT:
case TSDB_DATA_TYPE_UINT: {*(int32_t*) p = *(int32_t*) pData;break;} case TSDB_DATA_TYPE_UINT: {*(int32_t*) p = *(int32_t*) pData;break;}
case TSDB_DATA_TYPE_TIMESTAMP:
case TSDB_DATA_TYPE_BIGINT: case TSDB_DATA_TYPE_BIGINT:
case TSDB_DATA_TYPE_UBIGINT: {*(int64_t*) p = *(int64_t*) pData;break;} case TSDB_DATA_TYPE_UBIGINT: {*(int64_t*) p = *(int64_t*) pData;break;}
case TSDB_DATA_TYPE_FLOAT: {*(float*) p = *(float*) pData;break;}
case TSDB_DATA_TYPE_DOUBLE: {*(double*) p = *(double*) pData;break;}
default: default:
assert(0); assert(0);
} }
...@@ -1040,36 +1044,47 @@ void blockDataClearup(SSDataBlock* pDataBlock, bool hasVarCol) { ...@@ -1040,36 +1044,47 @@ void blockDataClearup(SSDataBlock* pDataBlock, bool hasVarCol) {
} }
} }
int32_t blockDataEnsureCapacity(SSDataBlock* pDataBlock, uint32_t numOfRows) { int32_t blockDataEnsureColumnCapacity(SColumnInfoData* pColumn, uint32_t numOfRows) {
for(int32_t i = 0; i < pDataBlock->info.numOfCols; ++i) { if (IS_VAR_DATA_TYPE(pColumn->info.type)) {
SColumnInfoData* p = taosArrayGet(pDataBlock->pDataBlock, i); char* tmp = realloc(pColumn->varmeta.offset, sizeof(int32_t) * numOfRows);
if (IS_VAR_DATA_TYPE(p->info.type)) { if (tmp == NULL) {
char* tmp = realloc(p->varmeta.offset, sizeof(int32_t) * numOfRows); return TSDB_CODE_OUT_OF_MEMORY;
if (tmp == NULL) { }
return TSDB_CODE_OUT_OF_MEMORY;
}
p->varmeta.offset = (int32_t*)tmp; pColumn->varmeta.offset = (int32_t*)tmp;
memset(p->varmeta.offset, 0, sizeof(int32_t) * numOfRows); memset(pColumn->varmeta.offset, 0, sizeof(int32_t) * numOfRows);
p->varmeta.length = 0; pColumn->varmeta.length = 0;
p->varmeta.allocLen = 0; pColumn->varmeta.allocLen = 0;
tfree(p->pData); tfree(pColumn->pData);
} else { } else {
char* tmp = realloc(p->nullbitmap, BitmapLen(numOfRows)); char* tmp = realloc(pColumn->nullbitmap, BitmapLen(numOfRows));
if (tmp == NULL) { if (tmp == NULL) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
p->nullbitmap = tmp; pColumn->nullbitmap = tmp;
memset(p->nullbitmap, 0, BitmapLen(numOfRows)); memset(pColumn->nullbitmap, 0, BitmapLen(numOfRows));
tmp = realloc(p->pData, numOfRows * p->info.bytes); tmp = realloc(pColumn->pData, numOfRows * pColumn->info.bytes);
if (tmp == NULL) { if (tmp == NULL) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
p->pData = tmp; pColumn->pData = tmp;
}
return TSDB_CODE_SUCCESS;
}
int32_t blockDataEnsureCapacity(SSDataBlock* pDataBlock, uint32_t numOfRows) {
int32_t code = 0;
for(int32_t i = 0; i < pDataBlock->info.numOfCols; ++i) {
SColumnInfoData* p = taosArrayGet(pDataBlock->pDataBlock, i);
code = blockDataEnsureColumnCapacity(p, numOfRows);
if (code) {
return code;
} }
} }
...@@ -1097,4 +1112,4 @@ void* blockDataDestroy(SSDataBlock* pBlock) { ...@@ -1097,4 +1112,4 @@ void* blockDataDestroy(SSDataBlock* pBlock) {
tfree(pBlock->pBlockAgg); tfree(pBlock->pBlockAgg);
tfree(pBlock); tfree(pBlock);
return NULL; return NULL;
} }
\ No newline at end of file
...@@ -105,6 +105,9 @@ static EDealRes walkNode(SNode* pNode, ETraversalOrder order, FNodeWalker walker ...@@ -105,6 +105,9 @@ static EDealRes walkNode(SNode* pNode, ETraversalOrder order, FNodeWalker walker
case QUERY_NODE_RAW_EXPR: case QUERY_NODE_RAW_EXPR:
res = walkNode(((SRawExprNode*)pNode)->pNode, order, walker, pContext); res = walkNode(((SRawExprNode*)pNode)->pNode, order, walker, pContext);
break; break;
case QUERY_NODE_TARGET:
res = walkNode(((STargetNode*)pNode)->pExpr, order, walker, pContext);
break;
default: default:
break; break;
} }
......
...@@ -22,6 +22,9 @@ extern "C" { ...@@ -22,6 +22,9 @@ extern "C" {
#include "sclfunc.h" #include "sclfunc.h"
typedef double (*_mathFunc)(double, double, bool *);
typedef void (*_bufConverteFunc)(char *buf, SScalarParam* pOut, int32_t outType); typedef void (*_bufConverteFunc)(char *buf, SScalarParam* pOut, int32_t outType);
typedef void (*_bin_scalar_fn_t)(SScalarParam* pLeft, SScalarParam* pRight, SScalarParam *output, int32_t order); typedef void (*_bin_scalar_fn_t)(SScalarParam* pLeft, SScalarParam* pRight, SScalarParam *output, int32_t order);
_bin_scalar_fn_t getBinScalarOperatorFn(int32_t binOperator); _bin_scalar_fn_t getBinScalarOperatorFn(int32_t binOperator);
......
...@@ -142,9 +142,13 @@ int32_t sclInitParam(SNode* node, SScalarParam *param, SScalarCtx *ctx, int32_t ...@@ -142,9 +142,13 @@ int32_t sclInitParam(SNode* node, SScalarParam *param, SScalarCtx *ctx, int32_t
case QUERY_NODE_VALUE: { case QUERY_NODE_VALUE: {
SValueNode *valueNode = (SValueNode *)node; SValueNode *valueNode = (SValueNode *)node;
param->data = nodesGetValueFromNode(valueNode); param->data = nodesGetValueFromNode(valueNode);
param->orig.data = param->data;
param->num = 1; param->num = 1;
param->type = valueNode->node.resType.type; param->type = valueNode->node.resType.type;
param->bytes = valueNode->node.resType.bytes; param->bytes = valueNode->node.resType.bytes;
if (TSDB_DATA_TYPE_NULL == param->type) {
sclSetNull(param, 0);
}
param->dataInBlock = false; param->dataInBlock = false;
break; break;
...@@ -178,12 +182,12 @@ int32_t sclInitParam(SNode* node, SScalarParam *param, SScalarCtx *ctx, int32_t ...@@ -178,12 +182,12 @@ int32_t sclInitParam(SNode* node, SScalarParam *param, SScalarCtx *ctx, int32_t
} }
SColumnNode *ref = (SColumnNode *)node; SColumnNode *ref = (SColumnNode *)node;
if (ref->tupleId >= taosArrayGetSize(ctx->pBlockList)) { if (ref->dataBlockId >= taosArrayGetSize(ctx->pBlockList)) {
sclError("column tupleId is too big, tupleId:%d, dataBlockNum:%d", ref->tupleId, (int32_t)taosArrayGetSize(ctx->pBlockList)); sclError("column tupleId is too big, tupleId:%d, dataBlockNum:%d", ref->dataBlockId, (int32_t)taosArrayGetSize(ctx->pBlockList));
SCL_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); SCL_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
} }
SSDataBlock *block = *(SSDataBlock **)taosArrayGet(ctx->pBlockList, ref->tupleId); SSDataBlock *block = *(SSDataBlock **)taosArrayGet(ctx->pBlockList, ref->dataBlockId);
if (NULL == block || ref->slotId >= taosArrayGetSize(block->pDataBlock)) { if (NULL == block || ref->slotId >= taosArrayGetSize(block->pDataBlock)) {
sclError("column slotId is too big, slodId:%d, dataBlockSize:%d", ref->slotId, (int32_t)taosArrayGetSize(block->pDataBlock)); sclError("column slotId is too big, slodId:%d, dataBlockSize:%d", ref->slotId, (int32_t)taosArrayGetSize(block->pDataBlock));
...@@ -639,15 +643,15 @@ EDealRes sclWalkOperator(SNode* pNode, SScalarCtx *ctx) { ...@@ -639,15 +643,15 @@ EDealRes sclWalkOperator(SNode* pNode, SScalarCtx *ctx) {
EDealRes sclWalkTarget(SNode* pNode, SScalarCtx *ctx) { EDealRes sclWalkTarget(SNode* pNode, SScalarCtx *ctx) {
STargetNode *target = (STargetNode *)pNode; STargetNode *target = (STargetNode *)pNode;
if (target->tupleId >= taosArrayGetSize(ctx->pBlockList)) { if (target->dataBlockId >= taosArrayGetSize(ctx->pBlockList)) {
sclError("target tupleId is too big, tupleId:%d, dataBlockNum:%d", target->tupleId, (int32_t)taosArrayGetSize(ctx->pBlockList)); sclError("target tupleId is too big, tupleId:%d, dataBlockNum:%d", target->dataBlockId, (int32_t)taosArrayGetSize(ctx->pBlockList));
ctx->code = TSDB_CODE_QRY_INVALID_INPUT; ctx->code = TSDB_CODE_QRY_INVALID_INPUT;
return DEAL_RES_ERROR; return DEAL_RES_ERROR;
} }
SSDataBlock *block = taosArrayGet(ctx->pBlockList, target->tupleId); SSDataBlock *block = *(SSDataBlock **)taosArrayGet(ctx->pBlockList, target->dataBlockId);
if (target->slotId >= taosArrayGetSize(block->pDataBlock)) { if (target->slotId >= taosArrayGetSize(block->pDataBlock)) {
sclError("target slot not exist, slotId:%d, dataBlockNum:%d", target->slotId, (int32_t)taosArrayGetSize(block->pDataBlock)); sclError("target slot not exist, dataBlockId:%d, slotId:%d, dataBlockNum:%d", target->dataBlockId, target->slotId, (int32_t)taosArrayGetSize(block->pDataBlock));
ctx->code = TSDB_CODE_QRY_INVALID_INPUT; ctx->code = TSDB_CODE_QRY_INVALID_INPUT;
return DEAL_RES_ERROR; return DEAL_RES_ERROR;
} }
...@@ -734,7 +738,7 @@ _return: ...@@ -734,7 +738,7 @@ _return:
} }
int32_t scalarCalculate(SNode *pNode, SArray *pBlockList, SScalarParam *pDst) { int32_t scalarCalculate(SNode *pNode, SArray *pBlockList, SScalarParam *pDst) {
if (NULL == pNode || NULL == pBlockList || NULL == pDst) { if (NULL == pNode || NULL == pBlockList) {
SCL_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); SCL_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
} }
...@@ -751,15 +755,17 @@ int32_t scalarCalculate(SNode *pNode, SArray *pBlockList, SScalarParam *pDst) { ...@@ -751,15 +755,17 @@ int32_t scalarCalculate(SNode *pNode, SArray *pBlockList, SScalarParam *pDst) {
SCL_ERR_JRET(ctx.code); SCL_ERR_JRET(ctx.code);
SScalarParam *res = (SScalarParam *)taosHashGet(ctx.pRes, (void *)&pNode, POINTER_BYTES); if (pDst) {
if (NULL == res) { SScalarParam *res = (SScalarParam *)taosHashGet(ctx.pRes, (void *)&pNode, POINTER_BYTES);
sclError("no valid res in hash, node:%p, type:%d", pNode, nodeType(pNode)); if (NULL == res) {
SCL_ERR_JRET(TSDB_CODE_QRY_APP_ERROR); sclError("no valid res in hash, node:%p, type:%d", pNode, nodeType(pNode));
SCL_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
}
taosHashRemove(ctx.pRes, (void *)&pNode, POINTER_BYTES);
*pDst = *res;
} }
taosHashRemove(ctx.pRes, (void *)&pNode, POINTER_BYTES);
*pDst = *res;
_return: _return:
......
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册