提交 e105d952 编写于 作者: D dapan1121

feature/qnode

上级 54582455
......@@ -22,6 +22,7 @@ extern "C" {
#include "os.h"
#include "thash.h"
#include "executor.h"
#define DS_BUF_LOW 1
#define DS_BUF_FULL 2
......
......@@ -547,6 +547,13 @@ void* doFetchRow(SRequestObj* pRequest) {
if (pRequest->type == TDMT_VND_QUERY) {
pRequest->type = TDMT_VND_FETCH;
scheduleFetchRows(pRequest->body.pQueryJob, (void **)&pRequest->body.resInfo.pData);
pResultInfo->current = 0;
if (pResultInfo->numOfRows <= pResultInfo->current) {
return NULL;
}
goto _return;
} else if (pRequest->type == TDMT_MND_SHOW) {
pRequest->type = TDMT_MND_SHOW_RETRIEVE;
} else if (pRequest->type == TDMT_VND_SHOW_TABLES) {
......@@ -590,6 +597,8 @@ void* doFetchRow(SRequestObj* pRequest) {
}
}
_return:
for(int32_t i = 0; i < pResultInfo->numOfCols; ++i) {
pResultInfo->row[i] = pResultInfo->pCol[i] + pResultInfo->fields[i].bytes * pResultInfo->current;
if (IS_VAR_DATA_TYPE(pResultInfo->fields[i].type)) {
......
......@@ -109,10 +109,14 @@ static void toDataCacheEntry(const SDataDispatchHandle* pHandle, const SInputDat
SDataCacheEntry* pEntry = (SDataCacheEntry*)pBuf->pData;
pEntry->compressed = (int8_t)needCompress(pInput->pData, &(pHandle->schema));
pEntry->numOfRows = pInput->pData->info.rows;
pEntry->dataLen = 0;
pBuf->useSize = DATA_META_LENGTH(pInput->pTableRetrieveTsMap);
copyData(pInput, &pHandle->schema, pEntry->data, pEntry->compressed, &pEntry->dataLen);
pBuf->useSize += (pEntry->compressed ? pEntry->dataLen : pHandle->schema.resultRowSize * pInput->pData->info.rows);
if (0 == pEntry->compressed) {
pEntry->dataLen = pHandle->schema.resultRowSize * pInput->pData->info.rows;
}
pBuf->useSize += pEntry->dataLen;
// todo completed
}
......@@ -213,6 +217,7 @@ int32_t createDataDispatcher(SDataSinkManager* pManager, const SDataSink* pDataS
return TSDB_CODE_QRY_OUT_OF_MEMORY;
}
dispatcher->sink.fPut = putDataBlock;
dispatcher->sink.fEndPut = endPut;
dispatcher->sink.fGetLen = getDataLength;
dispatcher->sink.fGetData = getDataBlock;
dispatcher->sink.fDestroy = destroyDataSinker;
......
......@@ -77,13 +77,13 @@ int32_t qCreateExecTask(void* tsdb, int32_t vgId, SSubplan* pSubplan, qTaskInfo_
goto _error;
}
SDataSinkMgtCfg cfg = {.maxDataBlockNum = 1000};
SDataSinkMgtCfg cfg = {.maxDataBlockNum = 1000, .maxDataBlockNumPerQuery = 100};
code = dsDataSinkMgtInit(&cfg);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
code = dsCreateDataSinker(pSubplan->pDataSink, (*pTask)->dsHandle);
code = dsCreateDataSinker(pSubplan->pDataSink, &(*pTask)->dsHandle);
_error:
// if failed to add ref for all tables in this query, abort current query
......@@ -178,7 +178,7 @@ int32_t qExecTask(qTaskInfo_t tinfo, DataSinkHandle* handle) {
publishOperatorProfEvent(pTaskInfo->pRoot, QUERY_PROF_BEFORE_OPERATOR_EXEC);
int64_t st = 0;
handle = &pTaskInfo->dsHandle;
*handle = pTaskInfo->dsHandle;
while(1) {
st = taosGetTimestampUs();
......@@ -188,6 +188,7 @@ int32_t qExecTask(qTaskInfo_t tinfo, DataSinkHandle* handle) {
publishOperatorProfEvent(pTaskInfo->pRoot, QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pRes == NULL) { // no results generated yet, abort
dsEndPut(pTaskInfo->dsHandle, pTaskInfo->cost.elapsedTime);
return pTaskInfo->code;
}
......
......@@ -617,6 +617,8 @@ int32_t qwInitFetchRsp(int32_t length, SRetrieveTableRsp **rsp) {
memset(pRsp, 0, sizeof(SRetrieveTableRsp));
*rsp = pRsp;
return TSDB_CODE_SUCCESS;
}
......@@ -866,7 +868,61 @@ int32_t qwCheckTaskCancelDrop( SQWorkerMgmt *mgmt, uint64_t sId, uint64_t queryI
return TSDB_CODE_SUCCESS;
}
int32_t qwHandleFetch(SQWorkerTaskHandlesCache *handles, SQWorkerMgmt *mgmt, uint64_t sId, uint64_t queryId, uint64_t taskId, SRpcMsg *pMsg) {
int32_t qwQueryPostProcess(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, int8_t status, int32_t errCode) {
SQWSchStatus *sch = NULL;
SQWTaskStatus *task = NULL;
int32_t code = 0;
int8_t newStatus = JOB_TASK_STATUS_CANCELLED;
code = qwAcquireScheduler(QW_READ, mgmt, sId, &sch, QW_NOT_EXIST_ADD);
if (code) {
qError("sId:%"PRIx64" not in cache", sId);
QW_ERR_RET(code);
}
code = qwAcquireTask(QW_READ, sch, qId, tId, &task);
if (code) {
qwReleaseScheduler(QW_READ, mgmt);
if (JOB_TASK_STATUS_PARTIAL_SUCCEED == status || JOB_TASK_STATUS_SUCCEED == status) {
qError("sId:%"PRIx64" queryId:%"PRIx64" taskId:%"PRIx64" not in cache", sId, qId, tId);
QW_ERR_RET(code);
}
QW_ERR_RET(qwAddTask(mgmt, sId, qId, tId, status, QW_EXIST_ACQUIRE, &sch, &task));
}
if (task->cancel) {
QW_LOCK(QW_WRITE, &task->lock);
qwUpdateTaskInfo(task, QW_TASK_INFO_STATUS, &newStatus);
QW_UNLOCK(QW_WRITE, &task->lock);
}
if (task->drop) {
qwReleaseTask(QW_READ, sch);
qwReleaseScheduler(QW_READ, mgmt);
qwDropTask(mgmt, sId, qId, tId);
return TSDB_CODE_SUCCESS;
}
if (!(task->cancel || task->drop)) {
QW_LOCK(QW_WRITE, &task->lock);
qwUpdateTaskInfo(task, QW_TASK_INFO_STATUS, &status);
task->code = errCode;
QW_UNLOCK(QW_WRITE, &task->lock);
}
qwReleaseTask(QW_READ, sch);
qwReleaseScheduler(QW_READ, mgmt);
return TSDB_CODE_SUCCESS;
}
int32_t qwHandleFetch(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t queryId, uint64_t taskId, SRpcMsg *pMsg) {
SQWSchStatus *sch = NULL;
SQWTaskStatus *task = NULL;
int32_t code = 0;
......@@ -876,6 +932,9 @@ int32_t qwHandleFetch(SQWorkerTaskHandlesCache *handles, SQWorkerMgmt *mgmt, uin
int32_t dataLength = 0;
SRetrieveTableRsp *rsp = NULL;
bool queryEnd = false;
SQWorkerTaskHandlesCache *handles = NULL;
QW_ERR_JRET(qwAcquireTaskHandles(QW_READ, mgmt, queryId, taskId, &handles));
QW_ERR_JRET(qwAcquireScheduler(QW_READ, mgmt, sId, &sch, QW_NOT_EXIST_RET_ERR));
QW_ERR_JRET(qwAcquireTask(QW_READ, sch, queryId, taskId, &task));
......@@ -955,59 +1014,11 @@ _return:
qwBuildAndSendFetchRsp(pMsg, rsp, dataLength, code);
}
QW_RET(code);
}
int32_t qwQueryPostProcess(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, int8_t status, int32_t errCode) {
SQWSchStatus *sch = NULL;
SQWTaskStatus *task = NULL;
int32_t code = 0;
int8_t newStatus = JOB_TASK_STATUS_CANCELLED;
code = qwAcquireScheduler(QW_READ, mgmt, sId, &sch, QW_NOT_EXIST_ADD);
if (code) {
qError("sId:%"PRIx64" not in cache", sId);
QW_ERR_RET(code);
}
code = qwAcquireTask(QW_READ, sch, qId, tId, &task);
if (code) {
qwReleaseScheduler(QW_READ, mgmt);
if (JOB_TASK_STATUS_PARTIAL_SUCCEED == status || JOB_TASK_STATUS_SUCCEED == status) {
qError("sId:%"PRIx64" queryId:%"PRIx64" taskId:%"PRIx64" not in cache", sId, qId, tId);
QW_ERR_RET(code);
}
QW_ERR_RET(qwAddTask(mgmt, sId, qId, tId, status, QW_EXIST_ACQUIRE, &sch, &task));
}
if (task->cancel) {
QW_LOCK(QW_WRITE, &task->lock);
qwUpdateTaskInfo(task, QW_TASK_INFO_STATUS, &newStatus);
QW_UNLOCK(QW_WRITE, &task->lock);
}
if (task->drop) {
qwReleaseTask(QW_READ, sch);
qwReleaseScheduler(QW_READ, mgmt);
qwDropTask(mgmt, sId, qId, tId);
return TSDB_CODE_SUCCESS;
}
if (!(task->cancel || task->drop)) {
QW_LOCK(QW_WRITE, &task->lock);
qwUpdateTaskInfo(task, QW_TASK_INFO_STATUS, &status);
task->code = errCode;
QW_UNLOCK(QW_WRITE, &task->lock);
if (handles) {
qwReleaseTaskResCache(QW_READ, mgmt);
}
qwReleaseTask(QW_READ, sch);
qwReleaseScheduler(QW_READ, mgmt);
return TSDB_CODE_SUCCESS;
QW_RET(code);
}
int32_t qWorkerInit(SQWorkerCfg *cfg, void **qWorkerMgmt) {
......@@ -1081,6 +1092,7 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
qTaskInfo_t pTaskInfo = NULL;
code = qCreateExecTask(node, 0, (struct SSubplan *)plan, &pTaskInfo);
if (code) {
qError("qCreateExecTask failed, code:%x", code);
QW_ERR_JRET(code);
} else {
QW_ERR_JRET(qwAddTask(qWorkerMgmt, msg->sId, msg->queryId, msg->taskId, JOB_TASK_STATUS_EXECUTING, QW_EXIST_RET_ERR, NULL, NULL));
......@@ -1094,6 +1106,7 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
code = qExecTask(pTaskInfo, &sinkHandle);
if (code) {
qError("qExecTask failed, code:%x", code);
QW_ERR_JRET(code);
} else {
QW_ERR_JRET(qwAddTaskHandlesToCache(qWorkerMgmt, msg->queryId, msg->taskId, pTaskInfo, sinkHandle));
......@@ -1225,17 +1238,10 @@ int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
QW_ERR_RET(qwUpdateSchLastAccess(qWorkerMgmt, msg->sId));
void *data = NULL;
SQWorkerTaskHandlesCache *handles = NULL;
int32_t code = 0;
QW_ERR_RET(qwAcquireTaskHandles(QW_READ, qWorkerMgmt, msg->queryId, msg->taskId, &handles));
QW_ERR_JRET(qwHandleFetch(handles, qWorkerMgmt, msg->sId, msg->queryId, msg->taskId, pMsg));
QW_ERR_RET(qwHandleFetch(qWorkerMgmt, msg->sId, msg->queryId, msg->taskId, pMsg));
_return:
qwReleaseTaskResCache(QW_READ, qWorkerMgmt);
QW_RET(code);
}
......
......@@ -68,7 +68,7 @@ int32_t schValidateTaskReceivedMsgType(SSchJob *pJob, SSchTask *pTask, int32_t m
SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
}
if (SCH_GET_TASK_STATUS(pTask) != JOB_TASK_STATUS_EXECUTING) {
if (SCH_GET_TASK_STATUS(pTask) != JOB_TASK_STATUS_EXECUTING && SCH_GET_TASK_STATUS(pTask) != JOB_TASK_STATUS_PARTIAL_SUCCEED) {
SCH_TASK_ELOG("rsp msg conflicted with task status, status:%d, rspType:%d", SCH_GET_TASK_STATUS(pTask), msgType);
SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
}
......@@ -497,6 +497,33 @@ int32_t schMoveTaskToFailList(SSchJob *pJob, SSchTask *pTask, bool *moved) {
return TSDB_CODE_SUCCESS;
}
int32_t schMoveTaskToExecList(SSchJob *pJob, SSchTask *pTask, bool *moved) {
if (0 != taosHashRemove(pJob->succTasks, &pTask->taskId, sizeof(pTask->taskId))) {
SCH_TASK_WLOG("remove task from succTask list failed, may not exist, status:%d", SCH_GET_TASK_STATUS(pTask));
}
int32_t code = taosHashPut(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId), &pTask, POINTER_BYTES);
if (0 != code) {
if (HASH_NODE_EXIST(code)) {
*moved = true;
SCH_TASK_ELOG("task already in execTask list, status:%d", SCH_GET_TASK_STATUS(pTask));
SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
}
SCH_TASK_ELOG("taosHashPut task to execTask list failed, errno:%d", errno);
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
*moved = true;
SCH_TASK_DLOG("task moved to execTask list, numOfTasks:%d", taosHashGetSize(pJob->execTasks));
return TSDB_CODE_SUCCESS;
}
int32_t schTaskCheckAndSetRetry(SSchJob *job, SSchTask *task, int32_t errCode, bool *needRetry) {
// TODO set retry or not based on task type/errCode/retry times/job status/available eps...
// TODO if needRetry, set task retry info
......@@ -657,7 +684,7 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) {
SCH_ERR_RET(code);
}
SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_SUCCEED);
SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_PARTIAL_SUCCEED);
SCH_ERR_JRET(schRecordTaskSucceedNode(pTask));
......@@ -689,6 +716,11 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) {
}
pJob->fetchTask = pTask;
code = schMoveTaskToExecList(pJob, pTask, &moved);
if (code && moved) {
SCH_ERR_RET(code);
}
SCH_ERR_RET(schProcessOnJobPartialSuccess(pJob));
......@@ -836,7 +868,10 @@ int32_t schHandleCallback(void* param, const SDataBuf* pMsg, int32_t msgType, in
atomic_add_fetch_32(&pJob->ref, 1);
int32_t s = taosHashGetSize(pJob->execTasks);
assert(s != 0);
if (s <= 0) {
qError("QID:%"PRIx64",TID:%"PRIx64" no task in execTask list", pParam->queryId, pParam->taskId);
SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR);
}
SSchTask **task = taosHashGet(pJob->execTasks, &pParam->taskId, sizeof(pParam->taskId));
if (NULL == task || NULL == (*task)) {
......@@ -1383,6 +1418,7 @@ int32_t scheduleFetchRows(SSchJob *pJob, void** pData) {
if (status == JOB_TASK_STATUS_FAILED) {
code = atomic_load_32(&pJob->errCode);
SCH_ERR_JRET(code);
}
if (pJob->res && ((SRetrieveTableRsp *)pJob->res)->completed) {
......
......@@ -217,7 +217,7 @@ void *schtSendRsp(void *param) {
return NULL;
}
void *pInsertJob = NULL;
struct SSchJob *pInsertJob = NULL;
}
......@@ -228,7 +228,7 @@ TEST(queryTest, normalCase) {
char *dbname = "1.db1";
char *tablename = "table1";
SVgroupInfo vgInfo = {0};
void *pJob = NULL;
SSchJob *pJob = NULL;
SQueryDag dag = {0};
schtInitLogFile();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册