From 75816b7e7a73dfe84163e4a8bac0b60b0e399c5a Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Thu, 16 Jun 2022 12:53:25 +0000 Subject: [PATCH] more work --- include/client/taos.h | 90 +++++----- source/dnode/vnode/src/tsdb/tsdbCommit.c | 125 ++++++------- source/dnode/vnode/src/tsdb/tsdbRead.c | 220 ++++++++++------------- 3 files changed, 193 insertions(+), 242 deletions(-) diff --git a/include/client/taos.h b/include/client/taos.h index deb4276b54..479090b04d 100644 --- a/include/client/taos.h +++ b/include/client/taos.h @@ -126,47 +126,47 @@ typedef struct setConfRet { char retMsg[RET_MSG_LENGTH]; } setConfRet; -DLL_EXPORT void taos_cleanup(void); -DLL_EXPORT int taos_options(TSDB_OPTION option, const void *arg, ...); -DLL_EXPORT setConfRet taos_set_config(const char *config); -DLL_EXPORT int taos_init(void); -DLL_EXPORT TAOS *taos_connect(const char *ip, const char *user, const char *pass, const char *db, uint16_t port); -DLL_EXPORT TAOS *taos_connect_auth(const char *ip, const char *user, const char *auth, const char *db, uint16_t port); -DLL_EXPORT void taos_close(TAOS *taos); - -const char *taos_data_type(int type); - -DLL_EXPORT TAOS_STMT *taos_stmt_init(TAOS *taos); -DLL_EXPORT int taos_stmt_prepare(TAOS_STMT *stmt, const char *sql, unsigned long length); -DLL_EXPORT int taos_stmt_set_tbname_tags(TAOS_STMT *stmt, const char *name, TAOS_MULTI_BIND *tags); -DLL_EXPORT int taos_stmt_set_tbname(TAOS_STMT *stmt, const char *name); -DLL_EXPORT int taos_stmt_set_tags(TAOS_STMT *stmt, TAOS_MULTI_BIND *tags); -DLL_EXPORT int taos_stmt_set_sub_tbname(TAOS_STMT *stmt, const char *name); -DLL_EXPORT int taos_stmt_get_tag_fields(TAOS_STMT *stmt, int *fieldNum, TAOS_FIELD_E **fields); -DLL_EXPORT int taos_stmt_get_col_fields(TAOS_STMT *stmt, int *fieldNum, TAOS_FIELD_E **fields); - -DLL_EXPORT int taos_stmt_is_insert(TAOS_STMT *stmt, int *insert); -DLL_EXPORT int taos_stmt_num_params(TAOS_STMT *stmt, int *nums); -DLL_EXPORT int taos_stmt_get_param(TAOS_STMT *stmt, int idx, int *type, int *bytes); -DLL_EXPORT int taos_stmt_bind_param(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind); -DLL_EXPORT int taos_stmt_bind_param_batch(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind); -DLL_EXPORT int taos_stmt_bind_single_param_batch(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind, int colIdx); -DLL_EXPORT int taos_stmt_add_batch(TAOS_STMT *stmt); -DLL_EXPORT int taos_stmt_execute(TAOS_STMT *stmt); -DLL_EXPORT TAOS_RES *taos_stmt_use_result(TAOS_STMT *stmt); -DLL_EXPORT int taos_stmt_close(TAOS_STMT *stmt); -DLL_EXPORT char *taos_stmt_errstr(TAOS_STMT *stmt); -DLL_EXPORT int taos_stmt_affected_rows(TAOS_STMT *stmt); -DLL_EXPORT int taos_stmt_affected_rows_once(TAOS_STMT *stmt); - -DLL_EXPORT TAOS_RES *taos_query(TAOS *taos, const char *sql); - -DLL_EXPORT TAOS_ROW taos_fetch_row(TAOS_RES *res); -DLL_EXPORT int taos_result_precision(TAOS_RES *res); // get the time precision of result -DLL_EXPORT void taos_free_result(TAOS_RES *res); -DLL_EXPORT int taos_field_count(TAOS_RES *res); -DLL_EXPORT int taos_num_fields(TAOS_RES *res); -DLL_EXPORT int taos_affected_rows(TAOS_RES *res); +DLL_EXPORT void taos_cleanup(void); +DLL_EXPORT int taos_options(TSDB_OPTION option, const void *arg, ...); +DLL_EXPORT setConfRet taos_set_config(const char *config); +DLL_EXPORT int taos_init(void); +DLL_EXPORT TAOS *taos_connect(const char *ip, const char *user, const char *pass, const char *db, uint16_t port); +DLL_EXPORT TAOS *taos_connect_auth(const char *ip, const char *user, const char *auth, const char *db, uint16_t port); +DLL_EXPORT void taos_close(TAOS *taos); + +const char *taos_data_type(int type); + +DLL_EXPORT TAOS_STMT *taos_stmt_init(TAOS *taos); +DLL_EXPORT int taos_stmt_prepare(TAOS_STMT *stmt, const char *sql, unsigned long length); +DLL_EXPORT int taos_stmt_set_tbname_tags(TAOS_STMT *stmt, const char *name, TAOS_MULTI_BIND *tags); +DLL_EXPORT int taos_stmt_set_tbname(TAOS_STMT *stmt, const char *name); +DLL_EXPORT int taos_stmt_set_tags(TAOS_STMT *stmt, TAOS_MULTI_BIND *tags); +DLL_EXPORT int taos_stmt_set_sub_tbname(TAOS_STMT *stmt, const char *name); +DLL_EXPORT int taos_stmt_get_tag_fields(TAOS_STMT *stmt, int *fieldNum, TAOS_FIELD_E **fields); +DLL_EXPORT int taos_stmt_get_col_fields(TAOS_STMT *stmt, int *fieldNum, TAOS_FIELD_E **fields); + +DLL_EXPORT int taos_stmt_is_insert(TAOS_STMT *stmt, int *insert); +DLL_EXPORT int taos_stmt_num_params(TAOS_STMT *stmt, int *nums); +DLL_EXPORT int taos_stmt_get_param(TAOS_STMT *stmt, int idx, int *type, int *bytes); +DLL_EXPORT int taos_stmt_bind_param(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind); +DLL_EXPORT int taos_stmt_bind_param_batch(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind); +DLL_EXPORT int taos_stmt_bind_single_param_batch(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind, int colIdx); +DLL_EXPORT int taos_stmt_add_batch(TAOS_STMT *stmt); +DLL_EXPORT int taos_stmt_execute(TAOS_STMT *stmt); +DLL_EXPORT TAOS_RES *taos_stmt_use_result(TAOS_STMT *stmt); +DLL_EXPORT int taos_stmt_close(TAOS_STMT *stmt); +DLL_EXPORT char *taos_stmt_errstr(TAOS_STMT *stmt); +DLL_EXPORT int taos_stmt_affected_rows(TAOS_STMT *stmt); +DLL_EXPORT int taos_stmt_affected_rows_once(TAOS_STMT *stmt); + +DLL_EXPORT TAOS_RES *taos_query(TAOS *taos, const char *sql); + +DLL_EXPORT TAOS_ROW taos_fetch_row(TAOS_RES *res); +DLL_EXPORT int taos_result_precision(TAOS_RES *res); // get the time precision of result +DLL_EXPORT void taos_free_result(TAOS_RES *res); +DLL_EXPORT int taos_field_count(TAOS_RES *res); +DLL_EXPORT int taos_num_fields(TAOS_RES *res); +DLL_EXPORT int taos_affected_rows(TAOS_RES *res); DLL_EXPORT TAOS_FIELD *taos_fetch_fields(TAOS_RES *res); DLL_EXPORT int taos_select_db(TAOS *taos, const char *db); @@ -181,8 +181,8 @@ DLL_EXPORT int *taos_get_column_data_offset(TAOS_RES *res, int columnInde DLL_EXPORT int taos_validate_sql(TAOS *taos, const char *sql); DLL_EXPORT void taos_reset_current_db(TAOS *taos); -DLL_EXPORT int *taos_fetch_lengths(TAOS_RES *res); -DLL_EXPORT TAOS_ROW *taos_result_block(TAOS_RES *res); +DLL_EXPORT int *taos_fetch_lengths(TAOS_RES *res); +DLL_EXPORT TAOS_ROW *taos_result_block(TAOS_RES *res); DLL_EXPORT const char *taos_get_server_info(TAOS *taos); DLL_EXPORT const char *taos_get_client_info(); @@ -192,8 +192,8 @@ DLL_EXPORT int taos_errno(TAOS_RES *tres); DLL_EXPORT void taos_query_a(TAOS *taos, const char *sql, __taos_async_fn_t fp, void *param); DLL_EXPORT void taos_fetch_rows_a(TAOS_RES *res, __taos_async_fn_t fp, void *param); -DLL_EXPORT void taos_fetch_raw_block_a(TAOS_RES* res, __taos_async_fn_t fp, void* param); -DLL_EXPORT const void *taos_get_raw_block(TAOS_RES* res); +DLL_EXPORT void taos_fetch_raw_block_a(TAOS_RES *res, __taos_async_fn_t fp, void *param); +DLL_EXPORT const void *taos_get_raw_block(TAOS_RES *res); // Shuduo: temporary enable for app build #if 1 diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit.c b/source/dnode/vnode/src/tsdb/tsdbCommit.c index 4468fbe623..678cf34f9c 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit.c @@ -346,19 +346,31 @@ _err: return code; } -static FORCE_INLINE bool tsdbCommitIterEnd(SCommitter *pCommitter, STbDataIter *pIter) { - TSDBROW *pRow = tsdbTbDataIterGet(pIter); - return ((pRow == NULL) || (pRow->pTSRow->ts <= pCommitter->maxKey)); +#define ROW_END(pRow, maxKey) (((pRow) == NULL) || ((pRow)->pTSRow->ts > (maxKey))) + +static int32_t tsdbMergeCommit(SCommitter *pCommitter, STbDataIter *pIter, SBlock *pBlock) { + int32_t code = 0; + // TODO + return code; } -static int32_t tsdbCommitTableDataImpl(SCommitter *pCommitter, STbDataIter *pIter, SBlockIdx *pBlockIdx) { - int32_t code = 0; - int32_t c; - int32_t iBlock; - int32_t nBlock; - SBlock *pBlock; - SBlock block; - SBlockIdx blockIdx; // todo +static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData, SBlockIdx *pBlockIdx) { + int32_t code = 0; + STbDataIter iter; + STbDataIter *pIter = &iter; + TSDBROW *pRow; + SBlockIdx blockIdx; // TODO + + // create iter + if (pTbData) { + tsdbTbDataIterOpen(pTbData, &(TSDBKEY){.ts = pCommitter->minKey, .version = 0}, 0, pIter); + } else { + pIter = NULL; + } + + // check + pRow = tsdbTbDataIterGet(pIter); + if (ROW_END(pRow, pCommitter->maxKey) && pBlockIdx == NULL) goto _exit; // start ================================ tMapDataReset(&pCommitter->oBlock); @@ -369,48 +381,39 @@ static int32_t tsdbCommitTableDataImpl(SCommitter *pCommitter, STbDataIter *pIte } // impl =============================== - iBlock = 0; - nBlock = pCommitter->oBlock.nItem; + SBlock block; + SBlock *pBlock = █ + int32_t iBlock = 0; + int32_t nBlock = pCommitter->oBlock.nItem; - if (iBlock < nBlock) { - pBlock = █ - tMapDataGetItemByIdx(&pCommitter->nBlock, iBlock, pBlock, tGetBlock); - } else { - pBlock = NULL; + // merge + pRow = tsdbTbDataIterGet(pIter); + while (!ROW_END(pRow, pCommitter->maxKey) && iBlock < nBlock) { + tMapDataGetItemByIdx(&pCommitter->oBlock, iBlock, pBlock, tGetBlock); + code = tsdbMergeCommit(pCommitter, pIter, pBlock); + if (code) goto _err; + + pRow = tsdbTbDataIterGet(pIter); + iBlock++; } - while (true) { - TSDBROW *pRow = tsdbTbDataIterGet(pIter); - bool iterEnd = ((pRow == NULL) || (pRow->pTSRow->ts > pCommitter->maxKey)); - bool blockEnd = (pBlock == NULL); - if (iterEnd && blockEnd) break; + // mem + pRow = tsdbTbDataIterGet(pIter); + while (!ROW_END(pRow, pCommitter->maxKey)) { + code = tsdbMergeCommit(pCommitter, pIter, NULL); + if (code) goto _err; - if (!iterEnd && !blockEnd) { - c = tBlockCmprFn(&(SBlock){.maxKey.ts = pRow->pTSRow->ts}, pBlock); + pRow = tsdbTbDataIterGet(pIter); + } - if (c == 0) { - // merge until pBlock->maxKey - // if (pBlock->last), merge until pCommitter->maxKey - // else merge until pBlock->maxKey - } else if (c < 0) { - // tsdbCommitTableMemData(pIter, pBlock); - // if (pBlock->last), merge until pCommitter->maxKey - // else, commit until pBlock->minKey-1 - } else { - // tsdbCommitTableDiskData(pBlock); - // if (pBlock->last), merge until pCommitter->maxKey - // else, move the block to new one - } - } else if (!iterEnd) { - // no block on disk, commit to last when there are no enough data - // commit memory data to pCommitter->maxKey - // tsdbCommitTableMemData(pIter, NULL); - } else { - // tsdbCommitTableDiskData(pBlock); // only left block - // if (last block ? ) else ? - // tMapDataPutItem(&pCommitter->nBlock, pBlock, tPutBlock); - iBlock++; // get new SBlock - } + // disk + while (iBlock < nBlock) { + tMapDataGetItemByIdx(&pCommitter->oBlock, iBlock, pBlock, tGetBlock); + + code = tsdbMergeCommit(pCommitter, NULL, pBlock); + if (code) goto _err; + + iBlock++; } // end =============================== @@ -420,32 +423,6 @@ static int32_t tsdbCommitTableDataImpl(SCommitter *pCommitter, STbDataIter *pIte code = tMapDataPutItem(&pCommitter->nBlockIdx, &blockIdx, tPutBlockIdx); if (code) goto _err; - return code; - -_err: - tsdbError("vgId:%d commit table data impl failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code)); - return code; -} - -static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData, SBlockIdx *pBlockIdx) { - int32_t code = 0; - STbDataIter *pIter = NULL; - STbDataIter iter; - TSDBROW *pRow; - - // create iter if can - if (pTbData) { - pIter = &iter; - tsdbTbDataIterOpen(pTbData, &(TSDBKEY){.ts = pCommitter->minKey, .version = 0}, 0, pIter); - } - - // check - if (tsdbCommitIterEnd(pCommitter, pIter) && pBlockIdx == NULL) goto _exit; - - // impl - code = tsdbCommitTableDataImpl(pCommitter, pIter, pBlockIdx); - if (code) goto _err; - _exit: pRow = tsdbTbDataIterGet(pIter); if (pRow) { diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index c01c3abdab..91bf196c5a 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -107,8 +107,8 @@ typedef struct SBlockLoadSuppInfo { struct STsdbReader { STsdb* pTsdb; uint64_t suid; - SQueryFilePos cur; // current position int16_t order; + SQueryFilePos cur; // current position STimeWindow window; // the primary query time window that applies to all queries // SColumnDataAgg* statis; // query level statistics, only one table block statistics info exists at any time // SColumnDataAgg** pstatis;// the ptr array list to return to caller @@ -292,138 +292,110 @@ struct STsdbReader { // } // } -// static STsdb* getTsdbByRetentions(SVnode* pVnode, STsdbReader* pReadHandle, TSKEY winSKey, SRetention* retentions) { -// if (VND_IS_RSMA(pVnode)) { -// int level = 0; -// int64_t now = taosGetTimestamp(pVnode->config.tsdbCfg.precision); - -// for (int i = 0; i < TSDB_RETENTION_MAX; ++i) { -// SRetention* pRetention = retentions + level; -// if (pRetention->keep <= 0) { -// if (level > 0) { -// --level; -// } -// break; -// } -// if ((now - pRetention->keep) <= winSKey) { -// break; -// } -// ++level; -// } - -// if (level == TSDB_RETENTION_L0) { -// tsdbDebug("vgId:%d, read handle %p rsma level %d is selected to query", TD_VID(pVnode), pReadHandle, -// TSDB_RETENTION_L0); -// return VND_RSMA0(pVnode); -// } else if (level == TSDB_RETENTION_L1) { -// tsdbDebug("vgId:%d, read handle %p rsma level %d is selected to query", TD_VID(pVnode), pReadHandle, -// TSDB_RETENTION_L1); -// return VND_RSMA1(pVnode); -// } else { -// tsdbDebug("vgId:%d, read handle %p rsma level %d is selected to query", TD_VID(pVnode), pReadHandle, -// TSDB_RETENTION_L2); -// return VND_RSMA2(pVnode); -// } -// } -// return VND_TSDB(pVnode); -// } - -// static STsdbReader* tsdbQueryTablesImpl(SVnode* pVnode, SQueryTableDataCond* pCond, uint64_t qId, uint64_t taskId) { -// STsdbReader* pReadHandle = taosMemoryCalloc(1, sizeof(STsdbReader)); -// if (pReadHandle == NULL) { -// goto _end; -// } - -// STsdb* pTsdb = getTsdbByRetentions(pVnode, pReadHandle, pCond->twindows[0].skey, -// pVnode->config.tsdbCfg.retentions); - -// pReadHandle->pTsdb = pTsdb; -// pReadHandle->suid = pCond->suid; -// pReadHandle->order = pCond->order; -// pReadHandle->loadType = pCond->type; -// pReadHandle->loadExternalRow = pCond->loadExternalRows; -// pReadHandle->currentLoadExternalRows = pCond->loadExternalRows; -// pReadHandle->type = TSDB_QUERY_TYPE_ALL; -// pReadHandle->cur.fid = INT32_MIN; -// pReadHandle->cur.win = TSWINDOW_INITIALIZER; -// pReadHandle->checkFiles = true; -// pReadHandle->activeIndex = 0; // current active table index -// pReadHandle->allocSize = 0; -// pReadHandle->locateStart = false; - -// pReadHandle->outputCapacity = 4096; //((STsdb*)tsdb)->config.maxRowsPerFileBlock; - -// char buf[128] = {0}; -// snprintf(buf, tListLen(buf), "TID:0x%" PRIx64 " QID:0x%" PRIx64, taskId, qId); -// pReadHandle->idStr = strdup(buf); +static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, uint64_t qId, uint64_t taskId, + STsdbReader** ppReader) { + int32_t code = 0; + STsdbReader* pReader = NULL; + + // alloc + pReader = (STsdbReader*)taosMemoryCalloc(1, sizeof(*pReader)); + if (pReader == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } + pReader->pTsdb = pVnode->pTsdb; // TODO: pass in pTsdb directly + pReader->suid = pCond->suid; + pReader->order = pCond->order; + pReader->loadType = pCond->type; + pReader->loadExternalRow = pCond->loadExternalRows; + pReader->currentLoadExternalRows = pCond->loadExternalRows; + pReader->type = TSDB_QUERY_TYPE_ALL; + pReader->cur.fid = INT32_MIN; + pReader->cur.win = TSWINDOW_INITIALIZER; + pReader->checkFiles = true; + pReader->activeIndex = 0; // current active table index + pReader->allocSize = 0; + pReader->locateStart = false; + pReader->outputCapacity = 4096; //((STsdb*)tsdb)->config.maxRowsPerFileBlock; + + // char buf[128] = {0}; + // snprintf(buf, tListLen(buf), "TID:0x%" PRIx64 " QID:0x%" PRIx64, taskId, qId); + // pReadHandle->idStr = strdup(buf); + + // // if (tsdbInitReadH(&pReadHandle->rhelper, pReadHandle->pTsdb) != 0) { + // // goto _end; + // // } + + // setQueryTimewindow(pReadHandle, pCond, 0); + + // if (pCond->numOfCols > 0) { + // int32_t rowLen = 0; + // for (int32_t i = 0; i < pCond->numOfCols; ++i) { + // rowLen += pCond->colList[i].bytes; + // } -// // if (tsdbInitReadH(&pReadHandle->rhelper, pReadHandle->pTsdb) != 0) { -// // goto _end; -// // } + // // make sure the output SSDataBlock size be less than 2MB. + // int32_t TWOMB = 2 * 1024 * 1024; + // if (pReadHandle->outputCapacity * rowLen > TWOMB) { + // pReadHandle->outputCapacity = TWOMB / rowLen; + // } -// assert(pCond != NULL); -// setQueryTimewindow(pReadHandle, pCond, 0); + // // allocate buffer in order to load data blocks from file + // pReadHandle->suppInfo.pstatis = taosMemoryCalloc(pCond->numOfCols, sizeof(SColumnDataAgg)); + // if (pReadHandle->suppInfo.pstatis == NULL) { + // goto _end; + // } -// if (pCond->numOfCols > 0) { -// int32_t rowLen = 0; -// for (int32_t i = 0; i < pCond->numOfCols; ++i) { -// rowLen += pCond->colList[i].bytes; -// } + // // todo: use list instead of array? + // pReadHandle->pColumns = taosArrayInit(pCond->numOfCols, sizeof(SColumnInfoData)); + // if (pReadHandle->pColumns == NULL) { + // goto _end; + // } -// // make sure the output SSDataBlock size be less than 2MB. -// int32_t TWOMB = 2 * 1024 * 1024; -// if (pReadHandle->outputCapacity * rowLen > TWOMB) { -// pReadHandle->outputCapacity = TWOMB / rowLen; -// } + // for (int32_t i = 0; i < pCond->numOfCols; ++i) { + // SColumnInfoData colInfo = {{0}, 0}; + // colInfo.info = pCond->colList[i]; -// // allocate buffer in order to load data blocks from file -// pReadHandle->suppInfo.pstatis = taosMemoryCalloc(pCond->numOfCols, sizeof(SColumnDataAgg)); -// if (pReadHandle->suppInfo.pstatis == NULL) { -// goto _end; -// } - -// // todo: use list instead of array? -// pReadHandle->pColumns = taosArrayInit(pCond->numOfCols, sizeof(SColumnInfoData)); -// if (pReadHandle->pColumns == NULL) { -// goto _end; -// } + // int32_t code = colInfoDataEnsureCapacity(&colInfo, 0, pReadHandle->outputCapacity); + // if (code != TSDB_CODE_SUCCESS) { + // goto _end; + // } -// for (int32_t i = 0; i < pCond->numOfCols; ++i) { -// SColumnInfoData colInfo = {{0}, 0}; -// colInfo.info = pCond->colList[i]; + // taosArrayPush(pReadHandle->pColumns, &colInfo); + // } -// int32_t code = colInfoDataEnsureCapacity(&colInfo, 0, pReadHandle->outputCapacity); -// if (code != TSDB_CODE_SUCCESS) { -// goto _end; -// } + // pReadHandle->suppInfo.defaultLoadColumn = getDefaultLoadColumns(pReadHandle, true); -// taosArrayPush(pReadHandle->pColumns, &colInfo); -// } + // size_t size = taosArrayGetSize(pReadHandle->suppInfo.defaultLoadColumn); + // pReadHandle->suppInfo.slotIds = taosMemoryCalloc(size, sizeof(int32_t)); + // pReadHandle->suppInfo.plist = taosMemoryCalloc(size, POINTER_BYTES); + // } -// pReadHandle->suppInfo.defaultLoadColumn = getDefaultLoadColumns(pReadHandle, true); + // pReadHandle->pDataCols = tdNewDataCols(1000, pVnode->config.tsdbCfg.maxRows); + // if (pReadHandle->pDataCols == NULL) { + // tsdbError("%p failed to malloc buf for pDataCols, %s", pReadHandle, pReadHandle->idStr); + // terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + // goto _end; + // } -// size_t size = taosArrayGetSize(pReadHandle->suppInfo.defaultLoadColumn); -// pReadHandle->suppInfo.slotIds = taosMemoryCalloc(size, sizeof(int32_t)); -// pReadHandle->suppInfo.plist = taosMemoryCalloc(size, POINTER_BYTES); -// } + // tsdbInitDataBlockLoadInfo(&pReadHandle->dataBlockLoadInfo); + // tsdbInitCompBlockLoadInfo(&pReadHandle->compBlockLoadInfo); -// pReadHandle->pDataCols = tdNewDataCols(1000, pVnode->config.tsdbCfg.maxRows); -// if (pReadHandle->pDataCols == NULL) { -// tsdbError("%p failed to malloc buf for pDataCols, %s", pReadHandle, pReadHandle->idStr); -// terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; -// goto _end; -// } + // return (STsdbReader*)pReadHandle; -// tsdbInitDataBlockLoadInfo(&pReadHandle->dataBlockLoadInfo); -// tsdbInitCompBlockLoadInfo(&pReadHandle->compBlockLoadInfo); + // _end: + // tsdbReaderClose(pReadHandle); + // terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + // return NULL; -// return (STsdbReader*)pReadHandle; + *ppReader = pReader; + return code; -// _end: -// tsdbReaderClose(pReadHandle); -// terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; -// return NULL; -// } +_err: + // tsdbError(""); + *ppReader = NULL; + return code; +} // static int32_t setCurrentSchema(SVnode* pVnode, STsdbReader* pTsdbReadHandle) { // STableCheckInfo* pCheckInfo = taosArrayGet(pTsdbReadHandle->pTableCheckInfo, 0); @@ -2659,10 +2631,8 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, STableListInf uint64_t taskId, STsdbReader** ppReader) { int32_t code = 0; - // STsdbReader* pReader = tsdbQueryTablesImpl(pVnode, pCond, qId, taskId); - // if (pReader == NULL) { - // return NULL; - // } + code = tsdbReaderCreate(pVnode, pCond, qId, taskId, ppReader); + if (code) goto _err; // if (emptyQueryTimewindow(pReader)) { // return (STsdbReader*)pReader; @@ -2706,6 +2676,10 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, STableListInf // taosArrayGetSize(pReader->pTableCheckInfo), taosArrayGetSize(tableList->pTableList), pReader->idStr); return code; + +_err: + // tsdbError(""); + return code; } void tsdbReaderClose(STsdbReader* pReader) { -- GitLab