提交 75816b7e 编写于 作者: H Hongze Cheng

more work

上级 33838318
...@@ -192,8 +192,8 @@ DLL_EXPORT int taos_errno(TAOS_RES *tres); ...@@ -192,8 +192,8 @@ DLL_EXPORT int taos_errno(TAOS_RES *tres);
DLL_EXPORT void taos_query_a(TAOS *taos, const char *sql, __taos_async_fn_t fp, void *param); DLL_EXPORT void taos_query_a(TAOS *taos, const char *sql, __taos_async_fn_t fp, void *param);
DLL_EXPORT void taos_fetch_rows_a(TAOS_RES *res, __taos_async_fn_t fp, void *param); DLL_EXPORT void taos_fetch_rows_a(TAOS_RES *res, __taos_async_fn_t fp, void *param);
DLL_EXPORT void taos_fetch_raw_block_a(TAOS_RES* res, __taos_async_fn_t fp, void* param); DLL_EXPORT void taos_fetch_raw_block_a(TAOS_RES *res, __taos_async_fn_t fp, void *param);
DLL_EXPORT const void *taos_get_raw_block(TAOS_RES* res); DLL_EXPORT const void *taos_get_raw_block(TAOS_RES *res);
// Shuduo: temporary enable for app build // Shuduo: temporary enable for app build
#if 1 #if 1
......
...@@ -346,19 +346,31 @@ _err: ...@@ -346,19 +346,31 @@ _err:
return code; return code;
} }
static FORCE_INLINE bool tsdbCommitIterEnd(SCommitter *pCommitter, STbDataIter *pIter) { #define ROW_END(pRow, maxKey) (((pRow) == NULL) || ((pRow)->pTSRow->ts > (maxKey)))
TSDBROW *pRow = tsdbTbDataIterGet(pIter);
return ((pRow == NULL) || (pRow->pTSRow->ts <= pCommitter->maxKey)); static int32_t tsdbMergeCommit(SCommitter *pCommitter, STbDataIter *pIter, SBlock *pBlock) {
int32_t code = 0;
// TODO
return code;
} }
static int32_t tsdbCommitTableDataImpl(SCommitter *pCommitter, STbDataIter *pIter, SBlockIdx *pBlockIdx) { static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData, SBlockIdx *pBlockIdx) {
int32_t code = 0; int32_t code = 0;
int32_t c; STbDataIter iter;
int32_t iBlock; STbDataIter *pIter = &iter;
int32_t nBlock; TSDBROW *pRow;
SBlock *pBlock; SBlockIdx blockIdx; // TODO
SBlock block;
SBlockIdx blockIdx; // todo // create iter
if (pTbData) {
tsdbTbDataIterOpen(pTbData, &(TSDBKEY){.ts = pCommitter->minKey, .version = 0}, 0, pIter);
} else {
pIter = NULL;
}
// check
pRow = tsdbTbDataIterGet(pIter);
if (ROW_END(pRow, pCommitter->maxKey) && pBlockIdx == NULL) goto _exit;
// start ================================ // start ================================
tMapDataReset(&pCommitter->oBlock); tMapDataReset(&pCommitter->oBlock);
...@@ -369,81 +381,46 @@ static int32_t tsdbCommitTableDataImpl(SCommitter *pCommitter, STbDataIter *pIte ...@@ -369,81 +381,46 @@ static int32_t tsdbCommitTableDataImpl(SCommitter *pCommitter, STbDataIter *pIte
} }
// impl =============================== // impl ===============================
iBlock = 0; SBlock block;
nBlock = pCommitter->oBlock.nItem; SBlock *pBlock = &block;
int32_t iBlock = 0;
if (iBlock < nBlock) { int32_t nBlock = pCommitter->oBlock.nItem;
pBlock = &block;
tMapDataGetItemByIdx(&pCommitter->nBlock, iBlock, pBlock, tGetBlock);
} else {
pBlock = NULL;
}
while (true) {
TSDBROW *pRow = tsdbTbDataIterGet(pIter);
bool iterEnd = ((pRow == NULL) || (pRow->pTSRow->ts > pCommitter->maxKey));
bool blockEnd = (pBlock == NULL);
if (iterEnd && blockEnd) break;
if (!iterEnd && !blockEnd) { // merge
c = tBlockCmprFn(&(SBlock){.maxKey.ts = pRow->pTSRow->ts}, pBlock); pRow = tsdbTbDataIterGet(pIter);
while (!ROW_END(pRow, pCommitter->maxKey) && iBlock < nBlock) {
tMapDataGetItemByIdx(&pCommitter->oBlock, iBlock, pBlock, tGetBlock);
code = tsdbMergeCommit(pCommitter, pIter, pBlock);
if (code) goto _err;
if (c == 0) { pRow = tsdbTbDataIterGet(pIter);
// merge until pBlock->maxKey iBlock++;
// if (pBlock->last), merge until pCommitter->maxKey
// else merge until pBlock->maxKey
} else if (c < 0) {
// tsdbCommitTableMemData(pIter, pBlock);
// if (pBlock->last), merge until pCommitter->maxKey
// else, commit until pBlock->minKey-1
} else {
// tsdbCommitTableDiskData(pBlock);
// if (pBlock->last), merge until pCommitter->maxKey
// else, move the block to new one
}
} else if (!iterEnd) {
// no block on disk, commit to last when there are no enough data
// commit memory data to pCommitter->maxKey
// tsdbCommitTableMemData(pIter, NULL);
} else {
// tsdbCommitTableDiskData(pBlock); // only left block
// if (last block ? ) else ?
// tMapDataPutItem(&pCommitter->nBlock, pBlock, tPutBlock);
iBlock++; // get new SBlock
}
} }
// end =============================== // mem
code = tsdbWriteBlock(pCommitter->pWriter, &pCommitter->nBlock, NULL, &blockIdx); pRow = tsdbTbDataIterGet(pIter);
if (code) goto _err; while (!ROW_END(pRow, pCommitter->maxKey)) {
code = tsdbMergeCommit(pCommitter, pIter, NULL);
code = tMapDataPutItem(&pCommitter->nBlockIdx, &blockIdx, tPutBlockIdx);
if (code) goto _err; if (code) goto _err;
return code; pRow = tsdbTbDataIterGet(pIter);
}
_err: // disk
tsdbError("vgId:%d commit table data impl failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code)); while (iBlock < nBlock) {
return code; tMapDataGetItemByIdx(&pCommitter->oBlock, iBlock, pBlock, tGetBlock);
}
static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData, SBlockIdx *pBlockIdx) { code = tsdbMergeCommit(pCommitter, NULL, pBlock);
int32_t code = 0; if (code) goto _err;
STbDataIter *pIter = NULL;
STbDataIter iter;
TSDBROW *pRow;
// create iter if can iBlock++;
if (pTbData) {
pIter = &iter;
tsdbTbDataIterOpen(pTbData, &(TSDBKEY){.ts = pCommitter->minKey, .version = 0}, 0, pIter);
} }
// check // end ===============================
if (tsdbCommitIterEnd(pCommitter, pIter) && pBlockIdx == NULL) goto _exit; code = tsdbWriteBlock(pCommitter->pWriter, &pCommitter->nBlock, NULL, &blockIdx);
if (code) goto _err;
// impl code = tMapDataPutItem(&pCommitter->nBlockIdx, &blockIdx, tPutBlockIdx);
code = tsdbCommitTableDataImpl(pCommitter, pIter, pBlockIdx);
if (code) goto _err; if (code) goto _err;
_exit: _exit:
......
...@@ -107,8 +107,8 @@ typedef struct SBlockLoadSuppInfo { ...@@ -107,8 +107,8 @@ typedef struct SBlockLoadSuppInfo {
struct STsdbReader { struct STsdbReader {
STsdb* pTsdb; STsdb* pTsdb;
uint64_t suid; uint64_t suid;
SQueryFilePos cur; // current position
int16_t order; int16_t order;
SQueryFilePos cur; // current position
STimeWindow window; // the primary query time window that applies to all queries STimeWindow window; // the primary query time window that applies to all queries
// SColumnDataAgg* statis; // query level statistics, only one table block statistics info exists at any time // SColumnDataAgg* statis; // query level statistics, only one table block statistics info exists at any time
// SColumnDataAgg** pstatis;// the ptr array list to return to caller // SColumnDataAgg** pstatis;// the ptr array list to return to caller
...@@ -292,138 +292,110 @@ struct STsdbReader { ...@@ -292,138 +292,110 @@ struct STsdbReader {
// } // }
// } // }
// static STsdb* getTsdbByRetentions(SVnode* pVnode, STsdbReader* pReadHandle, TSKEY winSKey, SRetention* retentions) { static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, uint64_t qId, uint64_t taskId,
// if (VND_IS_RSMA(pVnode)) { STsdbReader** ppReader) {
// int level = 0; int32_t code = 0;
// int64_t now = taosGetTimestamp(pVnode->config.tsdbCfg.precision); STsdbReader* pReader = NULL;
// for (int i = 0; i < TSDB_RETENTION_MAX; ++i) { // alloc
// SRetention* pRetention = retentions + level; pReader = (STsdbReader*)taosMemoryCalloc(1, sizeof(*pReader));
// if (pRetention->keep <= 0) { if (pReader == NULL) {
// if (level > 0) { code = TSDB_CODE_OUT_OF_MEMORY;
// --level; goto _err;
// } }
// break; pReader->pTsdb = pVnode->pTsdb; // TODO: pass in pTsdb directly
// } pReader->suid = pCond->suid;
// if ((now - pRetention->keep) <= winSKey) { pReader->order = pCond->order;
// break; pReader->loadType = pCond->type;
// } pReader->loadExternalRow = pCond->loadExternalRows;
// ++level; pReader->currentLoadExternalRows = pCond->loadExternalRows;
// } pReader->type = TSDB_QUERY_TYPE_ALL;
pReader->cur.fid = INT32_MIN;
// if (level == TSDB_RETENTION_L0) { pReader->cur.win = TSWINDOW_INITIALIZER;
// tsdbDebug("vgId:%d, read handle %p rsma level %d is selected to query", TD_VID(pVnode), pReadHandle, pReader->checkFiles = true;
// TSDB_RETENTION_L0); pReader->activeIndex = 0; // current active table index
// return VND_RSMA0(pVnode); pReader->allocSize = 0;
// } else if (level == TSDB_RETENTION_L1) { pReader->locateStart = false;
// tsdbDebug("vgId:%d, read handle %p rsma level %d is selected to query", TD_VID(pVnode), pReadHandle, pReader->outputCapacity = 4096; //((STsdb*)tsdb)->config.maxRowsPerFileBlock;
// TSDB_RETENTION_L1);
// return VND_RSMA1(pVnode); // char buf[128] = {0};
// } else { // snprintf(buf, tListLen(buf), "TID:0x%" PRIx64 " QID:0x%" PRIx64, taskId, qId);
// tsdbDebug("vgId:%d, read handle %p rsma level %d is selected to query", TD_VID(pVnode), pReadHandle, // pReadHandle->idStr = strdup(buf);
// TSDB_RETENTION_L2);
// return VND_RSMA2(pVnode); // // if (tsdbInitReadH(&pReadHandle->rhelper, pReadHandle->pTsdb) != 0) {
// } // // goto _end;
// } // // }
// return VND_TSDB(pVnode);
// }
// static STsdbReader* tsdbQueryTablesImpl(SVnode* pVnode, SQueryTableDataCond* pCond, uint64_t qId, uint64_t taskId) {
// STsdbReader* pReadHandle = taosMemoryCalloc(1, sizeof(STsdbReader));
// if (pReadHandle == NULL) {
// goto _end;
// }
// STsdb* pTsdb = getTsdbByRetentions(pVnode, pReadHandle, pCond->twindows[0].skey,
// pVnode->config.tsdbCfg.retentions);
// pReadHandle->pTsdb = pTsdb;
// pReadHandle->suid = pCond->suid;
// pReadHandle->order = pCond->order;
// pReadHandle->loadType = pCond->type;
// pReadHandle->loadExternalRow = pCond->loadExternalRows;
// pReadHandle->currentLoadExternalRows = pCond->loadExternalRows;
// pReadHandle->type = TSDB_QUERY_TYPE_ALL;
// pReadHandle->cur.fid = INT32_MIN;
// pReadHandle->cur.win = TSWINDOW_INITIALIZER;
// pReadHandle->checkFiles = true;
// pReadHandle->activeIndex = 0; // current active table index
// pReadHandle->allocSize = 0;
// pReadHandle->locateStart = false;
// pReadHandle->outputCapacity = 4096; //((STsdb*)tsdb)->config.maxRowsPerFileBlock; // setQueryTimewindow(pReadHandle, pCond, 0);
// char buf[128] = {0}; // if (pCond->numOfCols > 0) {
// snprintf(buf, tListLen(buf), "TID:0x%" PRIx64 " QID:0x%" PRIx64, taskId, qId); // int32_t rowLen = 0;
// pReadHandle->idStr = strdup(buf); // for (int32_t i = 0; i < pCond->numOfCols; ++i) {
// rowLen += pCond->colList[i].bytes;
// }
// // if (tsdbInitReadH(&pReadHandle->rhelper, pReadHandle->pTsdb) != 0) { // // make sure the output SSDataBlock size be less than 2MB.
// // goto _end; // int32_t TWOMB = 2 * 1024 * 1024;
// // } // if (pReadHandle->outputCapacity * rowLen > TWOMB) {
// pReadHandle->outputCapacity = TWOMB / rowLen;
// }
// assert(pCond != NULL); // // allocate buffer in order to load data blocks from file
// setQueryTimewindow(pReadHandle, pCond, 0); // pReadHandle->suppInfo.pstatis = taosMemoryCalloc(pCond->numOfCols, sizeof(SColumnDataAgg));
// if (pReadHandle->suppInfo.pstatis == NULL) {
// goto _end;
// }
// if (pCond->numOfCols > 0) { // // todo: use list instead of array?
// int32_t rowLen = 0; // pReadHandle->pColumns = taosArrayInit(pCond->numOfCols, sizeof(SColumnInfoData));
// for (int32_t i = 0; i < pCond->numOfCols; ++i) { // if (pReadHandle->pColumns == NULL) {
// rowLen += pCond->colList[i].bytes; // goto _end;
// } // }
// // make sure the output SSDataBlock size be less than 2MB. // for (int32_t i = 0; i < pCond->numOfCols; ++i) {
// int32_t TWOMB = 2 * 1024 * 1024; // SColumnInfoData colInfo = {{0}, 0};
// if (pReadHandle->outputCapacity * rowLen > TWOMB) { // colInfo.info = pCond->colList[i];
// pReadHandle->outputCapacity = TWOMB / rowLen;
// }
// // allocate buffer in order to load data blocks from file // int32_t code = colInfoDataEnsureCapacity(&colInfo, 0, pReadHandle->outputCapacity);
// pReadHandle->suppInfo.pstatis = taosMemoryCalloc(pCond->numOfCols, sizeof(SColumnDataAgg)); // if (code != TSDB_CODE_SUCCESS) {
// if (pReadHandle->suppInfo.pstatis == NULL) { // goto _end;
// goto _end; // }
// }
// // todo: use list instead of array?
// pReadHandle->pColumns = taosArrayInit(pCond->numOfCols, sizeof(SColumnInfoData));
// if (pReadHandle->pColumns == NULL) {
// goto _end;
// }
// for (int32_t i = 0; i < pCond->numOfCols; ++i) { // taosArrayPush(pReadHandle->pColumns, &colInfo);
// SColumnInfoData colInfo = {{0}, 0}; // }
// colInfo.info = pCond->colList[i];
// int32_t code = colInfoDataEnsureCapacity(&colInfo, 0, pReadHandle->outputCapacity); // pReadHandle->suppInfo.defaultLoadColumn = getDefaultLoadColumns(pReadHandle, true);
// if (code != TSDB_CODE_SUCCESS) {
// goto _end;
// }
// taosArrayPush(pReadHandle->pColumns, &colInfo); // size_t size = taosArrayGetSize(pReadHandle->suppInfo.defaultLoadColumn);
// } // pReadHandle->suppInfo.slotIds = taosMemoryCalloc(size, sizeof(int32_t));
// pReadHandle->suppInfo.plist = taosMemoryCalloc(size, POINTER_BYTES);
// }
// pReadHandle->suppInfo.defaultLoadColumn = getDefaultLoadColumns(pReadHandle, true); // pReadHandle->pDataCols = tdNewDataCols(1000, pVnode->config.tsdbCfg.maxRows);
// if (pReadHandle->pDataCols == NULL) {
// tsdbError("%p failed to malloc buf for pDataCols, %s", pReadHandle, pReadHandle->idStr);
// terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
// goto _end;
// }
// size_t size = taosArrayGetSize(pReadHandle->suppInfo.defaultLoadColumn); // tsdbInitDataBlockLoadInfo(&pReadHandle->dataBlockLoadInfo);
// pReadHandle->suppInfo.slotIds = taosMemoryCalloc(size, sizeof(int32_t)); // tsdbInitCompBlockLoadInfo(&pReadHandle->compBlockLoadInfo);
// pReadHandle->suppInfo.plist = taosMemoryCalloc(size, POINTER_BYTES);
// }
// pReadHandle->pDataCols = tdNewDataCols(1000, pVnode->config.tsdbCfg.maxRows); // return (STsdbReader*)pReadHandle;
// if (pReadHandle->pDataCols == NULL) {
// tsdbError("%p failed to malloc buf for pDataCols, %s", pReadHandle, pReadHandle->idStr);
// terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
// goto _end;
// }
// tsdbInitDataBlockLoadInfo(&pReadHandle->dataBlockLoadInfo); // _end:
// tsdbInitCompBlockLoadInfo(&pReadHandle->compBlockLoadInfo); // tsdbReaderClose(pReadHandle);
// terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
// return NULL;
// return (STsdbReader*)pReadHandle; *ppReader = pReader;
return code;
// _end: _err:
// tsdbReaderClose(pReadHandle); // tsdbError("");
// terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; *ppReader = NULL;
// return NULL; return code;
// } }
// static int32_t setCurrentSchema(SVnode* pVnode, STsdbReader* pTsdbReadHandle) { // static int32_t setCurrentSchema(SVnode* pVnode, STsdbReader* pTsdbReadHandle) {
// STableCheckInfo* pCheckInfo = taosArrayGet(pTsdbReadHandle->pTableCheckInfo, 0); // STableCheckInfo* pCheckInfo = taosArrayGet(pTsdbReadHandle->pTableCheckInfo, 0);
...@@ -2659,10 +2631,8 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, STableListInf ...@@ -2659,10 +2631,8 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, STableListInf
uint64_t taskId, STsdbReader** ppReader) { uint64_t taskId, STsdbReader** ppReader) {
int32_t code = 0; int32_t code = 0;
// STsdbReader* pReader = tsdbQueryTablesImpl(pVnode, pCond, qId, taskId); code = tsdbReaderCreate(pVnode, pCond, qId, taskId, ppReader);
// if (pReader == NULL) { if (code) goto _err;
// return NULL;
// }
// if (emptyQueryTimewindow(pReader)) { // if (emptyQueryTimewindow(pReader)) {
// return (STsdbReader*)pReader; // return (STsdbReader*)pReader;
...@@ -2706,6 +2676,10 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, STableListInf ...@@ -2706,6 +2676,10 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, STableListInf
// taosArrayGetSize(pReader->pTableCheckInfo), taosArrayGetSize(tableList->pTableList), pReader->idStr); // taosArrayGetSize(pReader->pTableCheckInfo), taosArrayGetSize(tableList->pTableList), pReader->idStr);
return code; return code;
_err:
// tsdbError("");
return code;
} }
void tsdbReaderClose(STsdbReader* pReader) { void tsdbReaderClose(STsdbReader* pReader) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册