提交 945de54b 编写于 作者: H Haojun Liao

[td-14493] add new api.

上级 c84afedc
......@@ -188,6 +188,8 @@ DLL_EXPORT void taos_stop_query(TAOS_RES *res);
DLL_EXPORT bool taos_is_null(TAOS_RES *res, int32_t row, int32_t col);
DLL_EXPORT bool taos_is_update_query(TAOS_RES *res);
DLL_EXPORT int taos_fetch_block(TAOS_RES *res, TAOS_ROW *rows);
DLL_EXPORT int taos_fetch_block_s(TAOS_RES *res, int* numOfRows, TAOS_ROW *rows);
DLL_EXPORT int taos_fetch_raw_block(TAOS_RES *res, int* numOfRows, void** pData);
DLL_EXPORT int *taos_get_column_data_offset(TAOS_RES *res, int columnIndex);
DLL_EXPORT int taos_validate_sql(TAOS *taos, const char *sql);
DLL_EXPORT void taos_reset_current_db(TAOS *taos);
......
......@@ -385,11 +385,20 @@ bool taos_is_update_query(TAOS_RES *res) {
}
int taos_fetch_block(TAOS_RES *res, TAOS_ROW *rows) {
int32_t numOfRows = 0;
/*int32_t code = */taos_fetch_block_s(res, &numOfRows, rows);
return numOfRows;
}
int taos_fetch_block_s(TAOS_RES *res, int* numOfRows, TAOS_ROW *rows) {
SRequestObj *pRequest = (SRequestObj *)res;
if (pRequest == NULL) {
return 0;
}
(*rows) = NULL;
(*numOfRows) = 0;
if (pRequest->type == TSDB_SQL_RETRIEVE_EMPTY_RESULT || pRequest->type == TSDB_SQL_INSERT ||
pRequest->code != TSDB_CODE_SUCCESS || taos_num_fields(res) == 0) {
return 0;
......@@ -400,9 +409,32 @@ int taos_fetch_block(TAOS_RES *res, TAOS_ROW *rows) {
// TODO refactor
SReqResultInfo *pResultInfo = &pRequest->body.resInfo;
pResultInfo->current = pResultInfo->numOfRows;
*rows = pResultInfo->row;
return pResultInfo->numOfRows;
(*rows) = pResultInfo->row;
(*numOfRows) = pResultInfo->numOfRows;
return pRequest->code;
}
int taos_fetch_raw_block(TAOS_RES *res, int* numOfRows, void** pData) {
SRequestObj *pRequest = (SRequestObj *)res;
if (pRequest == NULL) {
return 0;
}
if (pRequest->type == TSDB_SQL_RETRIEVE_EMPTY_RESULT || pRequest->type == TSDB_SQL_INSERT ||
pRequest->code != TSDB_CODE_SUCCESS || taos_num_fields(res) == 0) {
return 0;
}
doFetchRow(pRequest, false);
SReqResultInfo *pResultInfo = &pRequest->body.resInfo;
pResultInfo->current = pResultInfo->numOfRows;
(*numOfRows) = pResultInfo->numOfRows;
(*pData) = (void*) pResultInfo->pData;
return 0;
}
int *taos_get_column_data_offset(TAOS_RES *res, int columnIndex) {
......
......@@ -641,6 +641,8 @@ int32_t setSDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadI
uint64_t* total, SArray* pColList);
void doSetOperatorCompleted(SOperatorInfo* pOperator);
void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock);
SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t capacity);
SSDataBlock* loadNextDataBlock(void* param);
SOperatorInfo* createExchangeOperatorInfo(const SNodeList* pSources, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order, int32_t numOfCols, int32_t repeatTime,
......@@ -665,8 +667,8 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExp
int32_t fillType, char* fillVal, bool multigroupResult, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock,
SArray* pGroupColList, SNode* pCondition, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo);
SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pResultBlock, SArray* pSortInfo, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo);
#if 0
SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbReadHandle, STaskRuntimeEnv* pRuntimeEnv);
SOperatorInfo* createAllTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream,
......
......@@ -4737,8 +4737,7 @@ static void appendOneRowToDataBlock(SSDataBlock* pBlock, STupleHandle* pTupleHan
pBlock->info.rows += 1;
}
static SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, bool hasVarCol,
int32_t capacity) {
SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t capacity) {
blockDataCleanup(pDataBlock);
while (1) {
......@@ -4759,7 +4758,6 @@ static SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataB
SSDataBlock* loadNextDataBlock(void* param) {
SOperatorInfo* pOperator = (SOperatorInfo*)param;
bool newgroup = false;
return pOperator->getNextFn(pOperator, &newgroup);
}
......@@ -4939,7 +4937,7 @@ static SSDataBlock* doSortedMerge(SOperatorInfo* pOperator, bool* newgroup) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SSortedMergeOperatorInfo* pInfo = pOperator->info;
if (pOperator->status == OP_RES_TO_RETURN) {
return getSortedBlockData(pInfo->pSortHandle, pInfo->binfo.pRes, pInfo->hasVarCol, pInfo->binfo.capacity);
return getSortedBlockData(pInfo->pSortHandle, pInfo->binfo.pRes, pInfo->binfo.capacity);
}
int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize;
......@@ -5084,15 +5082,14 @@ static SSDataBlock* doSort(SOperatorInfo* pOperator, bool* newgroup) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SSortOperatorInfo* pInfo = pOperator->info;
bool hasVarCol = pInfo->pDataBlock->info.hasVarCol;
if (pOperator->status == OP_RES_TO_RETURN) {
return getSortedBlockData(pInfo->pSortHandle, pInfo->pDataBlock, hasVarCol, pInfo->numOfRowsInRes);
return getSortedBlockData(pInfo->pSortHandle, pInfo->pDataBlock, pInfo->numOfRowsInRes);
}
int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize;
pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_SINGLESOURCE_SORT, pInfo->bufPageSize, numOfBufPage,
pInfo->pDataBlock, "GET_TASKID(pTaskInfo)");
pInfo->pDataBlock, pTaskInfo->id.str);
tsortSetFetchRawDataFp(pInfo->pSortHandle, loadNextDataBlock);
......@@ -5106,38 +5103,40 @@ static SSDataBlock* doSort(SOperatorInfo* pOperator, bool* newgroup) {
}
pOperator->status = OP_RES_TO_RETURN;
return getSortedBlockData(pInfo->pSortHandle, pInfo->pDataBlock, hasVarCol, pInfo->numOfRowsInRes);
return getSortedBlockData(pInfo->pSortHandle, pInfo->pDataBlock, pInfo->numOfRowsInRes);
}
SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pResBlock, SArray* pSortInfo,
SExecTaskInfo* pTaskInfo) {
SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pResBlock, SArray* pSortInfo, SExecTaskInfo* pTaskInfo) {
SSortOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SSortOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) {
taosMemoryFreeClear(pInfo);
taosMemoryFreeClear(pOperator);
terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
return NULL;
goto _error;
}
pInfo->sortBufSize = 1024 * 16; // 1MB, TODO dynamic set the available sort buffer
pInfo->bufPageSize = 1024;
pInfo->numOfRowsInRes = 1024;
pInfo->pDataBlock = pResBlock;
pInfo->pSortInfo = pSortInfo;
pInfo->sortBufSize = 1024 * 16; // TODO dynamic set the available sort buffer
pInfo->bufPageSize = 1024;
pInfo->numOfRowsInRes = 1024;
pInfo->pDataBlock = pResBlock;
pInfo->pSortInfo = pSortInfo;
pOperator->name = "Sort";
pOperator->name = "SortOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_SORT;
pOperator->blockingOptr = true;
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
pOperator->pTaskInfo = pTaskInfo;
pOperator->getNextFn = doSort;
pOperator->closeFn = destroyOrderOperatorInfo;
pOperator->pTaskInfo = pTaskInfo;
pOperator->getNextFn = doSort;
pOperator->closeFn = destroyOrderOperatorInfo;
int32_t code = appendDownstream(pOperator, &downstream, 1);
return pOperator;
_error:
pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
taosMemoryFree(pInfo);
taosMemoryFree(pOperator);
return NULL;
}
static int32_t getTableScanOrder(STableScanInfo* pTableScanInfo) { return pTableScanInfo->order; }
......
......@@ -345,45 +345,73 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx
return pOperator;
_error:
pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
taosMemoryFreeClear(pInfo);
taosMemoryFreeClear(pOperator);
return NULL;
}
SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock,
SArray* pGroupColList, SNode* pCondition, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo) {
SGroupbyOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SGroupbyOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) {
goto _error;
static SSDataBlock* doPartitionData(SOperatorInfo* pOperator, bool* newgroup) {
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
}
pInfo->pGroupCols = pGroupColList;
pInfo->pCondition = pCondition;
initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, 4096, pResultBlock, pTaskInfo->id.str);
initResultRowInfo(&pInfo->binfo.resultRowInfo, 8);
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SSortOperatorInfo* pInfo = pOperator->info;
bool hasVarCol = pInfo->pDataBlock->info.hasVarCol;
int32_t code = initGroupOptrInfo(pInfo, pGroupColList);
if (pOperator->status == OP_RES_TO_RETURN) {
return getSortedBlockData(pInfo->pSortHandle, pInfo->pDataBlock, pInfo->numOfRowsInRes);
}
int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize;
pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_SINGLESOURCE_SORT, pInfo->bufPageSize, numOfBufPage,
pInfo->pDataBlock, pTaskInfo->id.str);
tsortSetFetchRawDataFp(pInfo->pSortHandle, loadNextDataBlock);
SGenericSource* ps = taosMemoryCalloc(1, sizeof(SGenericSource));
ps->param = pOperator->pDownstream[0];
tsortAddSource(pInfo->pSortHandle, ps);
int32_t code = tsortOpen(pInfo->pSortHandle);
if (code != TSDB_CODE_SUCCESS) {
longjmp(pTaskInfo->env, terrno);
}
pOperator->status = OP_RES_TO_RETURN;
return getSortedBlockData(pInfo->pSortHandle, pInfo->pDataBlock, pInfo->numOfRowsInRes);
}
SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pResultBlock, SArray* pSortInfo, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo) {
SSortOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SSortOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) {
goto _error;
}
pOperator->name = "PartitionByOperator";
pInfo->sortBufSize = 1024 * 16; // TODO dynamic set the available sort buffer
pInfo->bufPageSize = 1024;
pInfo->numOfRowsInRes = 1024;
pInfo->pDataBlock = pResultBlock;
pInfo->pSortInfo = pSortInfo;
pOperator->name = "PartitionOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_PARTITION;
pOperator->blockingOptr = true;
pOperator->status = OP_NOT_OPENED;
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_PARTITION;
pOperator->pExpr = pExprInfo;
pOperator->numOfOutput = numOfCols;
pOperator->info = pInfo;
pOperator->_openFn = operatorDummyOpenFn;
pOperator->getNextFn = hashGroupbyAggregate;
pOperator->closeFn = destroyGroupbyOperatorInfo;
code = appendDownstream(pOperator, &downstream, 1);
pOperator->pTaskInfo = pTaskInfo;
pOperator->getNextFn = doPartitionData;
// pOperator->closeFn = destroyOrderOperatorInfo;
int32_t code = appendDownstream(pOperator, &downstream, 1);
return pOperator;
_error:
taosMemoryFreeClear(pInfo);
taosMemoryFreeClear(pOperator);
pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
taosMemoryFree(pInfo);
taosMemoryFree(pOperator);
return NULL;
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册