提交 43f836ee 编写于 作者: D dapan1121

feature/qnode

上级 ae4aef40
...@@ -226,23 +226,27 @@ void setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t ...@@ -226,23 +226,27 @@ void setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t
int32_t scheduleQuery(SRequestObj* pRequest, SQueryDag* pDag, SArray* pNodeList) { int32_t scheduleQuery(SRequestObj* pRequest, SQueryDag* pDag, SArray* pNodeList) {
void* pTransporter = pRequest->pTscObj->pAppInfo->pTransporter; void* pTransporter = pRequest->pTscObj->pAppInfo->pTransporter;
if (TSDB_SQL_INSERT == pRequest->type || TSDB_SQL_CREATE_TABLE == pRequest->type) { SQueryResult res = {.code = 0, .numOfRows = 0, .msgSize = ERROR_MSG_BUF_DEFAULT_SIZE, .msg = pRequest->msgBuf};
SQueryResult res = {.code = 0, .numOfRows = 0, .msgSize = ERROR_MSG_BUF_DEFAULT_SIZE, .msg = pRequest->msgBuf}; int32_t code = schedulerExecJob(pTransporter, pNodeList, pDag, &pRequest->body.pQueryJob, pRequest->sqlstr, &res);
int32_t code = schedulerExecJob(pTransporter, NULL, pDag, &pRequest->body.pQueryJob, pRequest->sqlstr, &res); if (code != TSDB_CODE_SUCCESS) {
if (code != TSDB_CODE_SUCCESS) { if (pRequest->body.pQueryJob != NULL) {
// handle error and retry schedulerFreeJob(pRequest->body.pQueryJob);
} else {
if (pRequest->body.pQueryJob != NULL) {
schedulerFreeJob(pRequest->body.pQueryJob);
}
} }
pRequest->body.resInfo.numOfRows = res.numOfRows; pRequest->code = code;
pRequest->code = res.code;
return pRequest->code; return pRequest->code;
} }
return schedulerAsyncExecJob(pTransporter, pNodeList, pDag, pRequest->sqlstr, &pRequest->body.pQueryJob); if (TSDB_SQL_INSERT == pRequest->type || TSDB_SQL_CREATE_TABLE == pRequest->type) {
pRequest->body.resInfo.numOfRows = res.numOfRows;
if (pRequest->body.pQueryJob != NULL) {
schedulerFreeJob(pRequest->body.pQueryJob);
}
}
pRequest->code = res.code;
return pRequest->code;
} }
TAOS_RES* taos_query_l(TAOS* taos, const char* sql, int sqlLen) { TAOS_RES* taos_query_l(TAOS* taos, const char* sql, int sqlLen) {
......
...@@ -578,7 +578,7 @@ int32_t schProcessOnJobFailureImpl(SSchJob *pJob, int32_t status, int32_t errCod ...@@ -578,7 +578,7 @@ int32_t schProcessOnJobFailureImpl(SSchJob *pJob, int32_t status, int32_t errCod
atomic_store_32(&pJob->errCode, errCode); atomic_store_32(&pJob->errCode, errCode);
} }
if (atomic_load_8(&pJob->userFetch) || ((!SCH_JOB_NEED_FETCH(&pJob->attr)) && pJob->attr.syncSchedule)) { if (atomic_load_8(&pJob->userFetch) || pJob->attr.syncSchedule) {
tsem_post(&pJob->rspSem); tsem_post(&pJob->rspSem);
} }
...@@ -638,7 +638,7 @@ int32_t schProcessOnJobPartialSuccess(SSchJob *pJob) { ...@@ -638,7 +638,7 @@ int32_t schProcessOnJobPartialSuccess(SSchJob *pJob) {
SCH_ERR_RET(schCheckAndUpdateJobStatus(pJob, JOB_TASK_STATUS_PARTIAL_SUCCEED)); SCH_ERR_RET(schCheckAndUpdateJobStatus(pJob, JOB_TASK_STATUS_PARTIAL_SUCCEED));
if ((!SCH_JOB_NEED_FETCH(&pJob->attr)) && pJob->attr.syncSchedule) { if (pJob->attr.syncSchedule) {
tsem_post(&pJob->rspSem); tsem_post(&pJob->rspSem);
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册