提交 8a0922e8 编写于 作者: G gccgdb1234

Merge branch '3.0' of github.com:taosdata/TDengine into 3.0

......@@ -422,6 +422,8 @@ typedef struct {
STsdb *pTsdb; // [input]
SBlockIdx *pBlockIdxExp; // [input]
STSchema *pTSchema; // [input]
tb_uid_t suid;
tb_uid_t uid;
int32_t nFileSet;
int32_t iFileSet;
SArray *aDFileSet;
......@@ -593,6 +595,9 @@ typedef struct SFSNextRowIter {
SFSNEXTROWSTATES state; // [input]
STsdb *pTsdb; // [input]
SBlockIdx *pBlockIdxExp; // [input]
STSchema *pTSchema; // [input]
tb_uid_t suid;
tb_uid_t uid;
int32_t nFileSet;
int32_t iFileSet;
SArray *aDFileSet;
......@@ -685,6 +690,10 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow) {
tMapDataGetItemByIdx(&state->blockMap, state->iBlock, &block, tGetBlock);
/* code = tsdbReadBlockData(state->pDataFReader, &state->blockIdx, &block, &state->blockData, NULL, NULL); */
tBlockDataReset(state->pBlockData);
code = tBlockDataInit(state->pBlockData, state->suid, state->uid, state->pTSchema);
if (code) goto _err;
code = tsdbReadDataBlock(state->pDataFReader, &block, state->pBlockData);
if (code) goto _err;
......@@ -958,16 +967,21 @@ static int32_t nextRowIterOpen(CacheNextRowIter *pIter, tb_uid_t uid, STsdb *pTs
pIter->idx = (SBlockIdx){.suid = suid, .uid = uid};
pIter->fsLastState.state = (SFSLASTNEXTROWSTATES) SFSNEXTROW_FS;
pIter->fsLastState.state = (SFSLASTNEXTROWSTATES)SFSNEXTROW_FS;
pIter->fsLastState.pTsdb = pTsdb;
pIter->fsLastState.aDFileSet = pIter->pReadSnap->fs.aDFileSet;
pIter->fsLastState.pBlockIdxExp = &pIter->idx;
pIter->fsLastState.pTSchema = pTSchema;
pIter->fsLastState.suid = suid;
pIter->fsLastState.uid = uid;
pIter->fsState.state = SFSNEXTROW_FS;
pIter->fsState.pTsdb = pTsdb;
pIter->fsState.aDFileSet = pIter->pReadSnap->fs.aDFileSet;
pIter->fsState.pBlockIdxExp = &pIter->idx;
pIter->fsState.pTSchema = pTSchema;
pIter->fsState.suid = suid;
pIter->fsState.uid = uid;
pIter->input[0] = (TsdbNextRowState){&pIter->memRow, true, false, &pIter->memState, getNextRowFromMem, NULL};
pIter->input[1] = (TsdbNextRowState){&pIter->imemRow, true, false, &pIter->imemState, getNextRowFromMem, NULL};
......
......@@ -60,8 +60,7 @@ typedef enum {
#define SCH_DEFAULT_TASK_TIMEOUT_USEC 10000000
#define SCH_MAX_TASK_TIMEOUT_USEC 60000000
#define SCH_DEFAULT_MAX_RETRY_NUM 6
#define SCH_ASYNC_LAUNCH_TASK 0
#define SCH_MIN_AYSNC_EXEC_NUM 3
typedef struct SSchDebug {
bool lockEnable;
......
......@@ -871,14 +871,14 @@ _return:
taosMemoryFree(param);
#if SCH_ASYNC_LAUNCH_TASK
if (code) {
code = schProcessOnTaskFailure(pJob, pTask, code);
}
if (code) {
code = schHandleJobFailure(pJob, code);
if (pJob->taskNum >= SCH_MIN_AYSNC_EXEC_NUM) {
if (code) {
code = schProcessOnTaskFailure(pJob, pTask, code);
}
if (code) {
code = schHandleJobFailure(pJob, code);
}
}
#endif
SCH_RET(code);
}
......@@ -893,12 +893,12 @@ int32_t schAsyncLaunchTaskImpl(SSchJob *pJob, SSchTask *pTask) {
param->pJob = pJob;
param->pTask = pTask;
#if SCH_ASYNC_LAUNCH_TASK
taosAsyncExec(schLaunchTaskImpl, param, NULL);
#else
SCH_ERR_RET(schLaunchTaskImpl(param));
#endif
if (pJob->taskNum >= SCH_MIN_AYSNC_EXEC_NUM) {
taosAsyncExec(schLaunchTaskImpl, param, NULL);
} else {
SCH_ERR_RET(schLaunchTaskImpl(param));
}
return TSDB_CODE_SUCCESS;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册