提交 dced0dea 编写于 作者: H Hongze Cheng

Merge branch '3.0' of https://github.com/taosdata/TDengine into feat/retention

...@@ -810,11 +810,16 @@ int32_t handleQueryExecRsp(SRequestObj* pRequest) { ...@@ -810,11 +810,16 @@ int32_t handleQueryExecRsp(SRequestObj* pRequest) {
void schedulerExecCb(SExecResult* pResult, void* param, int32_t code) { void schedulerExecCb(SExecResult* pResult, void* param, int32_t code) {
SRequestObj* pRequest = (SRequestObj*)param; SRequestObj* pRequest = (SRequestObj*)param;
pRequest->code = code; pRequest->code = code;
memcpy(&pRequest->body.resInfo.execRes, pResult, sizeof(*pResult));
if (pResult) {
memcpy(&pRequest->body.resInfo.execRes, pResult, sizeof(*pResult));
}
if (TDMT_VND_SUBMIT == pRequest->type || TDMT_VND_DELETE == pRequest->type || if (TDMT_VND_SUBMIT == pRequest->type || TDMT_VND_DELETE == pRequest->type ||
TDMT_VND_CREATE_TABLE == pRequest->type) { TDMT_VND_CREATE_TABLE == pRequest->type) {
pRequest->body.resInfo.numOfRows = pResult->numOfRows; if (pResult) {
pRequest->body.resInfo.numOfRows = pResult->numOfRows;
}
schedulerFreeJob(&pRequest->body.queryJob, 0); schedulerFreeJob(&pRequest->body.queryJob, 0);
} }
...@@ -1476,12 +1481,16 @@ void* doAsyncFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertU ...@@ -1476,12 +1481,16 @@ void* doAsyncFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertU
tsem_wait(&pParam->sem); tsem_wait(&pParam->sem);
} }
if (pRequest->code == TSDB_CODE_SUCCESS && pResultInfo->numOfRows > 0 && setupOneRowPtr) { if (pResultInfo->numOfRows == 0 || pRequest->code != TSDB_CODE_SUCCESS) {
doSetOneRowPtr(pResultInfo); return NULL;
pResultInfo->current += 1; } else {
} if (setupOneRowPtr) {
doSetOneRowPtr(pResultInfo);
pResultInfo->current += 1;
}
return pResultInfo->row; return pResultInfo->row;
}
} }
static int32_t doPrepareResPtr(SReqResultInfo* pResInfo) { static int32_t doPrepareResPtr(SReqResultInfo* pResInfo) {
......
...@@ -1747,7 +1747,7 @@ char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf) ...@@ -1747,7 +1747,7 @@ char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf)
for (int32_t k = 0; k < colNum; k++) { for (int32_t k = 0; k < colNum; k++) {
SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, k); SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, k);
void* var = POINTER_SHIFT(pColInfoData->pData, j * pColInfoData->info.bytes); void* var = POINTER_SHIFT(pColInfoData->pData, j * pColInfoData->info.bytes);
if (colDataIsNull(pColInfoData, rows, j, NULL) || !var) { if (colDataIsNull(pColInfoData, rows, j, NULL) || !pColInfoData->pData) {
len += snprintf(dumpBuf + len, size - len, " %15s |", "NULL"); len += snprintf(dumpBuf + len, size - len, " %15s |", "NULL");
if (len >= size -1) return dumpBuf; if (len >= size -1) return dumpBuf;
continue; continue;
......
...@@ -779,6 +779,8 @@ int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLo ...@@ -779,6 +779,8 @@ int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLo
int32_t compLen, int32_t numOfOutput, int64_t startTs, uint64_t* total, int32_t compLen, int32_t numOfOutput, int64_t startTs, uint64_t* total,
SArray* pColList); SArray* pColList);
void getAlignQueryTimeWindow(SInterval* pInterval, int32_t precision, int64_t key, STimeWindow* win); void getAlignQueryTimeWindow(SInterval* pInterval, int32_t precision, int64_t key, STimeWindow* win);
STimeWindow getFirstQualifiedTimeWindow(int64_t ts, STimeWindow* pWindow, SInterval* pInterval, int32_t order);
int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t *order, int32_t* scanFlag); int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t *order, int32_t* scanFlag);
int32_t getBufferPgSize(int32_t rowSize, uint32_t* defaultPgsz, uint32_t* defaultBufsz); int32_t getBufferPgSize(int32_t rowSize, uint32_t* defaultPgsz, uint32_t* defaultBufsz);
......
...@@ -869,7 +869,7 @@ static STimeWindow doCalculateTimeWindow(int64_t ts, SInterval* pInterval) { ...@@ -869,7 +869,7 @@ static STimeWindow doCalculateTimeWindow(int64_t ts, SInterval* pInterval) {
return w; return w;
} }
static STimeWindow getFirstQualifiedTimeWindow(int64_t ts, STimeWindow* pWindow, SInterval* pInterval, int32_t order) { STimeWindow getFirstQualifiedTimeWindow(int64_t ts, STimeWindow* pWindow, SInterval* pInterval, int32_t order) {
int32_t factor = (order == TSDB_ORDER_ASC)? -1:1; int32_t factor = (order == TSDB_ORDER_ASC)? -1:1;
STimeWindow win = *pWindow; STimeWindow win = *pWindow;
......
...@@ -545,9 +545,7 @@ static int32_t doAggregateImpl(SOperatorInfo* pOperator, TSKEY startTs, SqlFunct ...@@ -545,9 +545,7 @@ static int32_t doAggregateImpl(SOperatorInfo* pOperator, TSKEY startTs, SqlFunct
if (pCtx[k].fpSet.process == NULL) { if (pCtx[k].fpSet.process == NULL) {
continue; continue;
} }
#ifdef BUF_PAGE_DEBUG
qDebug("page_process");
#endif
int32_t code = pCtx[k].fpSet.process(&pCtx[k]); int32_t code = pCtx[k].fpSet.process(&pCtx[k]);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
qError("%s aggregate function error happens, code: %s", GET_TASKID(pOperator->pTaskInfo), tstrerror(code)); qError("%s aggregate function error happens, code: %s", GET_TASKID(pOperator->pTaskInfo), tstrerror(code));
...@@ -578,14 +576,15 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc ...@@ -578,14 +576,15 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc
int32_t numOfRows = 0; int32_t numOfRows = 0;
for (int32_t k = 0; k < numOfOutput; ++k) { for (int32_t k = 0; k < numOfOutput; ++k) {
int32_t outputSlotId = pExpr[k].base.resSchema.slotId; int32_t outputSlotId = pExpr[k].base.resSchema.slotId;
SqlFunctionCtx* pfCtx = &pCtx[k]; SqlFunctionCtx* pfCtx = &pCtx[k];
SInputColumnInfoData* pInputData = &pfCtx->input; SInputColumnInfoData* pInputData = &pfCtx->input;
if (pExpr[k].pExpr->nodeType == QUERY_NODE_COLUMN) { // it is a project query if (pExpr[k].pExpr->nodeType == QUERY_NODE_COLUMN) { // it is a project query
SColumnInfoData* pColInfoData = taosArrayGet(pResult->pDataBlock, outputSlotId); SColumnInfoData* pColInfoData = taosArrayGet(pResult->pDataBlock, outputSlotId);
if (pResult->info.rows > 0 && !createNewColModel) { if (pResult->info.rows > 0 && !createNewColModel) {
colDataMergeCol(pColInfoData, pResult->info.rows, &pResult->info.capacity, pInputData->pData[0], pInputData->numOfRows); colDataMergeCol(pColInfoData, pResult->info.rows, &pResult->info.capacity, pInputData->pData[0],
pInputData->numOfRows);
} else { } else {
colDataAssign(pColInfoData, pInputData->pData[0], pInputData->numOfRows, &pResult->info); colDataAssign(pColInfoData, pInputData->pData[0], pInputData->numOfRows, &pResult->info);
} }
...@@ -643,11 +642,11 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc ...@@ -643,11 +642,11 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc
} else if (fmIsAggFunc(pfCtx->functionId)) { } else if (fmIsAggFunc(pfCtx->functionId)) {
// _group_key function for "partition by tbname" + csum(col_name) query // _group_key function for "partition by tbname" + csum(col_name) query
SColumnInfoData* pOutput = taosArrayGet(pResult->pDataBlock, outputSlotId); SColumnInfoData* pOutput = taosArrayGet(pResult->pDataBlock, outputSlotId);
int32_t slotId = pfCtx->param[0].pCol->slotId; int32_t slotId = pfCtx->param[0].pCol->slotId;
// todo handle the json tag // todo handle the json tag
SColumnInfoData* pInput = taosArrayGet(pSrcBlock->pDataBlock, slotId); SColumnInfoData* pInput = taosArrayGet(pSrcBlock->pDataBlock, slotId);
for(int32_t f = 0; f < pSrcBlock->info.rows; ++f) { for (int32_t f = 0; f < pSrcBlock->info.rows; ++f) {
bool isNull = colDataIsNull_s(pInput, f); bool isNull = colDataIsNull_s(pInput, f);
if (isNull) { if (isNull) {
colDataAppendNULL(pOutput, pResult->info.rows + f); colDataAppendNULL(pOutput, pResult->info.rows + f);
...@@ -2443,7 +2442,6 @@ _error: ...@@ -2443,7 +2442,6 @@ _error:
doDestroyExchangeOperatorInfo(pInfo); doDestroyExchangeOperatorInfo(pInfo);
} }
taosMemoryFreeClear(pInfo);
taosMemoryFreeClear(pOperator); taosMemoryFreeClear(pOperator);
pTaskInfo->code = code; pTaskInfo->code = code;
return NULL; return NULL;
...@@ -3393,6 +3391,8 @@ static SSDataBlock* doFillImpl(SOperatorInfo* pOperator) { ...@@ -3393,6 +3391,8 @@ static SSDataBlock* doFillImpl(SOperatorInfo* pOperator) {
assert(pBlock != NULL); assert(pBlock != NULL);
} }
blockDataUpdateTsWindow(pBlock, pInfo->primaryTsCol);
if (*newgroup && pInfo->totalInputRows > 0) { // there are already processed current group data block if (*newgroup && pInfo->totalInputRows > 0) { // there are already processed current group data block
pInfo->existNewGroupBlock = pBlock; pInfo->existNewGroupBlock = pBlock;
*newgroup = false; *newgroup = false;
...@@ -3821,7 +3821,8 @@ _error: ...@@ -3821,7 +3821,8 @@ _error:
return NULL; return NULL;
} }
static void doHandleDataBlock(SOperatorInfo* pOperator, SSDataBlock* pBlock, SOperatorInfo* downstream, SExecTaskInfo* pTaskInfo) { static void doHandleDataBlock(SOperatorInfo* pOperator, SSDataBlock* pBlock, SOperatorInfo* downstream,
SExecTaskInfo* pTaskInfo) {
int32_t order = 0; int32_t order = 0;
int32_t scanFlag = 0; int32_t scanFlag = 0;
...@@ -3876,9 +3877,9 @@ static SSDataBlock* doApplyIndefinitFunction(SOperatorInfo* pOperator) { ...@@ -3876,9 +3877,9 @@ static SSDataBlock* doApplyIndefinitFunction(SOperatorInfo* pOperator) {
SOperatorInfo* downstream = pOperator->pDownstream[0]; SOperatorInfo* downstream = pOperator->pDownstream[0];
while(1) { while (1) {
// here we need to handle the existsed group results // here we need to handle the existsed group results
if (pIndefInfo->pNextGroupRes != NULL) { // todo extract method if (pIndefInfo->pNextGroupRes != NULL) { // todo extract method
for (int32_t k = 0; k < pSup->numOfExprs; ++k) { for (int32_t k = 0; k < pSup->numOfExprs; ++k) {
SqlFunctionCtx* pCtx = &pSup->pCtx[k]; SqlFunctionCtx* pCtx = &pSup->pCtx[k];
...@@ -3976,15 +3977,15 @@ SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhy ...@@ -3976,15 +3977,15 @@ SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhy
setFunctionResultOutput(pOperator, &pInfo->binfo, &pInfo->aggSup, MAIN_SCAN, numOfExpr); setFunctionResultOutput(pOperator, &pInfo->binfo, &pInfo->aggSup, MAIN_SCAN, numOfExpr);
pInfo->binfo.pRes = pResBlock; pInfo->binfo.pRes = pResBlock;
pInfo->pCondition = pPhyNode->node.pConditions; pInfo->pCondition = pPhyNode->node.pConditions;
pInfo->pPseudoColInfo= setRowTsColumnOutputInfo(pSup->pCtx, numOfExpr); pInfo->pPseudoColInfo = setRowTsColumnOutputInfo(pSup->pCtx, numOfExpr);
pOperator->name = "IndefinitOperator"; pOperator->name = "IndefinitOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_INDEF_ROWS_FUNC; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_INDEF_ROWS_FUNC;
pOperator->blocking = false; pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED; pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->pTaskInfo = pTaskInfo; pOperator->pTaskInfo = pTaskInfo;
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doApplyIndefinitFunction, NULL, NULL, pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doApplyIndefinitFunction, NULL, NULL,
...@@ -4010,6 +4011,7 @@ static int32_t initFillInfo(SFillOperatorInfo* pInfo, SExprInfo* pExpr, int32_t ...@@ -4010,6 +4011,7 @@ static int32_t initFillInfo(SFillOperatorInfo* pInfo, SExprInfo* pExpr, int32_t
STimeWindow w = TSWINDOW_INITIALIZER; STimeWindow w = TSWINDOW_INITIALIZER;
getAlignQueryTimeWindow(pInterval, pInterval->precision, win.skey, &w); getAlignQueryTimeWindow(pInterval, pInterval->precision, win.skey, &w);
w = getFirstQualifiedTimeWindow(win.skey, &w, pInterval, TSDB_ORDER_ASC);
int32_t order = TSDB_ORDER_ASC; int32_t order = TSDB_ORDER_ASC;
pInfo->pFillInfo = taosCreateFillInfo(order, w.skey, 0, capacity, numOfCols, pInterval, fillType, pColInfo, id); pInfo->pFillInfo = taosCreateFillInfo(order, w.skey, 0, capacity, numOfCols, pInterval, fillType, pColInfo, id);
...@@ -4048,8 +4050,8 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* ...@@ -4048,8 +4050,8 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode*
pInfo->primaryTsCol = ((SColumnNode*)pPhyFillNode->pWStartTs)->slotId; pInfo->primaryTsCol = ((SColumnNode*)pPhyFillNode->pWStartTs)->slotId;
int32_t numOfOutputCols = 0; int32_t numOfOutputCols = 0;
SArray* pColMatchColInfo = SArray* pColMatchColInfo = extractColMatchInfo(pPhyFillNode->pTargets, pPhyFillNode->node.pOutputDataBlockDesc,
extractColMatchInfo(pPhyFillNode->pTargets, pPhyFillNode->node.pOutputDataBlockDesc, &numOfOutputCols, COL_MATCH_FROM_SLOT_ID); &numOfOutputCols, COL_MATCH_FROM_SLOT_ID);
int32_t code = initFillInfo(pInfo, pExprInfo, num, (SNodeListNode*)pPhyFillNode->pValues, pPhyFillNode->timeRange, int32_t code = initFillInfo(pInfo, pExprInfo, num, (SNodeListNode*)pPhyFillNode->pValues, pPhyFillNode->timeRange,
pResultInfo->capacity, pTaskInfo->id.str, pInterval, type); pResultInfo->capacity, pTaskInfo->id.str, pInterval, type);
...@@ -4057,18 +4059,18 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* ...@@ -4057,18 +4059,18 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode*
goto _error; goto _error;
} }
pInfo->pRes = pResBlock; pInfo->pRes = pResBlock;
pInfo->multigroupResult = multigroupResult; pInfo->multigroupResult = multigroupResult;
pInfo->pCondition = pPhyFillNode->node.pConditions; pInfo->pCondition = pPhyFillNode->node.pConditions;
pInfo->pColMatchColInfo = pColMatchColInfo; pInfo->pColMatchColInfo = pColMatchColInfo;
pOperator->name = "FillOperator"; pOperator->name = "FillOperator";
pOperator->blocking = false; pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED; pOperator->status = OP_NOT_OPENED;
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_FILL; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_FILL;
pOperator->exprSupp.pExprInfo = pExprInfo; pOperator->exprSupp.pExprInfo = pExprInfo;
pOperator->exprSupp.numOfExprs = num; pOperator->exprSupp.numOfExprs = num;
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->pTaskInfo = pTaskInfo; pOperator->pTaskInfo = pTaskInfo;
pOperator->fpSet = pOperator->fpSet =
createOperatorFpSet(operatorDummyOpenFn, doFill, NULL, NULL, destroySFillOperatorInfo, NULL, NULL, NULL); createOperatorFpSet(operatorDummyOpenFn, doFill, NULL, NULL, destroySFillOperatorInfo, NULL, NULL, NULL);
...@@ -4118,6 +4120,8 @@ int32_t extractTableSchemaInfo(SReadHandle* pHandle, uint64_t uid, SExecTaskInfo ...@@ -4118,6 +4120,8 @@ int32_t extractTableSchemaInfo(SReadHandle* pHandle, uint64_t uid, SExecTaskInfo
pTaskInfo->schemaVer.sw = tCloneSSchemaWrapper(&mr.me.stbEntry.schemaRow); pTaskInfo->schemaVer.sw = tCloneSSchemaWrapper(&mr.me.stbEntry.schemaRow);
pTaskInfo->schemaVer.tversion = mr.me.stbEntry.schemaTag.version; pTaskInfo->schemaVer.tversion = mr.me.stbEntry.schemaTag.version;
} else if (mr.me.type == TSDB_CHILD_TABLE) { } else if (mr.me.type == TSDB_CHILD_TABLE) {
tDecoderClear(&mr.coder);
tb_uid_t suid = mr.me.ctbEntry.suid; tb_uid_t suid = mr.me.ctbEntry.suid;
metaGetTableEntryByUid(&mr, suid); metaGetTableEntryByUid(&mr, suid);
pTaskInfo->schemaVer.sw = tCloneSSchemaWrapper(&mr.me.stbEntry.schemaRow); pTaskInfo->schemaVer.sw = tCloneSSchemaWrapper(&mr.me.stbEntry.schemaRow);
......
...@@ -4502,7 +4502,6 @@ static SSDataBlock* doMergeAlignedIntervalAgg(SOperatorInfo* pOperator) { ...@@ -4502,7 +4502,6 @@ static SSDataBlock* doMergeAlignedIntervalAgg(SOperatorInfo* pOperator) {
} }
size_t rows = pRes->info.rows; size_t rows = pRes->info.rows;
blockDataUpdateTsWindow(pRes, iaInfo->primaryTsIndex);
pOperator->resultInfo.totalRows += rows; pOperator->resultInfo.totalRows += rows;
return (rows == 0) ? NULL : pRes; return (rows == 0) ? NULL : pRes;
} }
......
...@@ -472,6 +472,7 @@ int32_t schLaunchLevelTasks(SSchJob *pJob, SSchLevel *level); ...@@ -472,6 +472,7 @@ int32_t schLaunchLevelTasks(SSchJob *pJob, SSchLevel *level);
int32_t schGetTaskFromList(SHashObj *pTaskList, uint64_t taskId, SSchTask **pTask); int32_t schGetTaskFromList(SHashObj *pTaskList, uint64_t taskId, SSchTask **pTask);
int32_t schInitTask(SSchJob *pJob, SSchTask *pTask, SSubplan *pPlan, SSchLevel *pLevel, int32_t levelNum); int32_t schInitTask(SSchJob *pJob, SSchTask *pTask, SSubplan *pPlan, SSchLevel *pLevel, int32_t levelNum);
int32_t schSwitchTaskCandidateAddr(SSchJob *pJob, SSchTask *pTask); int32_t schSwitchTaskCandidateAddr(SSchJob *pJob, SSchTask *pTask);
void schDirectPostJobRes(SSchedulerReq* pReq, int32_t errCode);
extern SSchDebug gSCHDebug; extern SSchDebug gSCHDebug;
......
...@@ -763,6 +763,17 @@ int32_t schExecJob(SSchJob *pJob, SSchedulerReq *pReq) { ...@@ -763,6 +763,17 @@ int32_t schExecJob(SSchJob *pJob, SSchedulerReq *pReq) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
void schDirectPostJobRes(SSchedulerReq* pReq, int32_t errCode) {
if (NULL == pReq || pReq->syncReq) {
return;
}
if (pReq->execFp) {
(*pReq->execFp)(NULL, pReq->cbParam, errCode);
} else if (pReq->fetchFp) {
(*pReq->fetchFp)(NULL, pReq->cbParam, errCode);
}
}
void schProcessOnOpEnd(SSchJob *pJob, SCH_OP_TYPE type, SSchedulerReq* pReq, int32_t errCode) { void schProcessOnOpEnd(SSchJob *pJob, SCH_OP_TYPE type, SSchedulerReq* pReq, int32_t errCode) {
int32_t op = 0; int32_t op = 0;
...@@ -801,17 +812,13 @@ void schProcessOnOpEnd(SSchJob *pJob, SCH_OP_TYPE type, SSchedulerReq* pReq, int ...@@ -801,17 +812,13 @@ void schProcessOnOpEnd(SSchJob *pJob, SCH_OP_TYPE type, SSchedulerReq* pReq, int
int32_t schProcessOnOpBegin(SSchJob* pJob, SCH_OP_TYPE type, SSchedulerReq* pReq) { int32_t schProcessOnOpBegin(SSchJob* pJob, SCH_OP_TYPE type, SSchedulerReq* pReq) {
int32_t code = 0; int32_t code = 0;
int8_t status = 0; int8_t status = SCH_GET_JOB_STATUS(pJob);
if (schJobNeedToStop(pJob, &status)) {
SCH_JOB_ELOG("abort op %s cause of job need to stop, status:%s", schGetOpStr(type), jobTaskStatusStr(status));
SCH_ERR_RET(TSDB_CODE_SCH_IGNORE_ERROR);
}
switch (type) { switch (type) {
case SCH_OP_EXEC: case SCH_OP_EXEC:
if (SCH_OP_NULL != atomic_val_compare_exchange_32(&pJob->opStatus.op, SCH_OP_NULL, type)) { if (SCH_OP_NULL != atomic_val_compare_exchange_32(&pJob->opStatus.op, SCH_OP_NULL, type)) {
SCH_JOB_ELOG("job already in %s operation", schGetOpStr(pJob->opStatus.op)); SCH_JOB_ELOG("job already in %s operation", schGetOpStr(pJob->opStatus.op));
schDirectPostJobRes(pReq, TSDB_CODE_TSC_APP_ERROR);
SCH_ERR_RET(TSDB_CODE_TSC_APP_ERROR); SCH_ERR_RET(TSDB_CODE_TSC_APP_ERROR);
} }
...@@ -822,11 +829,16 @@ int32_t schProcessOnOpBegin(SSchJob* pJob, SCH_OP_TYPE type, SSchedulerReq* pReq ...@@ -822,11 +829,16 @@ int32_t schProcessOnOpBegin(SSchJob* pJob, SCH_OP_TYPE type, SSchedulerReq* pReq
case SCH_OP_FETCH: case SCH_OP_FETCH:
if (SCH_OP_NULL != atomic_val_compare_exchange_32(&pJob->opStatus.op, SCH_OP_NULL, type)) { if (SCH_OP_NULL != atomic_val_compare_exchange_32(&pJob->opStatus.op, SCH_OP_NULL, type)) {
SCH_JOB_ELOG("job already in %s operation", schGetOpStr(pJob->opStatus.op)); SCH_JOB_ELOG("job already in %s operation", schGetOpStr(pJob->opStatus.op));
schDirectPostJobRes(pReq, TSDB_CODE_TSC_APP_ERROR);
SCH_ERR_RET(TSDB_CODE_TSC_APP_ERROR); SCH_ERR_RET(TSDB_CODE_TSC_APP_ERROR);
} }
SCH_JOB_DLOG("job start %s operation", schGetOpStr(pJob->opStatus.op)); SCH_JOB_DLOG("job start %s operation", schGetOpStr(pJob->opStatus.op));
pJob->userRes.fetchRes = pReq->pFetchRes;
pJob->userRes.fetchFp = pReq->fetchFp;
pJob->userRes.cbParam = pReq->cbParam;
pJob->opStatus.syncReq = pReq->syncReq; pJob->opStatus.syncReq = pReq->syncReq;
if (!SCH_JOB_NEED_FETCH(pJob)) { if (!SCH_JOB_NEED_FETCH(pJob)) {
...@@ -839,10 +851,6 @@ int32_t schProcessOnOpBegin(SSchJob* pJob, SCH_OP_TYPE type, SSchedulerReq* pReq ...@@ -839,10 +851,6 @@ int32_t schProcessOnOpBegin(SSchJob* pJob, SCH_OP_TYPE type, SSchedulerReq* pReq
SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR); SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
} }
pJob->userRes.fetchRes = pReq->pFetchRes;
pJob->userRes.fetchFp = pReq->fetchFp;
pJob->userRes.cbParam = pReq->cbParam;
break; break;
case SCH_OP_GET_STATUS: case SCH_OP_GET_STATUS:
if (pJob->status < JOB_TASK_STATUS_INIT || pJob->levelNum <= 0 || NULL == pJob->levels) { if (pJob->status < JOB_TASK_STATUS_INIT || pJob->levelNum <= 0 || NULL == pJob->levels) {
...@@ -855,6 +863,11 @@ int32_t schProcessOnOpBegin(SSchJob* pJob, SCH_OP_TYPE type, SSchedulerReq* pReq ...@@ -855,6 +863,11 @@ int32_t schProcessOnOpBegin(SSchJob* pJob, SCH_OP_TYPE type, SSchedulerReq* pReq
SCH_ERR_RET(TSDB_CODE_TSC_APP_ERROR); SCH_ERR_RET(TSDB_CODE_TSC_APP_ERROR);
} }
if (schJobNeedToStop(pJob, &status)) {
SCH_JOB_ELOG("abort op %s cause of job need to stop, status:%s", schGetOpStr(type), jobTaskStatusStr(status));
SCH_ERR_RET(TSDB_CODE_SCH_IGNORE_ERROR);
}
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
......
...@@ -77,6 +77,7 @@ int32_t schHandleOpEndEvent(SSchJob* pJob, SCH_OP_TYPE type, SSchedulerReq* pReq ...@@ -77,6 +77,7 @@ int32_t schHandleOpEndEvent(SSchJob* pJob, SCH_OP_TYPE type, SSchedulerReq* pReq
int32_t code = errCode; int32_t code = errCode;
if (NULL == pJob) { if (NULL == pJob) {
schDirectPostJobRes(pReq, errCode);
SCH_RET(code); SCH_RET(code);
} }
......
...@@ -201,6 +201,43 @@ static SyncTerm raftLogLastTerm(struct SSyncLogStore* pLogStore) { ...@@ -201,6 +201,43 @@ static SyncTerm raftLogLastTerm(struct SSyncLogStore* pLogStore) {
return SYNC_TERM_INVALID; return SYNC_TERM_INVALID;
} }
static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) {
SSyncLogStoreData* pData = pLogStore->data;
SWal* pWal = pData->pWal;
SyncIndex index = 0;
SWalSyncInfo syncMeta;
syncMeta.isWeek = pEntry->isWeak;
syncMeta.seqNum = pEntry->seqNum;
syncMeta.term = pEntry->term;
index = walAppendLog(pWal, pEntry->originalRpcType, syncMeta, pEntry->data, pEntry->dataLen);
if (index < 0) {
int32_t err = terrno;
const char* errStr = tstrerror(err);
int32_t sysErr = errno;
const char* sysErrStr = strerror(errno);
char logBuf[128];
snprintf(logBuf, sizeof(logBuf), "wal write error, index:%" PRId64 ", err:%d %X, msg:%s, syserr:%d, sysmsg:%s",
pEntry->index, err, err, errStr, sysErr, sysErrStr);
syncNodeErrorLog(pData->pSyncNode, logBuf);
ASSERT(0);
return -1;
}
pEntry->index = index;
do {
char eventLog[128];
snprintf(eventLog, sizeof(eventLog), "write index:%" PRId64 ", type:%s,%d, type2:%s,%d", pEntry->index,
TMSG_INFO(pEntry->msgType), pEntry->msgType, TMSG_INFO(pEntry->originalRpcType), pEntry->originalRpcType);
syncNodeEventLog(pData->pSyncNode, eventLog);
} while (0);
return 0;
}
#if 0
static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) { static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) {
SSyncLogStoreData* pData = pLogStore->data; SSyncLogStoreData* pData = pLogStore->data;
SWal* pWal = pData->pWal; SWal* pWal = pData->pWal;
...@@ -243,6 +280,7 @@ static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntr ...@@ -243,6 +280,7 @@ static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntr
return code; return code;
} }
#endif
// entry found, return 0 // entry found, return 0
// entry not found, return -1, terrno = TSDB_CODE_WAL_LOG_NOT_EXIST // entry not found, return -1, terrno = TSDB_CODE_WAL_LOG_NOT_EXIST
...@@ -361,6 +399,8 @@ static int32_t raftLogGetLastEntry(SSyncLogStore* pLogStore, SSyncRaftEntry** pp ...@@ -361,6 +399,8 @@ static int32_t raftLogGetLastEntry(SSyncLogStore* pLogStore, SSyncRaftEntry** pp
//------------------------------- //-------------------------------
// log[0 .. n] // log[0 .. n]
#if 0
int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) { int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) {
SSyncLogStoreData* pData = pLogStore->data; SSyncLogStoreData* pData = pLogStore->data;
SWal* pWal = pData->pWal; SWal* pWal = pData->pWal;
...@@ -397,6 +437,44 @@ int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) { ...@@ -397,6 +437,44 @@ int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) {
return code; return code;
} }
#endif
int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) {
SSyncLogStoreData* pData = pLogStore->data;
SWal* pWal = pData->pWal;
SyncIndex index = 0;
SWalSyncInfo syncMeta;
syncMeta.isWeek = pEntry->isWeak;
syncMeta.seqNum = pEntry->seqNum;
syncMeta.term = pEntry->term;
index = walAppendLog(pWal, pEntry->originalRpcType, syncMeta, pEntry->data, pEntry->dataLen);
if (index < 0) {
int32_t err = terrno;
const char* errStr = tstrerror(err);
int32_t sysErr = errno;
const char* sysErrStr = strerror(errno);
char logBuf[128];
snprintf(logBuf, sizeof(logBuf), "wal write error, index:%" PRId64 ", err:%d %X, msg:%s, syserr:%d, sysmsg:%s",
pEntry->index, err, err, errStr, sysErr, sysErrStr);
syncNodeErrorLog(pData->pSyncNode, logBuf);
ASSERT(0);
return -1;
}
pEntry->index = index;
do {
char eventLog[128];
snprintf(eventLog, sizeof(eventLog), "write2 index:%" PRId64 ", type:%s,%d, type2:%s,%d", pEntry->index,
TMSG_INFO(pEntry->msgType), pEntry->msgType, TMSG_INFO(pEntry->originalRpcType), pEntry->originalRpcType);
syncNodeEventLog(pData->pSyncNode, eventLog);
} while (0);
return 0;
}
SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index) { SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index) {
SSyncLogStoreData* pData = pLogStore->data; SSyncLogStoreData* pData = pLogStore->data;
......
...@@ -573,8 +573,7 @@ static void cliRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) { ...@@ -573,8 +573,7 @@ static void cliRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) {
return; return;
} }
if (nread < 0) { if (nread < 0) {
tWarn("%s conn %p read error:%s, ref:%d", CONN_GET_INST_LABEL(conn), conn, uv_err_name(nread), tWarn("%s conn %p read error:%s, ref:%d", CONN_GET_INST_LABEL(conn), conn, uv_err_name(nread), T_REF_VAL_GET(conn));
T_REF_VAL_GET(conn));
conn->broken = true; conn->broken = true;
cliHandleExcept(conn); cliHandleExcept(conn);
} }
...@@ -650,7 +649,11 @@ static bool cliHandleNoResp(SCliConn* conn) { ...@@ -650,7 +649,11 @@ static bool cliHandleNoResp(SCliConn* conn) {
return res; return res;
} }
static void cliSendCb(uv_write_t* req, int status) { static void cliSendCb(uv_write_t* req, int status) {
SCliConn* pConn = req->data; SCliConn* pConn = req && req->handle ? req->handle->data : NULL;
taosMemoryFree(req);
if (pConn == NULL) {
return;
}
if (status == 0) { if (status == 0) {
tTrace("%s conn %p data already was written out", CONN_GET_INST_LABEL(pConn), pConn); tTrace("%s conn %p data already was written out", CONN_GET_INST_LABEL(pConn), pConn);
...@@ -708,8 +711,8 @@ void cliSend(SCliConn* pConn) { ...@@ -708,8 +711,8 @@ void cliSend(SCliConn* pConn) {
CONN_SET_PERSIST_BY_APP(pConn); CONN_SET_PERSIST_BY_APP(pConn);
} }
pConn->writeReq.data = pConn; uv_write_t* req = taosMemoryCalloc(1, sizeof(uv_write_t));
uv_write(&pConn->writeReq, (uv_stream_t*)pConn->stream, &wb, 1, cliSendCb); uv_write(req, (uv_stream_t*)pConn->stream, &wb, 1, cliSendCb);
return; return;
_RETURN: _RETURN:
return; return;
......
...@@ -265,8 +265,8 @@ static void uvHandleReq(SSvrConn* pConn) { ...@@ -265,8 +265,8 @@ static void uvHandleReq(SSvrConn* pConn) {
transMsg.info.refId = pConn->refId; transMsg.info.refId = pConn->refId;
transMsg.info.traceId = pHead->traceId; transMsg.info.traceId = pHead->traceId;
tGTrace("%s handle %p conn:%p translated to app, refId:%" PRIu64, transLabel(pTransInst), transMsg.info.handle, tGTrace("%s handle %p conn:%p translated to app, refId:%" PRIu64, transLabel(pTransInst), transMsg.info.handle, pConn,
pConn, pConn->refId); pConn->refId);
assert(transMsg.info.handle != NULL); assert(transMsg.info.handle != NULL);
if (pHead->noResp == 1) { if (pHead->noResp == 1) {
...@@ -331,7 +331,10 @@ void uvOnTimeoutCb(uv_timer_t* handle) { ...@@ -331,7 +331,10 @@ void uvOnTimeoutCb(uv_timer_t* handle) {
} }
void uvOnSendCb(uv_write_t* req, int status) { void uvOnSendCb(uv_write_t* req, int status) {
SSvrConn* conn = req->data; SSvrConn* conn = req && req->handle ? req->handle->data : NULL;
taosMemoryFree(req);
if (conn == NULL) return;
if (status == 0) { if (status == 0) {
tTrace("conn %p data already was written on stream", conn); tTrace("conn %p data already was written on stream", conn);
if (!transQueueEmpty(&conn->srvMsgs)) { if (!transQueueEmpty(&conn->srvMsgs)) {
...@@ -390,7 +393,6 @@ static void uvPrepareSendData(SSvrMsg* smsg, uv_buf_t* wb) { ...@@ -390,7 +393,6 @@ static void uvPrepareSendData(SSvrMsg* smsg, uv_buf_t* wb) {
pHead->traceId = pMsg->info.traceId; pHead->traceId = pMsg->info.traceId;
pHead->hasEpSet = pMsg->info.hasEpSet; pHead->hasEpSet = pMsg->info.hasEpSet;
if (pConn->status == ConnNormal) { if (pConn->status == ConnNormal) {
pHead->msgType = (0 == pMsg->msgType ? pConn->inType + 1 : pMsg->msgType); pHead->msgType = (0 == pMsg->msgType ? pConn->inType + 1 : pMsg->msgType);
} else { } else {
...@@ -433,7 +435,9 @@ static void uvStartSendRespInternal(SSvrMsg* smsg) { ...@@ -433,7 +435,9 @@ static void uvStartSendRespInternal(SSvrMsg* smsg) {
uvPrepareSendData(smsg, &wb); uvPrepareSendData(smsg, &wb);
transRefSrvHandle(pConn); transRefSrvHandle(pConn);
uv_write(&pConn->pWriter, (uv_stream_t*)pConn->pTcp, &wb, 1, uvOnSendCb);
uv_write_t* req = taosMemoryCalloc(1, sizeof(uv_write_t));
uv_write(req, (uv_stream_t*)pConn->pTcp, &wb, 1, uvOnSendCb);
} }
static void uvStartSendResp(SSvrMsg* smsg) { static void uvStartSendResp(SSvrMsg* smsg) {
// impl // impl
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册