提交 d31c7033 编写于 作者: H Haojun Liao

[td-255] code refactor.

上级 261bfd00
...@@ -318,6 +318,7 @@ void doAddGroupColumnForSubquery(SQueryInfo* pQueryInfo, int32_t tagIndex, SSqlC ...@@ -318,6 +318,7 @@ void doAddGroupColumnForSubquery(SQueryInfo* pQueryInfo, int32_t tagIndex, SSqlC
int16_t tscGetJoinTagColIdByUid(STagCond* pTagCond, uint64_t uid); int16_t tscGetJoinTagColIdByUid(STagCond* pTagCond, uint64_t uid);
int16_t tscGetTagColIndexById(STableMeta* pTableMeta, int16_t colId); int16_t tscGetTagColIndexById(STableMeta* pTableMeta, int16_t colId);
int32_t doInitSubState(SSqlObj* pSql, int32_t numOfSubqueries);
void tscPrintSelNodeList(SSqlObj* pSql, int32_t subClauseIndex); void tscPrintSelNodeList(SSqlObj* pSql, int32_t subClauseIndex);
......
...@@ -2459,11 +2459,48 @@ static void doSendQueryReqs(SSchedMsg* pSchedMsg) { ...@@ -2459,11 +2459,48 @@ static void doSendQueryReqs(SSchedMsg* pSchedMsg) {
tfree(p); tfree(p);
} }
static void doConcurrentlySendSubQueries(SSqlObj* pSql) {
SSubqueryState *pState = &pSql->subState;
// concurrently sent the query requests.
const int32_t MAX_REQUEST_PER_TASK = 8;
int32_t numOfTasks = (pState->numOfSub + MAX_REQUEST_PER_TASK - 1)/MAX_REQUEST_PER_TASK;
assert(numOfTasks >= 1);
int32_t num;
if (pState->numOfSub / numOfTasks == MAX_REQUEST_PER_TASK) {
num = MAX_REQUEST_PER_TASK;
} else {
num = pState->numOfSub / numOfTasks + 1;
}
tscDebug("0x%"PRIx64 " query will be sent by %d threads", pSql->self, numOfTasks);
for(int32_t j = 0; j < numOfTasks; ++j) {
SSchedMsg schedMsg = {0};
schedMsg.fp = doSendQueryReqs;
schedMsg.ahandle = (void*)pSql;
schedMsg.thandle = NULL;
SPair* p = calloc(1, sizeof(SPair));
p->first = j * num;
if (j == numOfTasks - 1) {
p->second = pState->numOfSub;
} else {
p->second = (j + 1) * num;
}
schedMsg.msg = p;
taosScheduleTask(tscQhandle, &schedMsg);
}
}
int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) { int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
SSqlRes *pRes = &pSql->res; SSqlRes *pRes = &pSql->res;
SSqlCmd *pCmd = &pSql->cmd; SSqlCmd *pCmd = &pSql->cmd;
// pRes->code check only serves in launching metric sub-queries // pRes->code check only serves in launching super table sub-queries
if (pRes->code == TSDB_CODE_TSC_QUERY_CANCELLED) { if (pRes->code == TSDB_CODE_TSC_QUERY_CANCELLED) {
pCmd->command = TSDB_SQL_RETRIEVE_GLOBALMERGE; // enable the abort of kill super table function. pCmd->command = TSDB_SQL_RETRIEVE_GLOBALMERGE; // enable the abort of kill super table function.
return pRes->code; return pRes->code;
...@@ -2474,22 +2511,23 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) { ...@@ -2474,22 +2511,23 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
pRes->qId = 0x1; // hack the qhandle check pRes->qId = 0x1; // hack the qhandle check
const uint32_t nBufferSize = (1u << 18u); // 256KB const uint32_t nBufferSize = (1u << 18u); // 256KB, default buffer size
SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd); SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd);
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
SSubqueryState *pState = &pSql->subState; SSubqueryState *pState = &pSql->subState;
pState->numOfSub = 0; int32_t numOfSub = (pTableMetaInfo->pVgroupTables == NULL) ? pTableMetaInfo->vgroupList->numOfVgroups
if (pTableMetaInfo->pVgroupTables == NULL) { : (int32_t)taosArrayGetSize(pTableMetaInfo->pVgroupTables);
pState->numOfSub = pTableMetaInfo->vgroupList->numOfVgroups;
} else { int32_t ret = doInitSubState(pSql, numOfSub);
pState->numOfSub = (int32_t)taosArrayGetSize(pTableMetaInfo->pVgroupTables); if (ret != 0) {
tscAsyncResultOnError(pSql);
return ret;
} }
assert(pState->numOfSub > 0); ret = tscCreateGlobalMergerEnv(pQueryInfo, &pMemoryBuf, pSql->subState.numOfSub, &pDesc, nBufferSize, pSql->self);
int32_t ret = tscCreateGlobalMergerEnv(pQueryInfo, &pMemoryBuf, pSql->subState.numOfSub, &pDesc, nBufferSize, pSql->self);
if (ret != 0) { if (ret != 0) {
pRes->code = ret; pRes->code = ret;
tscAsyncResultOnError(pSql); tscAsyncResultOnError(pSql);
...@@ -2499,32 +2537,6 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) { ...@@ -2499,32 +2537,6 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
} }
tscDebug("0x%"PRIx64" retrieved query data from %d vnode(s)", pSql->self, pState->numOfSub); tscDebug("0x%"PRIx64" retrieved query data from %d vnode(s)", pSql->self, pState->numOfSub);
pSql->pSubs = calloc(pState->numOfSub, POINTER_BYTES);
if (pSql->pSubs == NULL) {
tfree(pSql->pSubs);
pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
tscDestroyGlobalMergerEnv(pMemoryBuf, pDesc,pState->numOfSub);
tscAsyncResultOnError(pSql);
return ret;
}
if (pState->states == NULL) {
pState->states = calloc(pState->numOfSub, sizeof(*pState->states));
if (pState->states == NULL) {
pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
tscDestroyGlobalMergerEnv(pMemoryBuf, pDesc,pState->numOfSub);
tscAsyncResultOnError(pSql);
return ret;
}
pthread_mutex_init(&pState->mutex, NULL);
}
memset(pState->states, 0, sizeof(*pState->states) * pState->numOfSub);
tscDebug("0x%"PRIx64" reset all sub states to 0", pSql->self);
pRes->code = TSDB_CODE_SUCCESS; pRes->code = TSDB_CODE_SUCCESS;
int32_t i = 0; int32_t i = 0;
...@@ -2545,8 +2557,8 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) { ...@@ -2545,8 +2557,8 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
break; break;
} }
trs->subqueryIndex = i; trs->subqueryIndex = i;
trs->pParentSql = pSql; trs->pParentSql = pSql;
SSqlObj *pNew = tscCreateSTableSubquery(pSql, trs, NULL); SSqlObj *pNew = tscCreateSTableSubquery(pSql, trs, NULL);
if (pNew == NULL) { if (pNew == NULL) {
...@@ -2582,39 +2594,7 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) { ...@@ -2582,39 +2594,7 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
return pRes->code; return pRes->code;
} }
// concurrently sent the query requests. doConcurrentlySendSubQueries(pSql);
const int32_t MAX_REQUEST_PER_TASK = 8;
int32_t numOfTasks = (pState->numOfSub + MAX_REQUEST_PER_TASK - 1)/MAX_REQUEST_PER_TASK;
assert(numOfTasks >= 1);
int32_t num;
if (pState->numOfSub / numOfTasks == MAX_REQUEST_PER_TASK) {
num = MAX_REQUEST_PER_TASK;
} else {
num = pState->numOfSub / numOfTasks + 1;
}
tscDebug("0x%"PRIx64 " query will be sent by %d threads", pSql->self, numOfTasks);
for(int32_t j = 0; j < numOfTasks; ++j) {
SSchedMsg schedMsg = {0};
schedMsg.fp = doSendQueryReqs;
schedMsg.ahandle = (void*)pSql;
schedMsg.thandle = NULL;
SPair* p = calloc(1, sizeof(SPair));
p->first = j * num;
if (j == numOfTasks - 1) {
p->second = pState->numOfSub;
} else {
p->second = (j + 1) * num;
}
schedMsg.msg = p;
taosScheduleTask(tscQhandle, &schedMsg);
}
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
......
...@@ -3944,6 +3944,21 @@ static void tscSubqueryCompleteCallback(void* param, TAOS_RES* tres, int code) { ...@@ -3944,6 +3944,21 @@ static void tscSubqueryCompleteCallback(void* param, TAOS_RES* tres, int code) {
taos_fetch_rows_a(tres, tscSubqueryRetrieveCallback, param); taos_fetch_rows_a(tres, tscSubqueryRetrieveCallback, param);
} }
int32_t doInitSubState(SSqlObj* pSql, int32_t numOfSubqueries) {
assert(pSql->subState.numOfSub == 0 && pSql->pSubs == NULL && pSql->subState.states == NULL);
pSql->subState.numOfSub = numOfSubqueries;
pSql->pSubs = calloc(pSql->subState.numOfSub, POINTER_BYTES);
pSql->subState.states = calloc(pSql->subState.numOfSub, sizeof(int8_t));
int32_t code = pthread_mutex_init(&pSql->subState.mutex, NULL);
if (pSql->pSubs == NULL || pSql->subState.states == NULL || code != 0) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
return TSDB_CODE_SUCCESS;
}
// do execute the query according to the query execution plan // do execute the query according to the query execution plan
void executeQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo) { void executeQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
...@@ -3959,16 +3974,8 @@ void executeQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo) { ...@@ -3959,16 +3974,8 @@ void executeQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo) {
} }
if (taosArrayGetSize(pQueryInfo->pUpstream) > 0) { // nest query. do execute it firstly if (taosArrayGetSize(pQueryInfo->pUpstream) > 0) { // nest query. do execute it firstly
assert(pSql->subState.numOfSub == 0); code = doInitSubState(pSql, (int32_t) taosArrayGetSize(pQueryInfo->pUpstream));
pSql->subState.numOfSub = (int32_t) taosArrayGetSize(pQueryInfo->pUpstream); if (code != TSDB_CODE_SUCCESS) {
assert(pSql->pSubs == NULL);
pSql->pSubs = calloc(pSql->subState.numOfSub, POINTER_BYTES);
assert(pSql->subState.states == NULL);
pSql->subState.states = calloc(pSql->subState.numOfSub, sizeof(int8_t));
code = pthread_mutex_init(&pSql->subState.mutex, NULL);
if (pSql->pSubs == NULL || pSql->subState.states == NULL || code != TSDB_CODE_SUCCESS) {
code = TSDB_CODE_TSC_OUT_OF_MEMORY;
goto _error; goto _error;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册