提交 699f2828 编写于 作者: H Haojun Liao

[td-225] fix bugs in non-blocking processing.

上级 d9a04b39
...@@ -49,7 +49,7 @@ static taos_qset readQset; ...@@ -49,7 +49,7 @@ static taos_qset readQset;
int32_t dnodeInitVnodeRead() { int32_t dnodeInitVnodeRead() {
readQset = taosOpenQset(); readQset = taosOpenQset();
readPool.min = 2; readPool.min = 4;
readPool.max = tsNumOfCores * tsNumOfThreadsPerCore; readPool.max = tsNumOfCores * tsNumOfThreadsPerCore;
if (readPool.max <= readPool.min * 2) readPool.max = 2 * readPool.min; if (readPool.max <= readPool.min * 2) readPool.max = 2 * readPool.min;
readPool.readWorker = (SReadWorker *)calloc(sizeof(SReadWorker), readPool.max); readPool.readWorker = (SReadWorker *)calloc(sizeof(SReadWorker), readPool.max);
......
...@@ -6352,7 +6352,6 @@ bool qTableQuery(qinfo_t qinfo) { ...@@ -6352,7 +6352,6 @@ bool qTableQuery(qinfo_t qinfo) {
pQInfo, pQuery->rec.rows, pQuery->rec.total + pQuery->rec.rows); pQInfo, pQuery->rec.rows, pQuery->rec.total + pQuery->rec.rows);
} }
// pQInfo->dataReady = QUERY_RESULT_READY;
bool buildRes = false; bool buildRes = false;
pthread_mutex_lock(&pQInfo->lock); pthread_mutex_lock(&pQInfo->lock);
pQInfo->dataReady = QUERY_RESULT_READY; pQInfo->dataReady = QUERY_RESULT_READY;
...@@ -6360,8 +6359,9 @@ bool qTableQuery(qinfo_t qinfo) { ...@@ -6360,8 +6359,9 @@ bool qTableQuery(qinfo_t qinfo) {
if (pQInfo->rspContext != NULL) { if (pQInfo->rspContext != NULL) {
buildRes = true; buildRes = true;
} }
pthread_mutex_unlock(&pQInfo->lock);
pthread_mutex_unlock(&pQInfo->lock);
return buildRes; return buildRes;
// sem_post(&pQInfo->dataReady); // sem_post(&pQInfo->dataReady);
} }
...@@ -6387,12 +6387,11 @@ int32_t qRetrieveQueryResultInfo(qinfo_t qinfo, bool* buildRes, void* pRspContex ...@@ -6387,12 +6387,11 @@ int32_t qRetrieveQueryResultInfo(qinfo_t qinfo, bool* buildRes, void* pRspContex
qDebug("QInfo:%p retrieve result info, rowsize:%d, rows:%"PRId64", code:%d", pQInfo, pQuery->rowSize, pQuery->rec.rows, qDebug("QInfo:%p retrieve result info, rowsize:%d, rows:%"PRId64", code:%d", pQInfo, pQuery->rowSize, pQuery->rec.rows,
pQInfo->code); pQInfo->code);
} else { } else {
qDebug("QInfo:%p retrieve req set query return result after paused", pQInfo);
pQInfo->rspContext = pRspContext; pQInfo->rspContext = pRspContext;
} }
pthread_mutex_unlock(&pQInfo->lock); pthread_mutex_unlock(&pQInfo->lock);
// sem_wait(&pQInfo->dataReady);
return pQInfo->code; return pQInfo->code;
} }
......
...@@ -163,18 +163,21 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { ...@@ -163,18 +163,21 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
code = TSDB_CODE_QRY_INVALID_QHANDLE; code = TSDB_CODE_QRY_INVALID_QHANDLE;
} else { } else {
vDebug("vgId:%d, QInfo:%p, dnode continue exec query", pVnode->vgId, (void*) pCont); vDebug("vgId:%d, QInfo:%p, dnode continue exec query", pVnode->vgId, (void*) pCont);
code = TSDB_CODE_VND_ACTION_IN_PROGRESS;
bool buildRes = qTableQuery(*handle); // do execute query bool buildRes = qTableQuery(*handle); // do execute query
if (buildRes) { // build result rsp if (buildRes) { // build result rsp
vDebug("vgId:%d, QInfo:%p, start to build result rsp after query paused", pVnode->vgId, *handle);
pRet = &pReadMsg->rspRet; pRet = &pReadMsg->rspRet;
bool continueExec = false; bool continueExec = false;
code = TSDB_CODE_QRY_HAS_RSP;
if ((code = qDumpRetrieveResult(*handle, (SRetrieveTableRsp **)&pRet->rsp, &pRet->len, &continueExec)) == TSDB_CODE_SUCCESS) { if ((code = qDumpRetrieveResult(*handle, (SRetrieveTableRsp **)&pRet->rsp, &pRet->len, &continueExec)) == TSDB_CODE_SUCCESS) {
if (continueExec) { if (continueExec) {
vnodePutItemIntoReadQueue(pVnode, *handle, pReadMsg->rpcMsg.handle); vnodePutItemIntoReadQueue(pVnode, *handle, pReadMsg->rpcMsg.handle);
pRet->qhandle = *handle; pRet->qhandle = *handle;
code = TSDB_CODE_SUCCESS;
} }
} else { // todo handle error } else { // todo handle error
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册