提交 f8bde073 编写于 作者: X Xiaoyu Wang

TD-13037 datasink code

上级 0a9675e4
......@@ -113,7 +113,7 @@ typedef struct SDataBlockDescNode {
typedef struct SPhysiNode {
ENodeType type;
SDataBlockDescNode outputDataBlockDesc;
SDataBlockDescNode* pOutputDataBlockDesc;
SNode* pConditions;
SNodeList* pChildren;
struct SPhysiNode* pParent;
......@@ -175,7 +175,7 @@ typedef struct SExchangePhysiNode {
typedef struct SDataSinkNode {
ENodeType type;;
SDataBlockDescNode inputDataBlockDesc;
SDataBlockDescNode* pInputDataBlockDesc;
} SDataSinkNode;
typedef struct SDataDispatcherNode {
......
......@@ -39,7 +39,7 @@ typedef struct SDataCacheEntry {
typedef struct SDataDispatchHandle {
SDataSinkHandle sink;
SDataSinkManager* pManager;
SDataBlockDescNode schema;
SDataBlockDescNode* pSchema;
STaosQueue* pDataBlocks;
SDataDispatchBuf nextOutput;
int32_t status;
......@@ -109,14 +109,14 @@ static void copyData(const SInputData* pInput, const SDataBlockDescNode* pSchema
// data format: SDataCacheEntry | col1_data col2_data ... | numOfTables | STableIdInfo STableIdInfo ...
static void toDataCacheEntry(const SDataDispatchHandle* pHandle, const SInputData* pInput, SDataDispatchBuf* pBuf) {
SDataCacheEntry* pEntry = (SDataCacheEntry*)pBuf->pData;
pEntry->compressed = (int8_t)needCompress(pInput->pData, &(pHandle->schema));
pEntry->compressed = (int8_t)needCompress(pInput->pData, pHandle->pSchema);
pEntry->numOfRows = pInput->pData->info.rows;
pEntry->dataLen = 0;
pBuf->useSize = DATA_META_LENGTH(pInput->pTableRetrieveTsMap);
copyData(pInput, &pHandle->schema, pEntry->data, pEntry->compressed, &pEntry->dataLen);
copyData(pInput, pHandle->pSchema, pEntry->data, pEntry->compressed, &pEntry->dataLen);
if (0 == pEntry->compressed) {
pEntry->dataLen = pHandle->schema.resultRowSize * pInput->pData->info.rows;
pEntry->dataLen = pHandle->pSchema->resultRowSize * pInput->pData->info.rows;
}
pBuf->useSize += pEntry->dataLen;
// todo completed
......@@ -130,7 +130,7 @@ static bool allocBuf(SDataDispatchHandle* pDispatcher, const SInputData* pInput,
return false;
}
pBuf->allocSize = DATA_META_LENGTH(pInput->pTableRetrieveTsMap) + pDispatcher->schema.resultRowSize * pInput->pData->info.rows;
pBuf->allocSize = DATA_META_LENGTH(pInput->pTableRetrieveTsMap) + pDispatcher->pSchema->resultRowSize * pInput->pData->info.rows;
pBuf->pData = malloc(pBuf->allocSize);
if (pBuf->pData == NULL) {
qError("SinkNode failed to malloc memory, size:%d, code:%d", pBuf->allocSize, TAOS_SYSTEM_ERROR(errno));
......@@ -196,7 +196,7 @@ static int32_t getDataBlock(SDataSinkHandle* pHandle, SOutputData* pOutput) {
if (NULL == pDispatcher->nextOutput.pData) {
assert(pDispatcher->queryEnd);
pOutput->useconds = pDispatcher->useconds;
pOutput->precision = pDispatcher->schema.precision;
pOutput->precision = pDispatcher->pSchema->precision;
return TSDB_CODE_SUCCESS;
}
SDataCacheEntry* pEntry = (SDataCacheEntry*)(pDispatcher->nextOutput.pData);
......@@ -208,7 +208,7 @@ static int32_t getDataBlock(SDataSinkHandle* pHandle, SOutputData* pOutput) {
pthread_mutex_lock(&pDispatcher->mutex);
pOutput->queryEnd = pDispatcher->queryEnd;
pOutput->useconds = pDispatcher->useconds;
pOutput->precision = pDispatcher->schema.precision;
pOutput->precision = pDispatcher->pSchema->precision;
pthread_mutex_unlock(&pDispatcher->mutex);
return TSDB_CODE_SUCCESS;
}
......@@ -238,7 +238,7 @@ int32_t createDataDispatcher(SDataSinkManager* pManager, const SDataSinkNode* pD
dispatcher->sink.fGetData = getDataBlock;
dispatcher->sink.fDestroy = destroyDataSinker;
dispatcher->pManager = pManager;
dispatcher->schema = pDataSink->inputDataBlockDesc;
dispatcher->pSchema = pDataSink->pInputDataBlockDesc;
dispatcher->status = DS_BUF_EMPTY;
dispatcher->queryEnd = false;
dispatcher->pDataBlocks = taosOpenQueue();
......
......@@ -165,6 +165,23 @@ static SNode* logicSubplanCopy(const SSubLogicPlan* pSrc, SSubLogicPlan* pDst) {
return (SNode*)pDst;
}
static SNode* dataBlockDescCopy(const SDataBlockDescNode* pSrc, SDataBlockDescNode* pDst) {
COPY_SCALAR_FIELD(dataBlockId);
COPY_NODE_LIST_FIELD(pSlots);
COPY_SCALAR_FIELD(resultRowSize);
COPY_SCALAR_FIELD(precision);
return (SNode*)pDst;
}
static SNode* slotDescCopy(const SSlotDescNode* pSrc, SSlotDescNode* pDst) {
COPY_SCALAR_FIELD(slotId);
dataTypeCopy(&pSrc->dataType, &pDst->dataType);
COPY_SCALAR_FIELD(reserve);
COPY_SCALAR_FIELD(output);
COPY_SCALAR_FIELD(tag);
return (SNode*)pDst;
}
SNodeptr nodesCloneNode(const SNodeptr pNode) {
if (NULL == pNode) {
return NULL;
......@@ -196,8 +213,12 @@ SNodeptr nodesCloneNode(const SNodeptr pNode) {
case QUERY_NODE_ORDER_BY_EXPR:
case QUERY_NODE_LIMIT:
break;
case QUERY_NODE_DATABLOCK_DESC:
return dataBlockDescCopy((const SDataBlockDescNode*)pNode, (SDataBlockDescNode*)pDst);
case QUERY_NODE_SLOT_DESC:
return slotDescCopy((const SSlotDescNode*)pNode, (SSlotDescNode*)pDst);
case QUERY_NODE_LOGIC_SUBPLAN:
return logicSubplanCopy((const SSubLogicPlan*)pNode, (SSubLogicPlan*)pDst);
return logicSubplanCopy((const SSubLogicPlan*)pNode, (SSubLogicPlan*)pDst);
default:
break;
}
......
......@@ -262,7 +262,7 @@ static const char* jkPhysiPlanChildren = "Children";
static int32_t physicPlanNodeToJson(const void* pObj, SJson* pJson) {
const SPhysiNode* pNode = (const SPhysiNode*)pObj;
int32_t code = tjsonAddObject(pJson, jkPhysiPlanOutputDataBlockDesc, nodeToJson, &pNode->outputDataBlockDesc);
int32_t code = tjsonAddObject(pJson, jkPhysiPlanOutputDataBlockDesc, nodeToJson, pNode->pOutputDataBlockDesc);
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddObject(pJson, jkPhysiPlanConditions, nodeToJson, pNode->pConditions);
}
......@@ -276,7 +276,7 @@ static int32_t physicPlanNodeToJson(const void* pObj, SJson* pJson) {
static int32_t jsonToPhysicPlanNode(const SJson* pJson, void* pObj) {
SPhysiNode* pNode = (SPhysiNode*)pObj;
int32_t code = tjsonToObject(pJson, jkPhysiPlanOutputDataBlockDesc, jsonToNode, &pNode->outputDataBlockDesc);
int32_t code = jsonToNodeObject(pJson, jkPhysiPlanOutputDataBlockDesc, (SNode**)&pNode->pOutputDataBlockDesc);
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeObject(pJson, jkPhysiPlanConditions, &pNode->pConditions);
}
......@@ -494,6 +494,26 @@ static int32_t jsonToPhysiAggNode(const SJson* pJson, void* pObj) {
return code;
}
static const char* jkDataSinkInputDataBlockDesc = "InputDataBlockDesc";
static int32_t physicDataSinkNodeToJson(const void* pObj, SJson* pJson) {
const SDataSinkNode* pNode = (const SDataSinkNode*)pObj;
return tjsonAddObject(pJson, jkDataSinkInputDataBlockDesc, nodeToJson, pNode->pInputDataBlockDesc);
}
static int32_t jsonToPhysicDataSinkNode(const SJson* pJson, void* pObj) {
SDataSinkNode* pNode = (SDataSinkNode*)pObj;
return jsonToNodeObject(pJson, jkDataSinkInputDataBlockDesc, (SNode**)&pNode->pInputDataBlockDesc);
}
static int32_t physiDispatchNodeToJson(const void* pObj, SJson* pJson) {
return physicDataSinkNodeToJson(pObj, pJson);
}
static int32_t jsonToPhysiDispatchNode(const SJson* pJson, void* pObj) {
return jsonToPhysicDataSinkNode(pJson, pObj);
}
static const char* jkSubplanIdQueryId = "QueryId";
static const char* jkSubplanIdTemplateId = "TemplateId";
static const char* jkSubplanIdSubplanId = "SubplanId";
......@@ -861,41 +881,43 @@ static int32_t valueNodeToJson(const void* pObj, SJson* pJson) {
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddBoolToObject(pJson, jkValueDuration, pNode->isDuration);
}
switch (pNode->node.resType.type) {
case TSDB_DATA_TYPE_NULL:
break;
case TSDB_DATA_TYPE_BOOL:
code = tjsonAddIntegerToObject(pJson, jkValueDuration, pNode->datum.b);
break;
case TSDB_DATA_TYPE_TINYINT:
case TSDB_DATA_TYPE_SMALLINT:
case TSDB_DATA_TYPE_INT:
case TSDB_DATA_TYPE_BIGINT:
case TSDB_DATA_TYPE_TIMESTAMP:
code = tjsonAddIntegerToObject(pJson, jkValueDuration, pNode->datum.i);
break;
case TSDB_DATA_TYPE_UTINYINT:
case TSDB_DATA_TYPE_USMALLINT:
case TSDB_DATA_TYPE_UINT:
case TSDB_DATA_TYPE_UBIGINT:
code = tjsonAddIntegerToObject(pJson, jkValueDuration, pNode->datum.u);
break;
case TSDB_DATA_TYPE_FLOAT:
case TSDB_DATA_TYPE_DOUBLE:
code = tjsonAddDoubleToObject(pJson, jkValueDuration, pNode->datum.d);
break;
case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_NCHAR:
case TSDB_DATA_TYPE_VARCHAR:
case TSDB_DATA_TYPE_VARBINARY:
code = tjsonAddStringToObject(pJson, jkValueLiteral, pNode->datum.p);
break;
case TSDB_DATA_TYPE_JSON:
case TSDB_DATA_TYPE_DECIMAL:
case TSDB_DATA_TYPE_BLOB:
// todo
default:
break;
if (TSDB_CODE_SUCCESS == code) {
switch (pNode->node.resType.type) {
case TSDB_DATA_TYPE_NULL:
break;
case TSDB_DATA_TYPE_BOOL:
code = tjsonAddIntegerToObject(pJson, jkValueDatum, pNode->datum.b);
break;
case TSDB_DATA_TYPE_TINYINT:
case TSDB_DATA_TYPE_SMALLINT:
case TSDB_DATA_TYPE_INT:
case TSDB_DATA_TYPE_BIGINT:
case TSDB_DATA_TYPE_TIMESTAMP:
code = tjsonAddIntegerToObject(pJson, jkValueDatum, pNode->datum.i);
break;
case TSDB_DATA_TYPE_UTINYINT:
case TSDB_DATA_TYPE_USMALLINT:
case TSDB_DATA_TYPE_UINT:
case TSDB_DATA_TYPE_UBIGINT:
code = tjsonAddIntegerToObject(pJson, jkValueDatum, pNode->datum.u);
break;
case TSDB_DATA_TYPE_FLOAT:
case TSDB_DATA_TYPE_DOUBLE:
code = tjsonAddDoubleToObject(pJson, jkValueDatum, pNode->datum.d);
break;
case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_NCHAR:
case TSDB_DATA_TYPE_VARCHAR:
case TSDB_DATA_TYPE_VARBINARY:
code = tjsonAddStringToObject(pJson, jkValueDatum, pNode->datum.p);
break;
case TSDB_DATA_TYPE_JSON:
case TSDB_DATA_TYPE_DECIMAL:
case TSDB_DATA_TYPE_BLOB:
// todo
default:
break;
}
}
return code;
......@@ -911,41 +933,43 @@ static int32_t jsonToValueNode(const SJson* pJson, void* pObj) {
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetBoolValue(pJson, jkValueDuration, &pNode->isDuration);
}
switch (pNode->node.resType.type) {
case TSDB_DATA_TYPE_NULL:
break;
case TSDB_DATA_TYPE_BOOL:
code = tjsonGetBoolValue(pJson, jkValueDuration, &pNode->datum.b);
break;
case TSDB_DATA_TYPE_TINYINT:
case TSDB_DATA_TYPE_SMALLINT:
case TSDB_DATA_TYPE_INT:
case TSDB_DATA_TYPE_BIGINT:
case TSDB_DATA_TYPE_TIMESTAMP:
code = tjsonGetBigIntValue(pJson, jkValueDuration, &pNode->datum.i);
break;
case TSDB_DATA_TYPE_UTINYINT:
case TSDB_DATA_TYPE_USMALLINT:
case TSDB_DATA_TYPE_UINT:
case TSDB_DATA_TYPE_UBIGINT:
code = tjsonGetUBigIntValue(pJson, jkValueDuration, &pNode->datum.u);
break;
case TSDB_DATA_TYPE_FLOAT:
case TSDB_DATA_TYPE_DOUBLE:
code = tjsonGetDoubleValue(pJson, jkValueDuration, &pNode->datum.d);
break;
case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_NCHAR:
case TSDB_DATA_TYPE_VARCHAR:
case TSDB_DATA_TYPE_VARBINARY:
code = tjsonDupStringValue(pJson, jkValueLiteral, &pNode->datum.p);
break;
case TSDB_DATA_TYPE_JSON:
case TSDB_DATA_TYPE_DECIMAL:
case TSDB_DATA_TYPE_BLOB:
// todo
default:
break;
if (TSDB_CODE_SUCCESS == code) {
switch (pNode->node.resType.type) {
case TSDB_DATA_TYPE_NULL:
break;
case TSDB_DATA_TYPE_BOOL:
code = tjsonGetBoolValue(pJson, jkValueDatum, &pNode->datum.b);
break;
case TSDB_DATA_TYPE_TINYINT:
case TSDB_DATA_TYPE_SMALLINT:
case TSDB_DATA_TYPE_INT:
case TSDB_DATA_TYPE_BIGINT:
case TSDB_DATA_TYPE_TIMESTAMP:
code = tjsonGetBigIntValue(pJson, jkValueDatum, &pNode->datum.i);
break;
case TSDB_DATA_TYPE_UTINYINT:
case TSDB_DATA_TYPE_USMALLINT:
case TSDB_DATA_TYPE_UINT:
case TSDB_DATA_TYPE_UBIGINT:
code = tjsonGetUBigIntValue(pJson, jkValueDatum, &pNode->datum.u);
break;
case TSDB_DATA_TYPE_FLOAT:
case TSDB_DATA_TYPE_DOUBLE:
code = tjsonGetDoubleValue(pJson, jkValueDatum, &pNode->datum.d);
break;
case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_NCHAR:
case TSDB_DATA_TYPE_VARCHAR:
case TSDB_DATA_TYPE_VARBINARY:
code = tjsonDupStringValue(pJson, jkValueDatum, &pNode->datum.p);
break;
case TSDB_DATA_TYPE_JSON:
case TSDB_DATA_TYPE_DECIMAL:
case TSDB_DATA_TYPE_BLOB:
// todo
default:
break;
}
}
return code;
......@@ -1328,6 +1352,7 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) {
case QUERY_NODE_PHYSICAL_PLAN_EXCHANGE:
case QUERY_NODE_PHYSICAL_PLAN_SORT:
case QUERY_NODE_PHYSICAL_PLAN_DISPATCH:
return physiDispatchNodeToJson(pObj, pJson);
case QUERY_NODE_PHYSICAL_PLAN_INSERT:
break;
case QUERY_NODE_PHYSICAL_SUBPLAN:
......@@ -1397,6 +1422,8 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) {
return jsonToPhysiJoinNode(pJson, pObj);
case QUERY_NODE_PHYSICAL_PLAN_AGG:
return jsonToPhysiAggNode(pJson, pObj);
case QUERY_NODE_PHYSICAL_PLAN_DISPATCH:
return jsonToPhysiDispatchNode(pJson, pObj);
case QUERY_NODE_PHYSICAL_SUBPLAN:
return jsonToSubplan(pJson, pObj);
case QUERY_NODE_PHYSICAL_PLAN:
......@@ -1404,6 +1431,7 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) {
default:
break;
}
printf("================================ jsonToSpecificNode unknown node = %s\n", nodesNodeName(nodeType(pObj)));
return TSDB_CODE_SUCCESS;
}
......@@ -1432,6 +1460,9 @@ static int32_t jsonToNode(const SJson* pJson, void* pObj) {
pNode->type = val;
if (TSDB_CODE_SUCCESS == code) {
code = tjsonToObject(pJson, nodesNodeName(pNode->type), jsonToSpecificNode, pNode);
if (TSDB_CODE_SUCCESS != code) {
printf("%s toNode error\n", nodesNodeName(pNode->type));
}
}
return code;
......@@ -1454,7 +1485,7 @@ static int32_t makeNodeByJson(const SJson* pJson, SNode** pNode) {
static int32_t jsonToNodeObject(const SJson* pJson, const char* pName, SNode** pNode) {
SJson* pJsonNode = tjsonGetObjectItem(pJson, pName);
if (NULL == pJsonNode) {
return TSDB_CODE_FAILED;
return TSDB_CODE_SUCCESS;
}
return makeNodeByJson(pJsonNode, pNode);
}
......
......@@ -131,6 +131,7 @@ SNodeptr nodesMakeNode(ENodeType type) {
default:
break;
}
printf("================================ nodesMakeNode unknown node = %s\n", nodesNodeName(type));
return NULL;
}
......
......@@ -153,17 +153,20 @@ static SNodeList* setListSlotId(SPhysiPlanContext* pCxt, int16_t leftDataBlockId
static SPhysiNode* makePhysiNode(SPhysiPlanContext* pCxt, ENodeType type) {
SPhysiNode* pPhysiNode = (SPhysiNode*)nodesMakeNode(type);
if (NULL == pPhysiNode) {
CHECK_ALLOC(pPhysiNode, NULL);
pPhysiNode->pOutputDataBlockDesc = nodesMakeNode(QUERY_NODE_DATABLOCK_DESC);
if (NULL == pPhysiNode->pOutputDataBlockDesc) {
nodesDestroyNode(pPhysiNode);
return NULL;
}
pPhysiNode->outputDataBlockDesc.dataBlockId = pCxt->nextDataBlockId++;
pPhysiNode->outputDataBlockDesc.type = QUERY_NODE_DATABLOCK_DESC;
pPhysiNode->pOutputDataBlockDesc->dataBlockId = pCxt->nextDataBlockId++;
pPhysiNode->pOutputDataBlockDesc->type = QUERY_NODE_DATABLOCK_DESC;
return pPhysiNode;
}
static int32_t setConditionsSlotId(SPhysiPlanContext* pCxt, const SLogicNode* pLogicNode, SPhysiNode* pPhysiNode) {
if (NULL != pLogicNode->pConditions) {
pPhysiNode->pConditions = setNodeSlotId(pCxt, pPhysiNode->outputDataBlockDesc.dataBlockId, -1, pLogicNode->pConditions);
pPhysiNode->pConditions = setNodeSlotId(pCxt, pPhysiNode->pOutputDataBlockDesc->dataBlockId, -1, pLogicNode->pConditions);
CHECK_ALLOC(pPhysiNode->pConditions, TSDB_CODE_OUT_OF_MEMORY);
}
return TSDB_CODE_SUCCESS;
......@@ -188,11 +191,11 @@ static int32_t initScanPhysiNode(SPhysiPlanContext* pCxt, SScanLogicNode* pScanL
CHECK_ALLOC(pScanPhysiNode->pScanCols, TSDB_CODE_OUT_OF_MEMORY);
}
// Data block describe also needs to be set without scanning column, such as SELECT COUNT(*) FROM t
CHECK_CODE(addDataBlockDesc(pCxt, pScanPhysiNode->pScanCols, &pScanPhysiNode->node.outputDataBlockDesc), TSDB_CODE_OUT_OF_MEMORY);
CHECK_CODE(addDataBlockDesc(pCxt, pScanPhysiNode->pScanCols, pScanPhysiNode->node.pOutputDataBlockDesc), TSDB_CODE_OUT_OF_MEMORY);
CHECK_CODE(setConditionsSlotId(pCxt, (const SLogicNode*)pScanLogicNode, (SPhysiNode*)pScanPhysiNode), TSDB_CODE_OUT_OF_MEMORY);
CHECK_CODE(setSlotOutput(pCxt, pScanLogicNode->node.pTargets, &pScanPhysiNode->node.outputDataBlockDesc), TSDB_CODE_OUT_OF_MEMORY);
CHECK_CODE(setSlotOutput(pCxt, pScanLogicNode->node.pTargets, pScanPhysiNode->node.pOutputDataBlockDesc), TSDB_CODE_OUT_OF_MEMORY);
pScanPhysiNode->uid = pScanLogicNode->pMeta->uid;
pScanPhysiNode->tableType = pScanLogicNode->pMeta->tableType;
......@@ -276,18 +279,18 @@ static SPhysiNode* createJoinPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChil
SJoinPhysiNode* pJoin = (SJoinPhysiNode*)makePhysiNode(pCxt, QUERY_NODE_PHYSICAL_PLAN_JOIN);
CHECK_ALLOC(pJoin, NULL);
SDataBlockDescNode* pLeftDesc = &((SPhysiNode*)nodesListGetNode(pChildren, 0))->outputDataBlockDesc;
SDataBlockDescNode* pRightDesc = &((SPhysiNode*)nodesListGetNode(pChildren, 1))->outputDataBlockDesc;
SDataBlockDescNode* pLeftDesc = ((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc;
SDataBlockDescNode* pRightDesc = ((SPhysiNode*)nodesListGetNode(pChildren, 1))->pOutputDataBlockDesc;
pJoin->pOnConditions = setNodeSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->pOnConditions);
CHECK_ALLOC(pJoin->pOnConditions, (SPhysiNode*)pJoin);
pJoin->pTargets = createJoinOutputCols(pCxt, pLeftDesc, pRightDesc);
CHECK_ALLOC(pJoin->pTargets, (SPhysiNode*)pJoin);
CHECK_CODE(addDataBlockDesc(pCxt, pJoin->pTargets, &pJoin->node.outputDataBlockDesc), (SPhysiNode*)pJoin);
CHECK_CODE(addDataBlockDesc(pCxt, pJoin->pTargets, pJoin->node.pOutputDataBlockDesc), (SPhysiNode*)pJoin);
CHECK_CODE(setConditionsSlotId(pCxt, (const SLogicNode*)pJoinLogicNode, (SPhysiNode*)pJoin), (SPhysiNode*)pJoin);
CHECK_CODE(setSlotOutput(pCxt, pJoinLogicNode->node.pTargets, &pJoin->node.outputDataBlockDesc), (SPhysiNode*)pJoin);
CHECK_CODE(setSlotOutput(pCxt, pJoinLogicNode->node.pTargets, pJoin->node.pOutputDataBlockDesc), (SPhysiNode*)pJoin);
return (SPhysiNode*)pJoin;
}
......@@ -385,8 +388,8 @@ static SPhysiNode* createAggPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChild
CHECK_CODE(rewritePrecalcExprs(pCxt, pAggLogicNode->pGroupKeys, &pPrecalcExprs, &pGroupKeys), (SPhysiNode*)pAgg);
CHECK_CODE(rewritePrecalcExprs(pCxt, pAggLogicNode->pAggFuncs, &pPrecalcExprs, &pAggFuncs), (SPhysiNode*)pAgg);
SDataBlockDescNode* pChildTupe = &(((SPhysiNode*)nodesListGetNode(pChildren, 0))->outputDataBlockDesc);
// push down expression to outputDataBlockDesc of child node
SDataBlockDescNode* pChildTupe = (((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc);
// push down expression to pOutputDataBlockDesc of child node
if (NULL != pPrecalcExprs) {
pAgg->pExprs = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pPrecalcExprs);
CHECK_ALLOC(pAgg->pExprs, (SPhysiNode*)pAgg);
......@@ -396,18 +399,18 @@ static SPhysiNode* createAggPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChild
if (NULL != pGroupKeys) {
pAgg->pGroupKeys = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pGroupKeys);
CHECK_ALLOC(pAgg->pGroupKeys, (SPhysiNode*)pAgg);
CHECK_CODE(addDataBlockDesc(pCxt, pAgg->pGroupKeys, &pAgg->node.outputDataBlockDesc), (SPhysiNode*)pAgg);
CHECK_CODE(addDataBlockDesc(pCxt, pAgg->pGroupKeys, pAgg->node.pOutputDataBlockDesc), (SPhysiNode*)pAgg);
}
if (NULL != pAggFuncs) {
pAgg->pAggFuncs = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pAggFuncs);
CHECK_ALLOC(pAgg->pAggFuncs, (SPhysiNode*)pAgg);
CHECK_CODE(addDataBlockDesc(pCxt, pAgg->pAggFuncs, &pAgg->node.outputDataBlockDesc), (SPhysiNode*)pAgg);
CHECK_CODE(addDataBlockDesc(pCxt, pAgg->pAggFuncs, pAgg->node.pOutputDataBlockDesc), (SPhysiNode*)pAgg);
}
CHECK_CODE(setConditionsSlotId(pCxt, (const SLogicNode*)pAggLogicNode, (SPhysiNode*)pAgg), (SPhysiNode*)pAgg);
CHECK_CODE(setSlotOutput(pCxt, pAggLogicNode->node.pTargets, &pAgg->node.outputDataBlockDesc), (SPhysiNode*)pAgg);
CHECK_CODE(setSlotOutput(pCxt, pAggLogicNode->node.pTargets, pAgg->node.pOutputDataBlockDesc), (SPhysiNode*)pAgg);
return (SPhysiNode*)pAgg;
}
......@@ -416,9 +419,9 @@ static SPhysiNode* createProjectPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pC
SProjectPhysiNode* pProject = (SProjectPhysiNode*)makePhysiNode(pCxt, QUERY_NODE_PHYSICAL_PLAN_PROJECT);
CHECK_ALLOC(pProject, NULL);
pProject->pProjections = setListSlotId(pCxt, ((SPhysiNode*)nodesListGetNode(pChildren, 0))->outputDataBlockDesc.dataBlockId, -1, pProjectLogicNode->pProjections);
pProject->pProjections = setListSlotId(pCxt, ((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc->dataBlockId, -1, pProjectLogicNode->pProjections);
CHECK_ALLOC(pProject->pProjections, (SPhysiNode*)pProject);
CHECK_CODE(addDataBlockDesc(pCxt, pProject->pProjections, &pProject->node.outputDataBlockDesc), (SPhysiNode*)pProject);
CHECK_CODE(addDataBlockDesc(pCxt, pProject->pProjections, pProject->node.pOutputDataBlockDesc), (SPhysiNode*)pProject);
CHECK_CODE(setConditionsSlotId(pCxt, (const SLogicNode*)pProjectLogicNode, (SPhysiNode*)pProject), (SPhysiNode*)pProject);
......@@ -468,12 +471,21 @@ static SPhysiNode* createPhysiNode(SPhysiPlanContext* pCxt, SLogicNode* pLogicPl
static SDataSinkNode* createDataInserter(SPhysiPlanContext* pCxt, SVgDataBlocks* pBlocks) {
SDataInserterNode* pInserter = nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN_INSERT);
CHECK_ALLOC(pInserter, NULL);
pInserter->numOfTables = pBlocks->numOfTables;
pInserter->size = pBlocks->size;
TSWAP(pInserter->pData, pBlocks->pData, char*);
return (SDataSinkNode*)pInserter;
}
static SDataSinkNode* createDataDispatcher(SPhysiPlanContext* pCxt, const SPhysiNode* pRoot) {
SDataDispatcherNode* pDispatcher = nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN_DISPATCH);
CHECK_ALLOC(pDispatcher, NULL);
pDispatcher->sink.pInputDataBlockDesc = nodesCloneNode(pRoot->pOutputDataBlockDesc);
CHECK_ALLOC(pDispatcher->sink.pInputDataBlockDesc, (SDataSinkNode*)pDispatcher);
return (SDataSinkNode*)pDispatcher;
}
static SSubplan* createPhysiSubplan(SPhysiPlanContext* pCxt, SSubLogicPlan* pLogicSubplan) {
SSubplan* pSubplan = (SSubplan*)nodesMakeNode(QUERY_NODE_PHYSICAL_SUBPLAN);
CHECK_ALLOC(pSubplan, NULL);
......@@ -483,7 +495,7 @@ static SSubplan* createPhysiSubplan(SPhysiPlanContext* pCxt, SSubLogicPlan* pLog
pSubplan->msgType = pModif->msgType;
} else {
pSubplan->pNode = createPhysiNode(pCxt, pLogicSubplan->pNode);
// pSubplan->pDataSink = createDataDispatcher(pCxt, pSubplan->pNode);
pSubplan->pDataSink = createDataDispatcher(pCxt, pSubplan->pNode);
}
pSubplan->subplanType = pLogicSubplan->subplanType;
return pSubplan;
......
......@@ -109,6 +109,14 @@ private:
cout << "sql:[" << cxt_.pSql << "] toString code:" << code << ", strerror:" << tstrerror(code) << endl;
return string();
}
SNode* pNode;
code = nodesStringToNode(pStr, &pNode);
if (code != TSDB_CODE_SUCCESS) {
tfree(pStr);
cout << "sql:[" << cxt_.pSql << "] toObject code:" << code << ", strerror:" << tstrerror(code) << endl;
return string();
}
nodesDestroyNode(pNode);
string str(pStr);
tfree(pStr);
return str;
......@@ -151,8 +159,3 @@ TEST_F(PlannerTest, subquery) {
bind("SELECT count(*) FROM (SELECT c1 + c3 a, c1 + count(*) b FROM t1 where c2 = 'abc' GROUP BY c1, c3) where a > 100 group by b");
ASSERT_TRUE(run());
}
TEST_F(PlannerTest, createTable) {
bind("create table t1(ts timestamp, c1 int)");
ASSERT_TRUE(run());
}
......@@ -98,7 +98,6 @@ int32_t tjsonAddObject(SJson* pJson, const char* pName, FToJson func, const void
SJson* pJobj = tjsonCreateObject();
if (NULL == pJobj || TSDB_CODE_SUCCESS != func(pObj, pJobj)) {
printf("%s:%d code = %d\n", __FUNCTION__, __LINE__, TSDB_CODE_FAILED);
tjsonDelete(pJobj);
return TSDB_CODE_FAILED;
}
......@@ -159,9 +158,8 @@ int32_t tjsonGetBigIntValue(const SJson* pJson, const char* pName, int64_t* pVal
if (NULL == p) {
return TSDB_CODE_FAILED;
}
char* pEnd = NULL;
*pVal = strtol(p, &pEnd, 10);
return (NULL == pEnd ? TSDB_CODE_SUCCESS : TSDB_CODE_FAILED);
*pVal = strtol(p, NULL, 10);
return TSDB_CODE_SUCCESS;
}
int32_t tjsonGetIntValue(const SJson* pJson, const char* pName, int32_t* pVal) {
......@@ -190,9 +188,8 @@ int32_t tjsonGetUBigIntValue(const SJson* pJson, const char* pName, uint64_t* pV
if (NULL == p) {
return TSDB_CODE_FAILED;
}
char* pEnd = NULL;
*pVal = strtoul(p, &pEnd, 10);
return (NULL == pEnd ? TSDB_CODE_SUCCESS : TSDB_CODE_FAILED);
*pVal = strtoul(p, NULL, 10);
return TSDB_CODE_SUCCESS;
}
int32_t tjsonGetUTinyIntValue(const SJson* pJson, const char* pName, uint8_t* pVal) {
......@@ -204,7 +201,7 @@ int32_t tjsonGetUTinyIntValue(const SJson* pJson, const char* pName, uint8_t* pV
int32_t tjsonGetBoolValue(const SJson* pJson, const char* pName, bool* pVal) {
const SJson* pObject = tjsonGetObjectItem(pJson, pName);
if (cJSON_IsBool(pObject)) {
if (!cJSON_IsBool(pObject)) {
return TSDB_CODE_FAILED;
}
*pVal = cJSON_IsTrue(pObject) ? true : false;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册