提交 46e58d52 编写于 作者: H hjxilinx

[TD-32] refactor codes.

上级 ca859544
......@@ -32,6 +32,12 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code);
SJoinSubquerySupporter* tscCreateJoinSupporter(SSqlObj* pSql, SSubqueryState* pState, int32_t index);
void tscDestroyJoinSupporter(SJoinSubquerySupporter* pSupporter);
int32_t tscHandleMasterJoinQuery(SSqlObj* pSql);
int32_t tscHandleMasterSTableQuery(SSqlObj *pSql);
int32_t tscHandleMultivnodeInsert(SSqlObj *pSql);
#ifdef __cplusplus
}
#endif
......
......@@ -104,7 +104,7 @@ bool tscIsProjectionQueryOnSTable(SQueryInfo* pQueryInfo, int32_t tableIndex);
bool tscProjectionQueryOnTable(SQueryInfo* pQueryInfo);
bool tscIsTwoStageMergeMetricQuery(SQueryInfo* pQueryInfo, int32_t tableIndex);
bool tscIsTwoStageSTableQuery(SQueryInfo* pQueryInfo, int32_t tableIndex);
bool tscQueryOnMetric(SSqlCmd* pCmd);
bool tscQueryMetricTags(SQueryInfo* pQueryInfo);
bool tscIsSelectivityWithTagQuery(SSqlCmd* pCmd);
......@@ -252,8 +252,6 @@ void tscTryQueryNextVnode(SSqlObj *pSql, __async_cb_func_t fp);
void tscAsyncQuerySingleRowForNextVnode(void *param, TAOS_RES *tres, int numOfRows);
void tscTryQueryNextClause(SSqlObj* pSql, void (*queryFp)());
int32_t launchMultivnodeInsert(SSqlObj *pSql);
#ifdef __cplusplus
}
#endif
......
......@@ -259,7 +259,7 @@ typedef struct {
union {
bool existsCheck; // check if the table exists or not
bool inStream; // denote if current sql is executed in stream or not
bool createOnDemand; // if the table is missing, on-the-fly create it. during getmeterMeta
bool autoCreated; // if the table is missing, on-the-fly create it. during getmeterMeta
int8_t dataSourceType; // load data from file or not
};
......@@ -404,8 +404,6 @@ extern int (*tscBuildMsg[TSDB_SQL_MAX])(SSqlObj *pSql, SSqlInfo *pInfo);
void tscProcessMsgFromServer(SRpcMsg *rpcMsg);
int tscProcessSql(SSqlObj *pSql);
void tscAsyncInsertMultiVnodesProxy(void *param, TAOS_RES *tres, int numOfRows);
int tscRenewMeterMeta(SSqlObj *pSql, char *tableId);
void tscQueueAsyncRes(SSqlObj *pSql);
......
......@@ -13,13 +13,13 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "os.h"
#include "com_taosdata_jdbc_TSDBJNIConnector.h"
#include "os.h"
#include "taos.h"
#include "tlog.h"
#include "tscJoinProcess.h"
#include "tsclient.h"
#include "tscSubquery.h"
#include "tscUtil.h"
#include "tsclient.h"
#include "ttime.h"
int __init = 0;
......
......@@ -397,48 +397,6 @@ void tscQueueAsyncFreeResult(SSqlObj *pSql) {
taosScheduleTask(tscQhandle, &schedMsg);
}
void tscAsyncInsertMultiVnodesProxy(void *param, TAOS_RES *tres, int numOfRows) {
SSqlObj *pSql = (SSqlObj *)param;
SSqlCmd *pCmd = &pSql->cmd;
int32_t code = TSDB_CODE_SUCCESS;
assert(pCmd->dataSourceType != 0 && pSql->signature == pSql);
int32_t index = 0;
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, index);
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
assert(pQueryInfo->numOfTables == 1 || pQueryInfo->numOfTables == 2);
SDataBlockList *pDataBlocks = pCmd->pDataBlocks;
if (pDataBlocks == NULL || pTableMetaInfo->vnodeIndex >= pDataBlocks->nSize) {
// restore user defined fp
pSql->fp = pSql->fetchFp;
tscTrace("%p Async insertion completed, destroy data block list", pSql);
// release data block data
pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks);
// all data has been sent to vnode, call user function
(*pSql->fp)(pSql->param, tres, numOfRows);
} else {
do {
code = tscCopyDataBlockToPayload(pSql, pDataBlocks->pData[pTableMetaInfo->vnodeIndex++]);
if (code != TSDB_CODE_SUCCESS) {
tscTrace("%p prepare submit data block failed in async insertion, vnodeIdx:%d, total:%d, code:%d",
pSql, pTableMetaInfo->vnodeIndex - 1, pDataBlocks->nSize, code);
}
} while (code != TSDB_CODE_SUCCESS && pTableMetaInfo->vnodeIndex < pDataBlocks->nSize);
// build submit msg may fail
if (code == TSDB_CODE_SUCCESS) {
tscTrace("%p async insertion, vnodeIdx:%d, total:%d", pSql, pTableMetaInfo->vnodeIndex - 1, pDataBlocks->nSize);
tscProcessSql(pSql);
}
}
}
int tscSendMsgToServer(SSqlObj *pSql);
void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
......
......@@ -24,7 +24,7 @@
#include "taosdef.h"
#include "taosmsg.h"
#include "tlog.h"
#include "tscJoinProcess.h"
#include "tscSubquery.h"
#include "tscompression.h"
#include "tsqlfunction.h"
#include "ttime.h"
......
......@@ -28,6 +28,7 @@
#include "taosdef.h"
#include "tlog.h"
#include "tscSubquery.h"
#include "tstoken.h"
#include "ttime.h"
......@@ -1324,7 +1325,7 @@ int tsParseSql(SSqlObj *pSql, bool multiVnodeInsertion) {
pSql->fetchFp = pSql->fp;
// replace user defined callback function with multi-insert proxy function
pSql->fp = (void(*)())launchMultivnodeInsert;
pSql->fp = (void(*)())tscHandleMultivnodeInsert;
}
ret = tsParseInsertSql(pSql);
......
......@@ -16,9 +16,9 @@
#include "os.h"
#include "tcache.h"
#include "trpc.h"
#include "tscJoinProcess.h"
#include "tscProfile.h"
#include "tscSecondaryMerge.h"
#include "tscSubquery.h"
#include "tscUtil.h"
#include "tschemautil.h"
#include "tsclient.h"
......@@ -362,143 +362,28 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg) {
rpcFreeCont(rpcMsg->pCont);
}
static SSqlObj *tscCreateSqlObjForSubquery(SSqlObj *pSql, SRetrieveSupport *trsupport, SSqlObj *prevSqlObj);
static int tscLaunchSTableSubqueries(SSqlObj *pSql);
// todo merge with callback
int32_t tscLaunchJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSubquerySupporter *pSupporter) {
SSqlCmd * pCmd = &pSql->cmd;
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
pSql->res.qhandle = 0x1;
pSql->res.numOfRows = 0;
if (pSql->pSubs == NULL) {
pSql->pSubs = calloc(pSupporter->pState->numOfTotal, POINTER_BYTES);
if (pSql->pSubs == NULL) {
return TSDB_CODE_CLI_OUT_OF_MEMORY;
}
}
SSqlObj *pNew = createSubqueryObj(pSql, tableIndex, tscJoinQueryCallback, pSupporter, NULL);
if (pNew == NULL) {
return TSDB_CODE_CLI_OUT_OF_MEMORY;
}
pSql->pSubs[pSql->numOfSubs++] = pNew;
assert(pSql->numOfSubs <= pSupporter->pState->numOfTotal);
if (QUERY_IS_JOIN_QUERY(pQueryInfo->type)) {
addGroupInfoForSubquery(pSql, pNew, 0, tableIndex);
// refactor as one method
SQueryInfo *pNewQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0);
assert(pNewQueryInfo != NULL);
tscColumnBaseInfoUpdateTableIndex(&pNewQueryInfo->colList, 0);
tscColumnBaseInfoCopy(&pSupporter->colList, &pNewQueryInfo->colList, 0);
tscSqlExprCopy(&pSupporter->exprsInfo, &pNewQueryInfo->exprsInfo, pSupporter->uid, false);
tscFieldInfoCopyAll(&pSupporter->fieldsInfo, &pNewQueryInfo->fieldsInfo);
tscTagCondCopy(&pSupporter->tagCond, &pNewQueryInfo->tagCond);
pNew->cmd.numOfCols = 0;
pNewQueryInfo->intervalTime = 0;
memset(&pNewQueryInfo->limit, 0, sizeof(SLimitVal));
// backup the data and clear it in the sqlcmd object
pSupporter->groupbyExpr = pNewQueryInfo->groupbyExpr;
memset(&pNewQueryInfo->groupbyExpr, 0, sizeof(SSqlGroupbyExpr));
// this data needs to be transfer to support struct
pNewQueryInfo->fieldsInfo.numOfOutputCols = 0;
pNewQueryInfo->exprsInfo.numOfExprs = 0;
// set the ts,tags that involved in join, as the output column of intermediate result
tscClearSubqueryInfo(&pNew->cmd);
SSchema colSchema = {.type = TSDB_DATA_TYPE_BINARY, .bytes = 1};
SColumnIndex index = {0, PRIMARYKEY_TIMESTAMP_COL_INDEX};
tscAddSpecialColumnForSelect(pNewQueryInfo, 0, TSDB_FUNC_TS_COMP, &index, &colSchema, TSDB_COL_NORMAL);
// set the tags value for ts_comp function
SSqlExpr *pExpr = tscSqlExprGet(pNewQueryInfo, 0);
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pNewQueryInfo, 0);
int16_t tagColIndex = tscGetJoinTagColIndexByUid(&pSupporter->tagCond, pTableMetaInfo->pTableMeta->uid);
pExpr->param->i64Key = tagColIndex;
pExpr->numOfParams = 1;
// add the filter tag column
for (int32_t i = 0; i < pSupporter->colList.numOfCols; ++i) {
SColumnBase *pColBase = &pSupporter->colList.pColList[i];
if (pColBase->numOfFilters > 0) { // copy to the pNew->cmd.colList if it is filtered.
tscColumnBaseCopy(&pNewQueryInfo->colList.pColList[pNewQueryInfo->colList.numOfCols], pColBase);
pNewQueryInfo->colList.numOfCols++;
}
}
tscTrace("%p subquery:%p tableIndex:%d, vnodeIdx:%d, type:%d, transfer to ts_comp query to retrieve timestamps, "
"exprInfo:%d, colList:%d, fieldsInfo:%d, name:%s",
pSql, pNew, tableIndex, pTableMetaInfo->vnodeIndex, pNewQueryInfo->type,
pNewQueryInfo->exprsInfo.numOfExprs, pNewQueryInfo->colList.numOfCols,
pNewQueryInfo->fieldsInfo.numOfOutputCols, pNewQueryInfo->pTableMetaInfo[0]->name);
tscPrintSelectClause(pNew, 0);
tscTrace("%p subquery:%p tableIndex:%d, vnodeIdx:%d, type:%d, transfer to ts_comp query to retrieve timestamps, "
"exprInfo:%d, colList:%d, fieldsInfo:%d, name:%s",
pSql, pNew, tableIndex, pTableMetaInfo->vnodeIndex, pNewQueryInfo->type,
pNewQueryInfo->exprsInfo.numOfExprs, pNewQueryInfo->colList.numOfCols,
pNewQueryInfo->fieldsInfo.numOfOutputCols, pNewQueryInfo->pTableMetaInfo[0]->name);
tscPrintSelectClause(pNew, 0);
} else {
SQueryInfo *pNewQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0);
pNewQueryInfo->type |= TSDB_QUERY_TYPE_SUBQUERY;
}
#ifdef _DEBUG_VIEW
tscPrintSelectClause(pNew, 0);
#endif
return tscProcessSql(pNew);
}
int doProcessSql(SSqlObj *pSql) {
SSqlCmd *pCmd = &pSql->cmd;
SSqlRes *pRes = &pSql->res;
void *asyncFp = pSql->fp;
if (pCmd->command == TSDB_SQL_SELECT || pCmd->command == TSDB_SQL_FETCH || pCmd->command == TSDB_SQL_RETRIEVE ||
pCmd->command == TSDB_SQL_INSERT || pCmd->command == TSDB_SQL_CONNECT || pCmd->command == TSDB_SQL_HB ||
pCmd->command == TSDB_SQL_META || pCmd->command == TSDB_SQL_METRIC) {
if (pCmd->command == TSDB_SQL_SELECT ||
pCmd->command == TSDB_SQL_FETCH ||
pCmd->command == TSDB_SQL_RETRIEVE ||
pCmd->command == TSDB_SQL_INSERT ||
pCmd->command == TSDB_SQL_CONNECT ||
pCmd->command == TSDB_SQL_HB ||
pCmd->command == TSDB_SQL_META ||
pCmd->command == TSDB_SQL_METRIC) {
tscBuildMsg[pCmd->command](pSql, NULL);
}
int32_t code = tscSendMsgToServer(pSql);
if (asyncFp) {
if (code != TSDB_CODE_SUCCESS) {
pRes->code = code;
tscQueueAsyncRes(pSql);
}
return 0;
}
if (code != TSDB_CODE_SUCCESS) {
pRes->code = code;
return code;
tscQueueAsyncRes(pSql);
}
tsem_wait(&pSql->rspSem);
if (pRes->code == TSDB_CODE_SUCCESS && tscProcessMsgRsp[pCmd->command]) (*tscProcessMsgRsp[pCmd->command])(pSql);
tsem_post(&pSql->emptyRspSem);
return pRes->code;
return TSDB_CODE_SUCCESS;
}
int tscProcessSql(SSqlObj *pSql) {
......@@ -508,7 +393,7 @@ int tscProcessSql(SSqlObj *pSql) {
SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
STableMetaInfo *pTableMetaInfo = NULL;
int16_t type = 0;
uint16_t type = 0;
if (pQueryInfo != NULL) {
pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
......@@ -551,42 +436,7 @@ int tscProcessSql(SSqlObj *pSql) {
// todo handle async situation
if (QUERY_IS_JOIN_QUERY(type)) {
if ((pQueryInfo->type & TSDB_QUERY_TYPE_SUBQUERY) == 0) {
SSubqueryState *pState = calloc(1, sizeof(SSubqueryState));
pState->numOfTotal = pQueryInfo->numOfTables;
for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
SJoinSubquerySupporter *pSupporter = tscCreateJoinSupporter(pSql, pState, i);
if (pSupporter == NULL) { // failed to create support struct, abort current query
tscError("%p tableIndex:%d, failed to allocate join support object, abort further query", pSql, i);
pState->numOfCompleted = pQueryInfo->numOfTables - i - 1;
pSql->res.code = TSDB_CODE_CLI_OUT_OF_MEMORY;
return pSql->res.code;
}
int32_t code = tscLaunchJoinSubquery(pSql, i, pSupporter);
if (code != TSDB_CODE_SUCCESS) { // failed to create subquery object, quit query
tscDestroyJoinSupporter(pSupporter);
pSql->res.code = TSDB_CODE_CLI_OUT_OF_MEMORY;
break;
}
}
tsem_post(&pSql->emptyRspSem);
tsem_wait(&pSql->rspSem);
tsem_post(&pSql->emptyRspSem);
if (pSql->numOfSubs <= 0) {
pSql->cmd.command = TSDB_SQL_RETRIEVE_EMPTY_RESULT;
} else {
pSql->cmd.command = TSDB_SQL_METRIC_JOIN_RETRIEVE;
}
return TSDB_CODE_SUCCESS;
return tscHandleMasterJoinQuery(pSql);
} else {
// for first stage sub query, iterate all vnodes to get all timestamp
if ((pQueryInfo->type & TSDB_QUERY_TYPE_JOIN_SEC_STAGE) != TSDB_QUERY_TYPE_JOIN_SEC_STAGE) {
......@@ -595,442 +445,22 @@ int tscProcessSql(SSqlObj *pSql) {
}
}
if (tscIsTwoStageMergeMetricQuery(pQueryInfo, 0)) {
/*
* (ref. line: 964)
* Before this function returns from tscLaunchSTableSubqueries and continues, pSql may have been released at user
* program context after retrieving all data from vnodes. User function is called at tscRetrieveFromVnodeCallBack.
*
* when pSql being released, pSql->fp == NULL, it may pass the check of pSql->fp == NULL,
* which causes deadlock. So we keep it as local variable.
*/
if (tscLaunchSTableSubqueries(pSql) != TSDB_CODE_SUCCESS) {
return pRes->code;
}
return pSql->res.code;
} else if (pSql->fp == (void(*)())launchMultivnodeInsert) { // multi-vnodes insertion
launchMultivnodeInsert(pSql);
if (tscIsTwoStageSTableQuery(pQueryInfo, 0)) { // super table query
tscHandleMasterSTableQuery(pSql);
return pRes->code;
} else if (pSql->fp == (void(*)())tscHandleMultivnodeInsert) { // multi-vnodes insertion
tscHandleMultivnodeInsert(pSql);
return pSql->res.code;
}
return doProcessSql(pSql);
}
static void doCleanupSubqueries(SSqlObj *pSql, int32_t numOfSubs, SSubqueryState* pState) {
assert(numOfSubs <= pSql->numOfSubs && numOfSubs >= 0 && pState != NULL);
for(int32_t i = 0; i < numOfSubs; ++i) {
SSqlObj* pSub = pSql->pSubs[i];
assert(pSub != NULL);
SRetrieveSupport* pSupport = pSub->param;
tfree(pSupport->localBuffer);
pthread_mutex_unlock(&pSupport->queryMutex);
pthread_mutex_destroy(&pSupport->queryMutex);
tfree(pSupport);
tscFreeSqlObj(pSub);
}
free(pState);
}
int tscLaunchSTableSubqueries(SSqlObj *pSql) {
SSqlRes *pRes = &pSql->res;
SSqlCmd *pCmd = &pSql->cmd;
// pRes->code check only serves in launching metric sub-queries
if (pRes->code == TSDB_CODE_QUERY_CANCELLED) {
pCmd->command = TSDB_SQL_RETRIEVE_METRIC; // enable the abort of kill metric function.
return pRes->code;
}
tExtMemBuffer ** pMemoryBuf = NULL;
tOrderDescriptor *pDesc = NULL;
SColumnModel * pModel = NULL;
pRes->qhandle = 1; // hack the qhandle check
const uint32_t nBufferSize = (1 << 16); // 64KB
SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
int32_t numOfSubQueries = pTableMetaInfo->pMetricMeta->numOfVnodes;
assert(numOfSubQueries > 0);
int32_t ret = tscLocalReducerEnvCreate(pSql, &pMemoryBuf, &pDesc, &pModel, nBufferSize);
if (ret != 0) {
pRes->code = TSDB_CODE_CLI_OUT_OF_MEMORY;
if (pSql->fp) {
tscQueueAsyncRes(pSql);
}
return pRes->code;
}
pSql->pSubs = calloc(numOfSubQueries, POINTER_BYTES);
pSql->numOfSubs = numOfSubQueries;
tscTrace("%p retrieved query data from %d vnode(s)", pSql, numOfSubQueries);
SSubqueryState *pState = calloc(1, sizeof(SSubqueryState));
pState->numOfTotal = numOfSubQueries;
pRes->code = TSDB_CODE_SUCCESS;
int32_t i = 0;
for (; i < numOfSubQueries; ++i) {
SRetrieveSupport *trs = (SRetrieveSupport *)calloc(1, sizeof(SRetrieveSupport));
if (trs == NULL) {
tscError("%p failed to malloc buffer for SRetrieveSupport, orderOfSub:%d, reason:%s", pSql, i, strerror(errno));
break;
}
trs->pExtMemBuffer = pMemoryBuf;
trs->pOrderDescriptor = pDesc;
trs->pState = pState;
trs->localBuffer = (tFilePage *)calloc(1, nBufferSize + sizeof(tFilePage));
if (trs->localBuffer == NULL) {
tscError("%p failed to malloc buffer for local buffer, orderOfSub:%d, reason:%s", pSql, i, strerror(errno));
tfree(trs);
break;
}
trs->subqueryIndex = i;
trs->pParentSqlObj = pSql;
trs->pFinalColModel = pModel;
pthread_mutexattr_t mutexattr = {0};
pthread_mutexattr_settype(&mutexattr, PTHREAD_MUTEX_RECURSIVE_NP);
pthread_mutex_init(&trs->queryMutex, &mutexattr);
pthread_mutexattr_destroy(&mutexattr);
SSqlObj *pNew = tscCreateSqlObjForSubquery(pSql, trs, NULL);
if (pNew == NULL) {
tscError("%p failed to malloc buffer for subObj, orderOfSub:%d, reason:%s", pSql, i, strerror(errno));
tfree(trs->localBuffer);
tfree(trs);
break;
}
// todo handle multi-vnode situation
if (pQueryInfo->tsBuf) {
SQueryInfo *pNewQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0);
pNewQueryInfo->tsBuf = tsBufClone(pQueryInfo->tsBuf);
}
tscTrace("%p sub:%p create subquery success. orderOfSub:%d", pSql, pNew, trs->subqueryIndex);
}
if (i < numOfSubQueries) {
tscError("%p failed to prepare subquery structure and launch subqueries", pSql);
pRes->code = TSDB_CODE_CLI_OUT_OF_MEMORY;
tscLocalReducerEnvDestroy(pMemoryBuf, pDesc, pModel, numOfSubQueries);
doCleanupSubqueries(pSql, i, pState);
return pRes->code; // free all allocated resource
}
if (pRes->code == TSDB_CODE_QUERY_CANCELLED) {
tscLocalReducerEnvDestroy(pMemoryBuf, pDesc, pModel, numOfSubQueries);
doCleanupSubqueries(pSql, i, pState);
return pRes->code;
}
for(int32_t j = 0; j < numOfSubQueries; ++j) {
SSqlObj* pSub = pSql->pSubs[j];
SRetrieveSupport* pSupport = pSub->param;
tscTrace("%p sub:%p launch subquery, orderOfSub:%d.", pSql, pSub, pSupport->subqueryIndex);
tscProcessSql(pSub);
}
return TSDB_CODE_SUCCESS;
}
static void tscFreeSubSqlObj(SRetrieveSupport *trsupport, SSqlObj *pSql) {
tscTrace("%p start to free subquery result", pSql);
if (pSql->res.code == TSDB_CODE_SUCCESS) {
taos_free_result(pSql);
}
tfree(trsupport->localBuffer);
pthread_mutex_unlock(&trsupport->queryMutex);
pthread_mutex_destroy(&trsupport->queryMutex);
tfree(trsupport);
}
static void tscRetrieveFromVnodeCallBack(void *param, TAOS_RES *tres, int numOfRows);
static void tscAbortFurtherRetryRetrieval(SRetrieveSupport *trsupport, TAOS_RES *tres, int32_t errCode) {
// set no disk space error info
#ifdef WINDOWS
LPVOID lpMsgBuf;
FormatMessage(FORMAT_MESSAGE_ALLOCATE_BUFFER | FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_IGNORE_INSERTS, NULL,
GetLastError(), MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT), // Default language
(LPTSTR)&lpMsgBuf, 0, NULL);
tscError("sub:%p failed to flush data to disk:reason:%s", tres, lpMsgBuf);
LocalFree(lpMsgBuf);
#else
char buf[256] = {0};
strerror_r(errno, buf, 256);
tscError("sub:%p failed to flush data to disk:reason:%s", tres, buf);
#endif
trsupport->pState->code = -errCode;
trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY;
pthread_mutex_unlock(&trsupport->queryMutex);
tscRetrieveFromVnodeCallBack(trsupport, tres, trsupport->pState->code);
}
static void tscHandleSubRetrievalError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numOfRows) {
SSqlObj *pPObj = trsupport->pParentSqlObj;
int32_t subqueryIndex = trsupport->subqueryIndex;
assert(pSql != NULL);
SSubqueryState* pState = trsupport->pState;
assert(pState->numOfCompleted < pState->numOfTotal && pState->numOfCompleted >= 0 &&
pPObj->numOfSubs == pState->numOfTotal);
/* retrieved in subquery failed. OR query cancelled in retrieve phase. */
if (pState->code == TSDB_CODE_SUCCESS && pPObj->res.code != TSDB_CODE_SUCCESS) {
pState->code = -(int)pPObj->res.code;
/*
* kill current sub-query connection, which may retrieve data from vnodes;
* Here we get: pPObj->res.code == TSDB_CODE_QUERY_CANCELLED
*/
pSql->res.numOfRows = 0;
trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY; // disable retry efforts
tscTrace("%p query is cancelled, sub:%p, orderOfSub:%d abort retrieve, code:%d", trsupport->pParentSqlObj, pSql,
subqueryIndex, pState->code);
}
if (numOfRows >= 0) { // current query is successful, but other sub query failed, still abort current query.
tscTrace("%p sub:%p retrieve numOfRows:%d,orderOfSub:%d", pPObj, pSql, numOfRows, subqueryIndex);
tscError("%p sub:%p abort further retrieval due to other queries failure,orderOfSub:%d,code:%d", pPObj, pSql,
subqueryIndex, pState->code);
} else {
if (trsupport->numOfRetry++ < MAX_NUM_OF_SUBQUERY_RETRY && pState->code == TSDB_CODE_SUCCESS) {
/*
* current query failed, and the retry count is less than the available
* count, retry query clear previous retrieved data, then launch a new sub query
*/
tExtMemBufferClear(trsupport->pExtMemBuffer[subqueryIndex]);
// clear local saved number of results
trsupport->localBuffer->numOfElems = 0;
pthread_mutex_unlock(&trsupport->queryMutex);
tscTrace("%p sub:%p retrieve failed, code:%d, orderOfSub:%d, retry:%d", trsupport->pParentSqlObj, pSql, numOfRows,
subqueryIndex, trsupport->numOfRetry);
SSqlObj *pNew = tscCreateSqlObjForSubquery(trsupport->pParentSqlObj, trsupport, pSql);
if (pNew == NULL) {
tscError("%p sub:%p failed to create new subquery sqlobj due to out of memory, abort retry",
trsupport->pParentSqlObj, pSql);
pState->code = TSDB_CODE_CLI_OUT_OF_MEMORY;
trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY;
return;
}
tscProcessSql(pNew);
return;
} else { // reach the maximum retry count, abort
atomic_val_compare_exchange_32(&pState->code, TSDB_CODE_SUCCESS, numOfRows);
tscError("%p sub:%p retrieve failed,code:%d,orderOfSub:%d failed.no more retry,set global code:%d", pPObj, pSql,
numOfRows, subqueryIndex, pState->code);
}
}
int32_t numOfTotal = pState->numOfTotal;
int32_t finished = atomic_add_fetch_32(&pState->numOfCompleted, 1);
if (finished < numOfTotal) {
tscTrace("%p sub:%p orderOfSub:%d freed, finished subqueries:%d", pPObj, pSql, trsupport->subqueryIndex, finished);
return tscFreeSubSqlObj(trsupport, pSql);
}
// all subqueries are failed
tscError("%p retrieve from %d vnode(s) completed,code:%d.FAILED.", pPObj, pState->numOfTotal, pState->code);
pPObj->res.code = -(pState->code);
// release allocated resource
tscLocalReducerEnvDestroy(trsupport->pExtMemBuffer, trsupport->pOrderDescriptor, trsupport->pFinalColModel,
pState->numOfTotal);
tfree(trsupport->pState);
tscFreeSubSqlObj(trsupport, pSql);
// in case of second stage join subquery, invoke its callback function instead of regular QueueAsyncRes
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pPObj->cmd, 0);
if ((pQueryInfo->type & TSDB_QUERY_TYPE_JOIN_SEC_STAGE) == TSDB_QUERY_TYPE_JOIN_SEC_STAGE) {
(*pPObj->fp)(pPObj->param, pPObj, pPObj->res.code);
} else { // regular super table query
if (pPObj->res.code != TSDB_CODE_SUCCESS) {
tscQueueAsyncRes(pPObj);
}
}
}
void tscRetrieveFromVnodeCallBack(void *param, TAOS_RES *tres, int numOfRows) {
SRetrieveSupport *trsupport = (SRetrieveSupport *)param;
int32_t idx = trsupport->subqueryIndex;
SSqlObj * pPObj = trsupport->pParentSqlObj;
tOrderDescriptor *pDesc = trsupport->pOrderDescriptor;
SSqlObj *pSql = (SSqlObj *)tres;
if (pSql == NULL) { // sql object has been released in error process, return immediately
tscTrace("%p subquery has been released, idx:%d, abort", pPObj, idx);
return;
}
SSubqueryState* pState = trsupport->pState;
assert(pState->numOfCompleted < pState->numOfTotal && pState->numOfCompleted >= 0 &&
pPObj->numOfSubs == pState->numOfTotal);
// query process and cancel query process may execute at the same time
pthread_mutex_lock(&trsupport->queryMutex);
if (numOfRows < 0 || pState->code < 0 || pPObj->res.code != TSDB_CODE_SUCCESS) {
return tscHandleSubRetrievalError(trsupport, pSql, numOfRows);
}
SSqlRes * pRes = &pSql->res;
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
SVnodeSidList *vnodeInfo = tscGetVnodeSidList(pTableMetaInfo->pMetricMeta, idx);
SVnodeDesc * pSvd = &vnodeInfo->vpeerDesc[vnodeInfo->index];
if (numOfRows > 0) {
assert(pRes->numOfRows == numOfRows);
int64_t num = atomic_add_fetch_64(&pState->numOfRetrievedRows, numOfRows);
tscTrace("%p sub:%p retrieve numOfRows:%d totalNumOfRows:%d from ip:%u,vid:%d,orderOfSub:%d", pPObj, pSql,
pRes->numOfRows, pState->numOfRetrievedRows, pSvd->ip, pSvd->vnode, idx);
if (num > tsMaxNumOfOrderedResults && tscIsProjectionQueryOnSTable(pQueryInfo, 0)) {
tscError("%p sub:%p num of OrderedRes is too many, max allowed:%" PRId64 " , current:%" PRId64,
pPObj, pSql, tsMaxNumOfOrderedResults, num);
tscAbortFurtherRetryRetrieval(trsupport, tres, TSDB_CODE_SORTED_RES_TOO_MANY);
return;
}
#ifdef _DEBUG_VIEW
printf("received data from vnode: %d rows\n", pRes->numOfRows);
SSrcColumnInfo colInfo[256] = {0};
tscGetSrcColumnInfo(colInfo, pQueryInfo);
tColModelDisplayEx(pDesc->pColumnModel, pRes->data, pRes->numOfRows, pRes->numOfRows, colInfo);
#endif
if (tsTotalTmpDirGB != 0 && tsAvailTmpDirGB < tsMinimalTmpDirGB) {
tscError("%p sub:%p client disk space remain %.3f GB, need at least %.3f GB, stop query", pPObj, pSql,
tsAvailTmpDirGB, tsMinimalTmpDirGB);
tscAbortFurtherRetryRetrieval(trsupport, tres, TSDB_CODE_CLI_NO_DISKSPACE);
return;
}
int32_t ret = saveToBuffer(trsupport->pExtMemBuffer[idx], pDesc, trsupport->localBuffer, pRes->data,
pRes->numOfRows, pQueryInfo->groupbyExpr.orderType);
if (ret < 0) {
// set no disk space error info, and abort retry
tscAbortFurtherRetryRetrieval(trsupport, tres, TSDB_CODE_CLI_NO_DISKSPACE);
} else {
pthread_mutex_unlock(&trsupport->queryMutex);
taos_fetch_rows_a(tres, tscRetrieveFromVnodeCallBack, param);
}
} else { // all data has been retrieved to client
/* data in from current vnode is stored in cache and disk */
uint32_t numOfRowsFromVnode = trsupport->pExtMemBuffer[idx]->numOfTotalElems + trsupport->localBuffer->numOfElems;
tscTrace("%p sub:%p all data retrieved from ip:%u,vid:%d, numOfRows:%d, orderOfSub:%d", pPObj, pSql, pSvd->ip,
pSvd->vnode, numOfRowsFromVnode, idx);
tColModelCompact(pDesc->pColumnModel, trsupport->localBuffer, pDesc->pColumnModel->capacity);
#ifdef _DEBUG_VIEW
printf("%" PRIu64 " rows data flushed to disk:\n", trsupport->localBuffer->numOfElems);
SSrcColumnInfo colInfo[256] = {0};
tscGetSrcColumnInfo(colInfo, pQueryInfo);
tColModelDisplayEx(pDesc->pColumnModel, trsupport->localBuffer->data, trsupport->localBuffer->numOfElems,
trsupport->localBuffer->numOfElems, colInfo);
#endif
if (tsTotalTmpDirGB != 0 && tsAvailTmpDirGB < tsMinimalTmpDirGB) {
tscError("%p sub:%p client disk space remain %.3f GB, need at least %.3f GB, stop query", pPObj, pSql,
tsAvailTmpDirGB, tsMinimalTmpDirGB);
tscAbortFurtherRetryRetrieval(trsupport, tres, TSDB_CODE_CLI_NO_DISKSPACE);
return;
}
// each result for a vnode is ordered as an independant list,
// then used as an input of loser tree for disk-based merge routine
int32_t ret = tscFlushTmpBuffer(trsupport->pExtMemBuffer[idx], pDesc, trsupport->localBuffer,
pQueryInfo->groupbyExpr.orderType);
if (ret != 0) {
/* set no disk space error info, and abort retry */
return tscAbortFurtherRetryRetrieval(trsupport, tres, TSDB_CODE_CLI_NO_DISKSPACE);
}
// keep this value local variable, since the pState variable may be released by other threads, if atomic_add opertion
// increases the finished value up to pState->numOfTotal value, which means all subqueries are completed.
// In this case, the comparsion between finished value and released pState->numOfTotal is not safe.
int32_t numOfTotal = pState->numOfTotal;
int32_t finished = atomic_add_fetch_32(&pState->numOfCompleted, 1);
if (finished < numOfTotal) {
tscTrace("%p sub:%p orderOfSub:%d freed, finished subqueries:%d", pPObj, pSql, trsupport->subqueryIndex, finished);
return tscFreeSubSqlObj(trsupport, pSql);
}
// all sub-queries are returned, start to local merge process
pDesc->pColumnModel->capacity = trsupport->pExtMemBuffer[idx]->numOfElemsPerPage;
tscTrace("%p retrieve from %d vnodes completed.final NumOfRows:%d,start to build loser tree", pPObj,
pState->numOfTotal, pState->numOfRetrievedRows);
SQueryInfo *pPQueryInfo = tscGetQueryInfoDetail(&pPObj->cmd, 0);
tscClearInterpInfo(pPQueryInfo);
tscCreateLocalReducer(trsupport->pExtMemBuffer, pState->numOfTotal, pDesc, trsupport->pFinalColModel,
&pPObj->cmd, &pPObj->res);
tscTrace("%p build loser tree completed", pPObj);
pPObj->res.precision = pSql->res.precision;
pPObj->res.numOfRows = 0;
pPObj->res.row = 0;
// only free once
tfree(trsupport->pState);
tscFreeSubSqlObj(trsupport, pSql);
// set the command flag must be after the semaphore been correctly set.
pPObj->cmd.command = TSDB_SQL_RETRIEVE_METRIC;
if (pPObj->res.code == TSDB_CODE_SUCCESS) {
(*pPObj->fp)(pPObj->param, pPObj, 0);
} else {
tscQueueAsyncRes(pPObj);
}
}
}
void tscKillMetricQuery(SSqlObj *pSql) {
SSqlCmd* pCmd = &pSql->cmd;
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
if (!tscIsTwoStageMergeMetricQuery(pQueryInfo, 0)) {
if (!tscIsTwoStageSTableQuery(pQueryInfo, 0)) {
return;
}
......@@ -1068,117 +498,6 @@ void tscKillMetricQuery(SSqlObj *pSql) {
tscTrace("%p metric query is cancelled", pSql);
}
static void tscRetrieveDataRes(void *param, TAOS_RES *tres, int retCode);
static SSqlObj *tscCreateSqlObjForSubquery(SSqlObj *pSql, SRetrieveSupport *trsupport, SSqlObj *prevSqlObj) {
const int32_t table_index = 0;
SSqlObj *pNew = createSubqueryObj(pSql, table_index, tscRetrieveDataRes, trsupport, prevSqlObj);
if (pNew != NULL) { // the sub query of two-stage super table query
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0);
pQueryInfo->type |= TSDB_QUERY_TYPE_STABLE_SUBQUERY;
assert(pQueryInfo->numOfTables == 1 && pNew->cmd.numOfClause == 1);
// launch subquery for each vnode, so the subquery index equals to the vnodeIndex.
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, table_index);
pTableMetaInfo->vnodeIndex = trsupport->subqueryIndex;
pSql->pSubs[trsupport->subqueryIndex] = pNew;
}
return pNew;
}
void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) {
SRetrieveSupport *trsupport = (SRetrieveSupport *)param;
SSqlObj* pParentSql = trsupport->pParentSqlObj;
SSqlObj* pSql = (SSqlObj *)tres;
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);
assert(pSql->cmd.numOfClause == 1 && pSql->cmd.pQueryInfo[0]->numOfTables == 1);
int32_t idx = pTableMetaInfo->vnodeIndex;
SVnodeSidList *vnodeInfo = NULL;
SVnodeDesc * pSvd = NULL;
if (pTableMetaInfo->pMetricMeta != NULL) {
vnodeInfo = tscGetVnodeSidList(pTableMetaInfo->pMetricMeta, idx);
pSvd = &vnodeInfo->vpeerDesc[vnodeInfo->index];
}
SSubqueryState* pState = trsupport->pState;
assert(pState->numOfCompleted < pState->numOfTotal && pState->numOfCompleted >= 0 &&
pParentSql->numOfSubs == pState->numOfTotal);
if (pParentSql->res.code != TSDB_CODE_SUCCESS || pState->code != TSDB_CODE_SUCCESS) {
// metric query is killed, Note: code must be less than 0
trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY;
if (pParentSql->res.code != TSDB_CODE_SUCCESS) {
code = -(int)(pParentSql->res.code);
} else {
code = pState->code;
}
tscTrace("%p query cancelled or failed, sub:%p, orderOfSub:%d abort, code:%d", pParentSql, pSql,
trsupport->subqueryIndex, code);
}
/*
* if a query on a vnode is failed, all retrieve operations from vnode that occurs later
* than this one are actually not necessary, we simply call the tscRetrieveFromVnodeCallBack
* function to abort current and remain retrieve process.
*
* NOTE: threadsafe is required.
*/
if (code != TSDB_CODE_SUCCESS) {
if (trsupport->numOfRetry++ >= MAX_NUM_OF_SUBQUERY_RETRY) {
tscTrace("%p sub:%p reach the max retry count,set global code:%d", pParentSql, pSql, code);
atomic_val_compare_exchange_32(&pState->code, 0, code);
} else { // does not reach the maximum retry count, go on
tscTrace("%p sub:%p failed code:%d, retry:%d", pParentSql, pSql, code, trsupport->numOfRetry);
SSqlObj *pNew = tscCreateSqlObjForSubquery(pParentSql, trsupport, pSql);
if (pNew == NULL) {
tscError("%p sub:%p failed to create new subquery due to out of memory, abort retry, vid:%d, orderOfSub:%d",
trsupport->pParentSqlObj, pSql, pSvd != NULL ? pSvd->vnode : -1, trsupport->subqueryIndex);
pState->code = -TSDB_CODE_CLI_OUT_OF_MEMORY;
trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY;
} else {
SQueryInfo *pNewQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0);
assert(pNewQueryInfo->pTableMetaInfo[0]->pTableMeta != NULL && pNewQueryInfo->pTableMetaInfo[0]->pMetricMeta != NULL);
tscProcessSql(pNew);
return;
}
}
}
if (pState->code != TSDB_CODE_SUCCESS) { // failed, abort
if (vnodeInfo != NULL) {
tscTrace("%p sub:%p query failed,ip:%u,vid:%d,orderOfSub:%d,global code:%d", pParentSql, pSql,
vnodeInfo->vpeerDesc[vnodeInfo->index].ip, vnodeInfo->vpeerDesc[vnodeInfo->index].vnode,
trsupport->subqueryIndex, pState->code);
} else {
tscTrace("%p sub:%p query failed,orderOfSub:%d,global code:%d", pParentSql, pSql,
trsupport->subqueryIndex, pState->code);
}
tscRetrieveFromVnodeCallBack(param, tres, pState->code);
} else { // success, proceed to retrieve data from dnode
if (vnodeInfo != NULL) {
tscTrace("%p sub:%p query complete,ip:%u,vid:%d,orderOfSub:%d,retrieve data", trsupport->pParentSqlObj, pSql,
vnodeInfo->vpeerDesc[vnodeInfo->index].ip, vnodeInfo->vpeerDesc[vnodeInfo->index].vnode,
trsupport->subqueryIndex);
} else {
tscTrace("%p sub:%p query complete, orderOfSub:%d,retrieve data", trsupport->pParentSqlObj, pSql,
trsupport->subqueryIndex);
}
taos_fetch_rows_a(tres, tscRetrieveFromVnodeCallBack, param);
}
}
int tscBuildRetrieveMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
char *pMsg, *pStart;
......@@ -2239,11 +1558,11 @@ int tscBuildTableMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pInfoMsg = (SCMTableInfoMsg *)pCmd->payload;
strcpy(pInfoMsg->tableId, pTableMetaInfo->name);
pInfoMsg->createFlag = htons(pSql->cmd.createOnDemand ? 1 : 0);
pInfoMsg->createFlag = htons(pSql->cmd.autoCreated ? 1 : 0);
pMsg = (char*)pInfoMsg + sizeof(SCMTableInfoMsg);
if (pSql->cmd.createOnDemand) {
if (pSql->cmd.autoCreated) {
memcpy(pInfoMsg->tags, tmpData, sizeof(STagData));
pMsg += sizeof(STagData);
}
......@@ -3075,7 +2394,7 @@ static int32_t doGetMeterMetaFromServer(SSqlObj *pSql, STableMetaInfo *pTableMet
SQueryInfo *pNewQueryInfo = NULL;
tscGetQueryInfoDetailSafely(&pNew->cmd, 0, &pNewQueryInfo);
pNew->cmd.createOnDemand = pSql->cmd.createOnDemand; // create table if not exists
pNew->cmd.autoCreated = pSql->cmd.autoCreated; // create table if not exists
if (TSDB_CODE_SUCCESS != tscAllocPayload(&pNew->cmd, TSDB_DEFAULT_PAYLOAD_SIZE)) {
tscError("%p malloc failed for payload to get meter meta", pSql);
free(pNew);
......@@ -3126,7 +2445,7 @@ int32_t tscGetTableMeta(SSqlObj *pSql, STableMetaInfo *pTableMetaInfo) {
}
int tscGetMeterMetaEx(SSqlObj *pSql, STableMetaInfo *pTableMetaInfo, bool createIfNotExists) {
pSql->cmd.createOnDemand = createIfNotExists;
pSql->cmd.autoCreated = createIfNotExists;
return tscGetTableMeta(pSql, pTableMetaInfo);
}
......
......@@ -15,21 +15,21 @@
#include "hash.h"
#include "os.h"
#include "qast.h"
#include "tcache.h"
#include "tlog.h"
#include "tnote.h"
#include "trpc.h"
#include "tscJoinProcess.h"
#include "tscProfile.h"
#include "tscSecondaryMerge.h"
#include "tscSubquery.h"
#include "tscUtil.h"
#include "tsclient.h"
#include "tscompression.h"
#include "tsocket.h"
#include "ttimer.h"
#include "tutil.h"
#include "ttokendef.h"
#include "qast.h"
#include "tutil.h"
static bool validImpl(const char* str, size_t maxsize) {
if (str == NULL) {
......@@ -994,7 +994,7 @@ void taos_stop_query(TAOS_RES *res) {
pSql->res.code = TSDB_CODE_QUERY_CANCELLED;
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
if (tscIsTwoStageMergeMetricQuery(pQueryInfo, 0)) {
if (tscIsTwoStageSTableQuery(pQueryInfo, 0)) {
tscKillMetricQuery(pSql);
return;
}
......
......@@ -13,10 +13,15 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "tscSubquery.h"
#include "os.h"
#include "tscJoinProcess.h"
#include "tsclient.h"
#include "qtsbuf.h"
#include "tsclient.h"
typedef struct SInsertSupporter {
SSubqueryState* pState;
SSqlObj* pSql;
} SInsertSupporter;
static void freeSubqueryObj(SSqlObj* pSql);
......@@ -809,4 +814,759 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) {
}
}
}
}
\ No newline at end of file
}
/////////////////////////////////////////////////////////////////////////////////////////
static void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code);
static SSqlObj *tscCreateSqlObjForSubquery(SSqlObj *pSql, SRetrieveSupport *trsupport, SSqlObj *prevSqlObj);
// todo merge with callback
int32_t tscLaunchJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSubquerySupporter *pSupporter) {
SSqlCmd * pCmd = &pSql->cmd;
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
pSql->res.qhandle = 0x1;
pSql->res.numOfRows = 0;
if (pSql->pSubs == NULL) {
pSql->pSubs = calloc(pSupporter->pState->numOfTotal, POINTER_BYTES);
if (pSql->pSubs == NULL) {
return TSDB_CODE_CLI_OUT_OF_MEMORY;
}
}
SSqlObj *pNew = createSubqueryObj(pSql, tableIndex, tscJoinQueryCallback, pSupporter, NULL);
if (pNew == NULL) {
return TSDB_CODE_CLI_OUT_OF_MEMORY;
}
pSql->pSubs[pSql->numOfSubs++] = pNew;
assert(pSql->numOfSubs <= pSupporter->pState->numOfTotal);
if (QUERY_IS_JOIN_QUERY(pQueryInfo->type)) {
addGroupInfoForSubquery(pSql, pNew, 0, tableIndex);
// refactor as one method
SQueryInfo *pNewQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0);
assert(pNewQueryInfo != NULL);
tscColumnBaseInfoUpdateTableIndex(&pNewQueryInfo->colList, 0);
tscColumnBaseInfoCopy(&pSupporter->colList, &pNewQueryInfo->colList, 0);
tscSqlExprCopy(&pSupporter->exprsInfo, &pNewQueryInfo->exprsInfo, pSupporter->uid, false);
tscFieldInfoCopyAll(&pSupporter->fieldsInfo, &pNewQueryInfo->fieldsInfo);
tscTagCondCopy(&pSupporter->tagCond, &pNewQueryInfo->tagCond);
pNew->cmd.numOfCols = 0;
pNewQueryInfo->intervalTime = 0;
memset(&pNewQueryInfo->limit, 0, sizeof(SLimitVal));
// backup the data and clear it in the sqlcmd object
pSupporter->groupbyExpr = pNewQueryInfo->groupbyExpr;
memset(&pNewQueryInfo->groupbyExpr, 0, sizeof(SSqlGroupbyExpr));
// this data needs to be transfer to support struct
pNewQueryInfo->fieldsInfo.numOfOutputCols = 0;
pNewQueryInfo->exprsInfo.numOfExprs = 0;
// set the ts,tags that involved in join, as the output column of intermediate result
tscClearSubqueryInfo(&pNew->cmd);
SSchema colSchema = {.type = TSDB_DATA_TYPE_BINARY, .bytes = 1};
SColumnIndex index = {0, PRIMARYKEY_TIMESTAMP_COL_INDEX};
tscAddSpecialColumnForSelect(pNewQueryInfo, 0, TSDB_FUNC_TS_COMP, &index, &colSchema, TSDB_COL_NORMAL);
// set the tags value for ts_comp function
SSqlExpr *pExpr = tscSqlExprGet(pNewQueryInfo, 0);
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pNewQueryInfo, 0);
int16_t tagColIndex = tscGetJoinTagColIndexByUid(&pSupporter->tagCond, pTableMetaInfo->pTableMeta->uid);
pExpr->param->i64Key = tagColIndex;
pExpr->numOfParams = 1;
// add the filter tag column
for (int32_t i = 0; i < pSupporter->colList.numOfCols; ++i) {
SColumnBase *pColBase = &pSupporter->colList.pColList[i];
if (pColBase->numOfFilters > 0) { // copy to the pNew->cmd.colList if it is filtered.
tscColumnBaseCopy(&pNewQueryInfo->colList.pColList[pNewQueryInfo->colList.numOfCols], pColBase);
pNewQueryInfo->colList.numOfCols++;
}
}
tscTrace("%p subquery:%p tableIndex:%d, vnodeIdx:%d, type:%d, transfer to ts_comp query to retrieve timestamps, "
"exprInfo:%d, colList:%d, fieldsInfo:%d, name:%s",
pSql, pNew, tableIndex, pTableMetaInfo->vnodeIndex, pNewQueryInfo->type,
pNewQueryInfo->exprsInfo.numOfExprs, pNewQueryInfo->colList.numOfCols,
pNewQueryInfo->fieldsInfo.numOfOutputCols, pNewQueryInfo->pTableMetaInfo[0]->name);
tscPrintSelectClause(pNew, 0);
tscTrace("%p subquery:%p tableIndex:%d, vnodeIdx:%d, type:%d, transfer to ts_comp query to retrieve timestamps, "
"exprInfo:%d, colList:%d, fieldsInfo:%d, name:%s",
pSql, pNew, tableIndex, pTableMetaInfo->vnodeIndex, pNewQueryInfo->type,
pNewQueryInfo->exprsInfo.numOfExprs, pNewQueryInfo->colList.numOfCols,
pNewQueryInfo->fieldsInfo.numOfOutputCols, pNewQueryInfo->pTableMetaInfo[0]->name);
tscPrintSelectClause(pNew, 0);
} else {
SQueryInfo *pNewQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0);
pNewQueryInfo->type |= TSDB_QUERY_TYPE_SUBQUERY;
}
#ifdef _DEBUG_VIEW
tscPrintSelectClause(pNew, 0);
#endif
return tscProcessSql(pNew);
}
// todo support async join query
int32_t tscHandleMasterJoinQuery(SSqlObj* pSql) {
SSqlCmd* pCmd = &pSql->cmd;
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
assert((pQueryInfo->type & TSDB_QUERY_TYPE_SUBQUERY) == 0);
SSubqueryState *pState = calloc(1, sizeof(SSubqueryState));
pState->numOfTotal = pQueryInfo->numOfTables;
for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
SJoinSubquerySupporter *pSupporter = tscCreateJoinSupporter(pSql, pState, i);
if (pSupporter == NULL) { // failed to create support struct, abort current query
tscError("%p tableIndex:%d, failed to allocate join support object, abort further query", pSql, i);
pState->numOfCompleted = pQueryInfo->numOfTables - i - 1;
pSql->res.code = TSDB_CODE_CLI_OUT_OF_MEMORY;
return pSql->res.code;
}
int32_t code = tscLaunchJoinSubquery(pSql, i, pSupporter);
if (code != TSDB_CODE_SUCCESS) { // failed to create subquery object, quit query
tscDestroyJoinSupporter(pSupporter);
pSql->res.code = TSDB_CODE_CLI_OUT_OF_MEMORY;
break;
}
}
tsem_post(&pSql->emptyRspSem);
tsem_wait(&pSql->rspSem);
tsem_post(&pSql->emptyRspSem);
if (pSql->numOfSubs <= 0) {
pSql->cmd.command = TSDB_SQL_RETRIEVE_EMPTY_RESULT;
} else {
pSql->cmd.command = TSDB_SQL_METRIC_JOIN_RETRIEVE;
}
return TSDB_CODE_SUCCESS;
}
static void doCleanupSubqueries(SSqlObj *pSql, int32_t numOfSubs, SSubqueryState* pState) {
assert(numOfSubs <= pSql->numOfSubs && numOfSubs >= 0 && pState != NULL);
for(int32_t i = 0; i < numOfSubs; ++i) {
SSqlObj* pSub = pSql->pSubs[i];
assert(pSub != NULL);
SRetrieveSupport* pSupport = pSub->param;
tfree(pSupport->localBuffer);
pthread_mutex_unlock(&pSupport->queryMutex);
pthread_mutex_destroy(&pSupport->queryMutex);
tfree(pSupport);
tscFreeSqlObj(pSub);
}
free(pState);
}
int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
SSqlRes *pRes = &pSql->res;
SSqlCmd *pCmd = &pSql->cmd;
// pRes->code check only serves in launching metric sub-queries
if (pRes->code == TSDB_CODE_QUERY_CANCELLED) {
pCmd->command = TSDB_SQL_RETRIEVE_METRIC; // enable the abort of kill metric function.
return pRes->code;
}
tExtMemBuffer ** pMemoryBuf = NULL;
tOrderDescriptor *pDesc = NULL;
SColumnModel * pModel = NULL;
pRes->qhandle = 1; // hack the qhandle check
const uint32_t nBufferSize = (1 << 16); // 64KB
SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
int32_t numOfSubQueries = pTableMetaInfo->pMetricMeta->numOfVnodes;
assert(numOfSubQueries > 0);
int32_t ret = tscLocalReducerEnvCreate(pSql, &pMemoryBuf, &pDesc, &pModel, nBufferSize);
if (ret != 0) {
pRes->code = TSDB_CODE_CLI_OUT_OF_MEMORY;
if (pSql->fp) {
tscQueueAsyncRes(pSql);
}
return pRes->code;
}
pSql->pSubs = calloc(numOfSubQueries, POINTER_BYTES);
pSql->numOfSubs = numOfSubQueries;
tscTrace("%p retrieved query data from %d vnode(s)", pSql, numOfSubQueries);
SSubqueryState *pState = calloc(1, sizeof(SSubqueryState));
pState->numOfTotal = numOfSubQueries;
pRes->code = TSDB_CODE_SUCCESS;
int32_t i = 0;
for (; i < numOfSubQueries; ++i) {
SRetrieveSupport *trs = (SRetrieveSupport *)calloc(1, sizeof(SRetrieveSupport));
if (trs == NULL) {
tscError("%p failed to malloc buffer for SRetrieveSupport, orderOfSub:%d, reason:%s", pSql, i, strerror(errno));
break;
}
trs->pExtMemBuffer = pMemoryBuf;
trs->pOrderDescriptor = pDesc;
trs->pState = pState;
trs->localBuffer = (tFilePage *)calloc(1, nBufferSize + sizeof(tFilePage));
if (trs->localBuffer == NULL) {
tscError("%p failed to malloc buffer for local buffer, orderOfSub:%d, reason:%s", pSql, i, strerror(errno));
tfree(trs);
break;
}
trs->subqueryIndex = i;
trs->pParentSqlObj = pSql;
trs->pFinalColModel = pModel;
pthread_mutexattr_t mutexattr = {0};
pthread_mutexattr_settype(&mutexattr, PTHREAD_MUTEX_RECURSIVE_NP);
pthread_mutex_init(&trs->queryMutex, &mutexattr);
pthread_mutexattr_destroy(&mutexattr);
SSqlObj *pNew = tscCreateSqlObjForSubquery(pSql, trs, NULL);
if (pNew == NULL) {
tscError("%p failed to malloc buffer for subObj, orderOfSub:%d, reason:%s", pSql, i, strerror(errno));
tfree(trs->localBuffer);
tfree(trs);
break;
}
// todo handle multi-vnode situation
if (pQueryInfo->tsBuf) {
SQueryInfo *pNewQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0);
pNewQueryInfo->tsBuf = tsBufClone(pQueryInfo->tsBuf);
}
tscTrace("%p sub:%p create subquery success. orderOfSub:%d", pSql, pNew, trs->subqueryIndex);
}
if (i < numOfSubQueries) {
tscError("%p failed to prepare subquery structure and launch subqueries", pSql);
pRes->code = TSDB_CODE_CLI_OUT_OF_MEMORY;
tscLocalReducerEnvDestroy(pMemoryBuf, pDesc, pModel, numOfSubQueries);
doCleanupSubqueries(pSql, i, pState);
return pRes->code; // free all allocated resource
}
if (pRes->code == TSDB_CODE_QUERY_CANCELLED) {
tscLocalReducerEnvDestroy(pMemoryBuf, pDesc, pModel, numOfSubQueries);
doCleanupSubqueries(pSql, i, pState);
return pRes->code;
}
for(int32_t j = 0; j < numOfSubQueries; ++j) {
SSqlObj* pSub = pSql->pSubs[j];
SRetrieveSupport* pSupport = pSub->param;
tscTrace("%p sub:%p launch subquery, orderOfSub:%d.", pSql, pSub, pSupport->subqueryIndex);
tscProcessSql(pSub);
}
return TSDB_CODE_SUCCESS;
}
static void tscFreeSubSqlObj(SRetrieveSupport *trsupport, SSqlObj *pSql) {
tscTrace("%p start to free subquery result", pSql);
if (pSql->res.code == TSDB_CODE_SUCCESS) {
taos_free_result(pSql);
}
tfree(trsupport->localBuffer);
pthread_mutex_unlock(&trsupport->queryMutex);
pthread_mutex_destroy(&trsupport->queryMutex);
tfree(trsupport);
}
static void tscRetrieveFromVnodeCallBack(void *param, TAOS_RES *tres, int numOfRows);
static void tscAbortFurtherRetryRetrieval(SRetrieveSupport *trsupport, TAOS_RES *tres, int32_t errCode) {
// set no disk space error info
#ifdef WINDOWS
LPVOID lpMsgBuf;
FormatMessage(FORMAT_MESSAGE_ALLOCATE_BUFFER | FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_IGNORE_INSERTS, NULL,
GetLastError(), MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT), // Default language
(LPTSTR)&lpMsgBuf, 0, NULL);
tscError("sub:%p failed to flush data to disk:reason:%s", tres, lpMsgBuf);
LocalFree(lpMsgBuf);
#else
char buf[256] = {0};
strerror_r(errno, buf, 256);
tscError("sub:%p failed to flush data to disk:reason:%s", tres, buf);
#endif
trsupport->pState->code = -errCode;
trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY;
pthread_mutex_unlock(&trsupport->queryMutex);
tscRetrieveFromVnodeCallBack(trsupport, tres, trsupport->pState->code);
}
static void tscHandleSubRetrievalError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numOfRows) {
SSqlObj *pPObj = trsupport->pParentSqlObj;
int32_t subqueryIndex = trsupport->subqueryIndex;
assert(pSql != NULL);
SSubqueryState* pState = trsupport->pState;
assert(pState->numOfCompleted < pState->numOfTotal && pState->numOfCompleted >= 0 &&
pPObj->numOfSubs == pState->numOfTotal);
/* retrieved in subquery failed. OR query cancelled in retrieve phase. */
if (pState->code == TSDB_CODE_SUCCESS && pPObj->res.code != TSDB_CODE_SUCCESS) {
pState->code = -(int)pPObj->res.code;
/*
* kill current sub-query connection, which may retrieve data from vnodes;
* Here we get: pPObj->res.code == TSDB_CODE_QUERY_CANCELLED
*/
pSql->res.numOfRows = 0;
trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY; // disable retry efforts
tscTrace("%p query is cancelled, sub:%p, orderOfSub:%d abort retrieve, code:%d", trsupport->pParentSqlObj, pSql,
subqueryIndex, pState->code);
}
if (numOfRows >= 0) { // current query is successful, but other sub query failed, still abort current query.
tscTrace("%p sub:%p retrieve numOfRows:%d,orderOfSub:%d", pPObj, pSql, numOfRows, subqueryIndex);
tscError("%p sub:%p abort further retrieval due to other queries failure,orderOfSub:%d,code:%d", pPObj, pSql,
subqueryIndex, pState->code);
} else {
if (trsupport->numOfRetry++ < MAX_NUM_OF_SUBQUERY_RETRY && pState->code == TSDB_CODE_SUCCESS) {
/*
* current query failed, and the retry count is less than the available
* count, retry query clear previous retrieved data, then launch a new sub query
*/
tExtMemBufferClear(trsupport->pExtMemBuffer[subqueryIndex]);
// clear local saved number of results
trsupport->localBuffer->numOfElems = 0;
pthread_mutex_unlock(&trsupport->queryMutex);
tscTrace("%p sub:%p retrieve failed, code:%d, orderOfSub:%d, retry:%d", trsupport->pParentSqlObj, pSql, numOfRows,
subqueryIndex, trsupport->numOfRetry);
SSqlObj *pNew = tscCreateSqlObjForSubquery(trsupport->pParentSqlObj, trsupport, pSql);
if (pNew == NULL) {
tscError("%p sub:%p failed to create new subquery sqlobj due to out of memory, abort retry",
trsupport->pParentSqlObj, pSql);
pState->code = TSDB_CODE_CLI_OUT_OF_MEMORY;
trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY;
return;
}
tscProcessSql(pNew);
return;
} else { // reach the maximum retry count, abort
atomic_val_compare_exchange_32(&pState->code, TSDB_CODE_SUCCESS, numOfRows);
tscError("%p sub:%p retrieve failed,code:%d,orderOfSub:%d failed.no more retry,set global code:%d", pPObj, pSql,
numOfRows, subqueryIndex, pState->code);
}
}
int32_t numOfTotal = pState->numOfTotal;
int32_t finished = atomic_add_fetch_32(&pState->numOfCompleted, 1);
if (finished < numOfTotal) {
tscTrace("%p sub:%p orderOfSub:%d freed, finished subqueries:%d", pPObj, pSql, trsupport->subqueryIndex, finished);
return tscFreeSubSqlObj(trsupport, pSql);
}
// all subqueries are failed
tscError("%p retrieve from %d vnode(s) completed,code:%d.FAILED.", pPObj, pState->numOfTotal, pState->code);
pPObj->res.code = -(pState->code);
// release allocated resource
tscLocalReducerEnvDestroy(trsupport->pExtMemBuffer, trsupport->pOrderDescriptor, trsupport->pFinalColModel,
pState->numOfTotal);
tfree(trsupport->pState);
tscFreeSubSqlObj(trsupport, pSql);
// in case of second stage join subquery, invoke its callback function instead of regular QueueAsyncRes
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pPObj->cmd, 0);
if ((pQueryInfo->type & TSDB_QUERY_TYPE_JOIN_SEC_STAGE) == TSDB_QUERY_TYPE_JOIN_SEC_STAGE) {
(*pPObj->fp)(pPObj->param, pPObj, pPObj->res.code);
} else { // regular super table query
if (pPObj->res.code != TSDB_CODE_SUCCESS) {
tscQueueAsyncRes(pPObj);
}
}
}
static void tscRetrieveFromVnodeCallBack(void *param, TAOS_RES *tres, int numOfRows) {
SRetrieveSupport *trsupport = (SRetrieveSupport *)param;
int32_t idx = trsupport->subqueryIndex;
SSqlObj * pPObj = trsupport->pParentSqlObj;
tOrderDescriptor *pDesc = trsupport->pOrderDescriptor;
SSqlObj *pSql = (SSqlObj *)tres;
if (pSql == NULL) { // sql object has been released in error process, return immediately
tscTrace("%p subquery has been released, idx:%d, abort", pPObj, idx);
return;
}
SSubqueryState* pState = trsupport->pState;
assert(pState->numOfCompleted < pState->numOfTotal && pState->numOfCompleted >= 0 &&
pPObj->numOfSubs == pState->numOfTotal);
// query process and cancel query process may execute at the same time
pthread_mutex_lock(&trsupport->queryMutex);
if (numOfRows < 0 || pState->code < 0 || pPObj->res.code != TSDB_CODE_SUCCESS) {
return tscHandleSubRetrievalError(trsupport, pSql, numOfRows);
}
SSqlRes * pRes = &pSql->res;
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
SVnodeSidList *vnodeInfo = tscGetVnodeSidList(pTableMetaInfo->pMetricMeta, idx);
SVnodeDesc * pSvd = &vnodeInfo->vpeerDesc[vnodeInfo->index];
if (numOfRows > 0) {
assert(pRes->numOfRows == numOfRows);
int64_t num = atomic_add_fetch_64(&pState->numOfRetrievedRows, numOfRows);
tscTrace("%p sub:%p retrieve numOfRows:%d totalNumOfRows:%d from ip:%u,vid:%d,orderOfSub:%d", pPObj, pSql,
pRes->numOfRows, pState->numOfRetrievedRows, pSvd->ip, pSvd->vnode, idx);
if (num > tsMaxNumOfOrderedResults && tscIsProjectionQueryOnSTable(pQueryInfo, 0)) {
tscError("%p sub:%p num of OrderedRes is too many, max allowed:%" PRId64 " , current:%" PRId64,
pPObj, pSql, tsMaxNumOfOrderedResults, num);
tscAbortFurtherRetryRetrieval(trsupport, tres, TSDB_CODE_SORTED_RES_TOO_MANY);
return;
}
#ifdef _DEBUG_VIEW
printf("received data from vnode: %d rows\n", pRes->numOfRows);
SSrcColumnInfo colInfo[256] = {0};
tscGetSrcColumnInfo(colInfo, pQueryInfo);
tColModelDisplayEx(pDesc->pColumnModel, pRes->data, pRes->numOfRows, pRes->numOfRows, colInfo);
#endif
if (tsTotalTmpDirGB != 0 && tsAvailTmpDirGB < tsMinimalTmpDirGB) {
tscError("%p sub:%p client disk space remain %.3f GB, need at least %.3f GB, stop query", pPObj, pSql,
tsAvailTmpDirGB, tsMinimalTmpDirGB);
tscAbortFurtherRetryRetrieval(trsupport, tres, TSDB_CODE_CLI_NO_DISKSPACE);
return;
}
int32_t ret = saveToBuffer(trsupport->pExtMemBuffer[idx], pDesc, trsupport->localBuffer, pRes->data,
pRes->numOfRows, pQueryInfo->groupbyExpr.orderType);
if (ret < 0) {
// set no disk space error info, and abort retry
tscAbortFurtherRetryRetrieval(trsupport, tres, TSDB_CODE_CLI_NO_DISKSPACE);
} else {
pthread_mutex_unlock(&trsupport->queryMutex);
taos_fetch_rows_a(tres, tscRetrieveFromVnodeCallBack, param);
}
} else { // all data has been retrieved to client
/* data in from current vnode is stored in cache and disk */
uint32_t numOfRowsFromVnode = trsupport->pExtMemBuffer[idx]->numOfTotalElems + trsupport->localBuffer->numOfElems;
tscTrace("%p sub:%p all data retrieved from ip:%u,vid:%d, numOfRows:%d, orderOfSub:%d", pPObj, pSql, pSvd->ip,
pSvd->vnode, numOfRowsFromVnode, idx);
tColModelCompact(pDesc->pColumnModel, trsupport->localBuffer, pDesc->pColumnModel->capacity);
#ifdef _DEBUG_VIEW
printf("%" PRIu64 " rows data flushed to disk:\n", trsupport->localBuffer->numOfElems);
SSrcColumnInfo colInfo[256] = {0};
tscGetSrcColumnInfo(colInfo, pQueryInfo);
tColModelDisplayEx(pDesc->pColumnModel, trsupport->localBuffer->data, trsupport->localBuffer->numOfElems,
trsupport->localBuffer->numOfElems, colInfo);
#endif
if (tsTotalTmpDirGB != 0 && tsAvailTmpDirGB < tsMinimalTmpDirGB) {
tscError("%p sub:%p client disk space remain %.3f GB, need at least %.3f GB, stop query", pPObj, pSql,
tsAvailTmpDirGB, tsMinimalTmpDirGB);
tscAbortFurtherRetryRetrieval(trsupport, tres, TSDB_CODE_CLI_NO_DISKSPACE);
return;
}
// each result for a vnode is ordered as an independant list,
// then used as an input of loser tree for disk-based merge routine
int32_t ret = tscFlushTmpBuffer(trsupport->pExtMemBuffer[idx], pDesc, trsupport->localBuffer,
pQueryInfo->groupbyExpr.orderType);
if (ret != 0) {
/* set no disk space error info, and abort retry */
return tscAbortFurtherRetryRetrieval(trsupport, tres, TSDB_CODE_CLI_NO_DISKSPACE);
}
// keep this value local variable, since the pState variable may be released by other threads, if atomic_add opertion
// increases the finished value up to pState->numOfTotal value, which means all subqueries are completed.
// In this case, the comparsion between finished value and released pState->numOfTotal is not safe.
int32_t numOfTotal = pState->numOfTotal;
int32_t finished = atomic_add_fetch_32(&pState->numOfCompleted, 1);
if (finished < numOfTotal) {
tscTrace("%p sub:%p orderOfSub:%d freed, finished subqueries:%d", pPObj, pSql, trsupport->subqueryIndex, finished);
return tscFreeSubSqlObj(trsupport, pSql);
}
// all sub-queries are returned, start to local merge process
pDesc->pColumnModel->capacity = trsupport->pExtMemBuffer[idx]->numOfElemsPerPage;
tscTrace("%p retrieve from %d vnodes completed.final NumOfRows:%d,start to build loser tree", pPObj,
pState->numOfTotal, pState->numOfRetrievedRows);
SQueryInfo *pPQueryInfo = tscGetQueryInfoDetail(&pPObj->cmd, 0);
tscClearInterpInfo(pPQueryInfo);
tscCreateLocalReducer(trsupport->pExtMemBuffer, pState->numOfTotal, pDesc, trsupport->pFinalColModel,
&pPObj->cmd, &pPObj->res);
tscTrace("%p build loser tree completed", pPObj);
pPObj->res.precision = pSql->res.precision;
pPObj->res.numOfRows = 0;
pPObj->res.row = 0;
// only free once
tfree(trsupport->pState);
tscFreeSubSqlObj(trsupport, pSql);
// set the command flag must be after the semaphore been correctly set.
pPObj->cmd.command = TSDB_SQL_RETRIEVE_METRIC;
if (pPObj->res.code == TSDB_CODE_SUCCESS) {
(*pPObj->fp)(pPObj->param, pPObj, 0);
} else {
tscQueueAsyncRes(pPObj);
}
}
}
static SSqlObj *tscCreateSqlObjForSubquery(SSqlObj *pSql, SRetrieveSupport *trsupport, SSqlObj *prevSqlObj) {
const int32_t table_index = 0;
SSqlObj *pNew = createSubqueryObj(pSql, table_index, tscRetrieveDataRes, trsupport, prevSqlObj);
if (pNew != NULL) { // the sub query of two-stage super table query
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0);
pQueryInfo->type |= TSDB_QUERY_TYPE_STABLE_SUBQUERY;
assert(pQueryInfo->numOfTables == 1 && pNew->cmd.numOfClause == 1);
// launch subquery for each vnode, so the subquery index equals to the vnodeIndex.
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, table_index);
pTableMetaInfo->vnodeIndex = trsupport->subqueryIndex;
pSql->pSubs[trsupport->subqueryIndex] = pNew;
}
return pNew;
}
void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) {
SRetrieveSupport *trsupport = (SRetrieveSupport *)param;
SSqlObj* pParentSql = trsupport->pParentSqlObj;
SSqlObj* pSql = (SSqlObj *)tres;
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);
assert(pSql->cmd.numOfClause == 1 && pSql->cmd.pQueryInfo[0]->numOfTables == 1);
int32_t idx = pTableMetaInfo->vnodeIndex;
SVnodeSidList *vnodeInfo = NULL;
SVnodeDesc * pSvd = NULL;
if (pTableMetaInfo->pMetricMeta != NULL) {
vnodeInfo = tscGetVnodeSidList(pTableMetaInfo->pMetricMeta, idx);
pSvd = &vnodeInfo->vpeerDesc[vnodeInfo->index];
}
SSubqueryState* pState = trsupport->pState;
assert(pState->numOfCompleted < pState->numOfTotal && pState->numOfCompleted >= 0 &&
pParentSql->numOfSubs == pState->numOfTotal);
if (pParentSql->res.code != TSDB_CODE_SUCCESS || pState->code != TSDB_CODE_SUCCESS) {
// metric query is killed, Note: code must be less than 0
trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY;
if (pParentSql->res.code != TSDB_CODE_SUCCESS) {
code = -(int)(pParentSql->res.code);
} else {
code = pState->code;
}
tscTrace("%p query cancelled or failed, sub:%p, orderOfSub:%d abort, code:%d", pParentSql, pSql,
trsupport->subqueryIndex, code);
}
/*
* if a query on a vnode is failed, all retrieve operations from vnode that occurs later
* than this one are actually not necessary, we simply call the tscRetrieveFromVnodeCallBack
* function to abort current and remain retrieve process.
*
* NOTE: threadsafe is required.
*/
if (code != TSDB_CODE_SUCCESS) {
if (trsupport->numOfRetry++ >= MAX_NUM_OF_SUBQUERY_RETRY) {
tscTrace("%p sub:%p reach the max retry count,set global code:%d", pParentSql, pSql, code);
atomic_val_compare_exchange_32(&pState->code, 0, code);
} else { // does not reach the maximum retry count, go on
tscTrace("%p sub:%p failed code:%d, retry:%d", pParentSql, pSql, code, trsupport->numOfRetry);
SSqlObj *pNew = tscCreateSqlObjForSubquery(pParentSql, trsupport, pSql);
if (pNew == NULL) {
tscError("%p sub:%p failed to create new subquery due to out of memory, abort retry, vid:%d, orderOfSub:%d",
trsupport->pParentSqlObj, pSql, pSvd != NULL ? pSvd->vnode : -1, trsupport->subqueryIndex);
pState->code = -TSDB_CODE_CLI_OUT_OF_MEMORY;
trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY;
} else {
SQueryInfo *pNewQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0);
assert(pNewQueryInfo->pTableMetaInfo[0]->pTableMeta != NULL && pNewQueryInfo->pTableMetaInfo[0]->pMetricMeta != NULL);
tscProcessSql(pNew);
return;
}
}
}
if (pState->code != TSDB_CODE_SUCCESS) { // failed, abort
if (vnodeInfo != NULL) {
tscTrace("%p sub:%p query failed,ip:%u,vid:%d,orderOfSub:%d,global code:%d", pParentSql, pSql,
vnodeInfo->vpeerDesc[vnodeInfo->index].ip, vnodeInfo->vpeerDesc[vnodeInfo->index].vnode,
trsupport->subqueryIndex, pState->code);
} else {
tscTrace("%p sub:%p query failed,orderOfSub:%d,global code:%d", pParentSql, pSql,
trsupport->subqueryIndex, pState->code);
}
tscRetrieveFromVnodeCallBack(param, tres, pState->code);
} else { // success, proceed to retrieve data from dnode
if (vnodeInfo != NULL) {
tscTrace("%p sub:%p query complete,ip:%u,vid:%d,orderOfSub:%d,retrieve data", trsupport->pParentSqlObj, pSql,
vnodeInfo->vpeerDesc[vnodeInfo->index].ip, vnodeInfo->vpeerDesc[vnodeInfo->index].vnode,
trsupport->subqueryIndex);
} else {
tscTrace("%p sub:%p query complete, orderOfSub:%d,retrieve data", trsupport->pParentSqlObj, pSql,
trsupport->subqueryIndex);
}
taos_fetch_rows_a(tres, tscRetrieveFromVnodeCallBack, param);
}
}
static void multiVnodeInsertMerge(void* param, TAOS_RES* tres, int numOfRows) {
SInsertSupporter *pSupporter = (SInsertSupporter *)param;
SSqlObj* pParentObj = pSupporter->pSql;
SSqlCmd* pParentCmd = &pParentObj->cmd;
SSubqueryState* pState = pSupporter->pState;
int32_t total = pState->numOfTotal;
// increase the total inserted rows
if (numOfRows > 0) {
pParentObj->res.numOfRows += numOfRows;
}
int32_t completed = atomic_add_fetch_32(&pState->numOfCompleted, 1);
if (completed < total) {
return;
}
tscTrace("%p Async insertion completed, total inserted:%d", pParentObj, pParentObj->res.numOfRows);
// release data block data
pParentCmd->pDataBlocks = tscDestroyBlockArrayList(pParentCmd->pDataBlocks);
// restore user defined fp
pParentObj->fp = pParentObj->fetchFp;
// all data has been sent to vnode, call user function
(*pParentObj->fp)(pParentObj->param, tres, numOfRows);
}
int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) {
SSqlRes *pRes = &pSql->res;
SSqlCmd *pCmd = &pSql->cmd;
pRes->qhandle = 1; // hack the qhandle check
SDataBlockList *pDataBlocks = pCmd->pDataBlocks;
pSql->pSubs = calloc(pDataBlocks->nSize, POINTER_BYTES);
pSql->numOfSubs = pDataBlocks->nSize;
assert(pDataBlocks->nSize > 0);
tscTrace("%p submit data to %d vnode(s)", pSql, pDataBlocks->nSize);
SSubqueryState *pState = calloc(1, sizeof(SSubqueryState));
pState->numOfTotal = pSql->numOfSubs;
pRes->code = TSDB_CODE_SUCCESS;
int32_t i = 0;
for (; i < pSql->numOfSubs; ++i) {
SInsertSupporter* pSupporter = calloc(1, sizeof(SInsertSupporter));
pSupporter->pSql = pSql;
pSupporter->pState = pState;
SSqlObj *pNew = createSubqueryObj(pSql, 0, multiVnodeInsertMerge, pSupporter, NULL);
if (pNew == NULL) {
tscError("%p failed to malloc buffer for subObj, orderOfSub:%d, reason:%s", pSql, i, strerror(errno));
break;
}
pSql->pSubs[i] = pNew;
tscTrace("%p sub:%p create subObj success. orderOfSub:%d", pSql, pNew, i);
}
if (i < pSql->numOfSubs) {
tscError("%p failed to prepare subObj structure and launch sub-insertion", pSql);
pRes->code = TSDB_CODE_CLI_OUT_OF_MEMORY;
return pRes->code; // free all allocated resource
}
for (int32_t j = 0; j < pSql->numOfSubs; ++j) {
SSqlObj *pSub = pSql->pSubs[j];
pSub->cmd.command = TSDB_SQL_INSERT;
int32_t code = tscCopyDataBlockToPayload(pSub, pDataBlocks->pData[j]);
if (code != TSDB_CODE_SUCCESS) {
tscTrace("%p prepare submit data block failed in async insertion, vnodeIdx:%d, total:%d, code:%d", pSql, j,
pDataBlocks->nSize, code);
}
tscTrace("%p sub:%p launch sub insert, orderOfSub:%d", pSql, pSub, j);
tscProcessSql(pSub);
}
return TSDB_CODE_SUCCESS;
}
......@@ -21,9 +21,9 @@
#include "tcache.h"
#include "tkey.h"
#include "tmd5.h"
#include "tscJoinProcess.h"
#include "tscProfile.h"
#include "tscSecondaryMerge.h"
#include "tscSubquery.h"
#include "tschemautil.h"
#include "tsclient.h"
#include "ttimer.h"
......@@ -209,7 +209,7 @@ STableSidExtInfo* tscGetMeterSidInfo(SVnodeSidList* pSidList, int32_t idx) {
return (STableSidExtInfo*)(pSidList->pSidExtInfoList[idx] + (char*)pSidList);
}
bool tscIsTwoStageMergeMetricQuery(SQueryInfo* pQueryInfo, int32_t tableIndex) {
bool tscIsTwoStageSTableQuery(SQueryInfo* pQueryInfo, int32_t tableIndex) {
if (pQueryInfo == NULL) {
return false;
}
......@@ -2321,94 +2321,3 @@ void tscTryQueryNextClause(SSqlObj* pSql, void (*queryFp)()) {
tscProcessSql(pSql);
}
}
typedef struct SinsertSupporter {
SSubqueryState* pState;
SSqlObj* pSql;
} SinsertSupporter;
void multiVnodeInsertMerge(void* param, TAOS_RES* tres, int numOfRows) {
SinsertSupporter *pSupporter = (SinsertSupporter *)param;
SSqlObj* pParentObj = pSupporter->pSql;
SSqlCmd* pParentCmd = &pParentObj->cmd;
SSubqueryState* pState = pSupporter->pState;
int32_t total = pState->numOfTotal;
// increase the total inserted rows
if (numOfRows > 0) {
pParentObj->res.numOfRows += numOfRows;
}
int32_t completed = atomic_add_fetch_32(&pState->numOfCompleted, 1);
if (completed < total) {
return;
}
tscTrace("%p Async insertion completed, total inserted:%d", pParentObj, pParentObj->res.numOfRows);
// release data block data
pParentCmd->pDataBlocks = tscDestroyBlockArrayList(pParentCmd->pDataBlocks);
// restore user defined fp
pParentObj->fp = pParentObj->fetchFp;
// all data has been sent to vnode, call user function
(*pParentObj->fp)(pParentObj->param, tres, numOfRows);
}
int32_t launchMultivnodeInsert(SSqlObj *pSql) {
SSqlRes *pRes = &pSql->res;
SSqlCmd *pCmd = &pSql->cmd;
pRes->qhandle = 1; // hack the qhandle check
SDataBlockList *pDataBlocks = pCmd->pDataBlocks;
pSql->pSubs = calloc(pDataBlocks->nSize, POINTER_BYTES);
pSql->numOfSubs = pDataBlocks->nSize;
assert(pDataBlocks->nSize > 0);
tscTrace("%p submit data to %d vnode(s)", pSql, pDataBlocks->nSize);
SSubqueryState *pState = calloc(1, sizeof(SSubqueryState));
pState->numOfTotal = pSql->numOfSubs;
pRes->code = TSDB_CODE_SUCCESS;
int32_t i = 0;
for (; i < pSql->numOfSubs; ++i) {
SinsertSupporter* pSupporter = calloc(1, sizeof(SinsertSupporter));
pSupporter->pSql = pSql;
pSupporter->pState = pState;
SSqlObj *pNew = createSubqueryObj(pSql, 0, multiVnodeInsertMerge, pSupporter, NULL);
if (pNew == NULL) {
tscError("%p failed to malloc buffer for subObj, orderOfSub:%d, reason:%s", pSql, i, strerror(errno));
break;
}
pSql->pSubs[i] = pNew;
tscTrace("%p sub:%p create subObj success. orderOfSub:%d", pSql, pNew, i);
}
if (i < pSql->numOfSubs) {
tscError("%p failed to prepare subObj structure and launch sub-insertion", pSql);
pRes->code = TSDB_CODE_CLI_OUT_OF_MEMORY;
return pRes->code; // free all allocated resource
}
for (int32_t j = 0; j < pSql->numOfSubs; ++j) {
SSqlObj *pSub = pSql->pSubs[j];
pSub->cmd.command = TSDB_SQL_INSERT;
int32_t code = tscCopyDataBlockToPayload(pSub, pDataBlocks->pData[j]);
if (code != TSDB_CODE_SUCCESS) {
tscTrace("%p prepare submit data block failed in async insertion, vnodeIdx:%d, total:%d, code:%d", pSql, j,
pDataBlocks->nSize, code);
}
tscTrace("%p sub:%p launch sub insert, orderOfSub:%d", pSql, pSub, j);
tscProcessSql(pSub);
}
return TSDB_CODE_SUCCESS;
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册