提交 ac9ae238 编写于 作者: H Haojun Liao

[td-225]fix the bug found by regression test.

上级 b4a072f0
......@@ -39,39 +39,29 @@ typedef struct SLocalDataSource {
} SLocalDataSource;
typedef struct SLocalMerger {
SLocalDataSource ** pLocalDataSrc;
SLocalDataSource **pLocalDataSrc;
int32_t numOfBuffer;
int32_t numOfCompleted;
int32_t numOfVnode;
SLoserTreeInfo * pLoserTree;
tFilePage * pResultBuf;
int32_t nResultBufSize;
tFilePage * pTempBuffer;
struct SQLFunctionCtx *pCtx;
int32_t rowSize; // size of each intermediate result.
tOrderDescriptor * pDesc;
SColumnModel * resColModel;
SColumnModel* finalModel;
tExtMemBuffer ** pExtMemBuffer; // disk-based buffer
bool orderPrjOnSTable; // projection query on stable
SLoserTreeInfo *pLoserTree;
int32_t rowSize; // size of each intermediate result.
tOrderDescriptor *pDesc;
tExtMemBuffer **pExtMemBuffer; // disk-based buffer
char *buf; // temp buffer
} SLocalMerger;
typedef struct SRetrieveSupport {
tExtMemBuffer ** pExtMemBuffer; // for build loser tree
tOrderDescriptor *pOrderDescriptor;
SColumnModel* pFinalColModel; // colModel for final result
SColumnModel* pFFColModel;
int32_t subqueryIndex; // index of current vnode in vnode list
SSqlObj * pParentSql;
tFilePage * localBuffer; // temp buffer, there is a buffer for each vnode to
uint32_t numOfRetry; // record the number of retry times
} SRetrieveSupport;
int32_t tscLocalReducerEnvCreate(SSqlObj *pSql, tExtMemBuffer ***pMemBuffer, tOrderDescriptor **pDesc,
SColumnModel **pFinalModel, SColumnModel** pFFModel, uint32_t nBufferSize);
int32_t tscLocalReducerEnvCreate(SQueryInfo* pQueryInfo, tExtMemBuffer ***pMemBuffer, int32_t numOfSub, tOrderDescriptor **pDesc, uint32_t nBufferSize, int64_t id);
void tscLocalReducerEnvDestroy(tExtMemBuffer **pMemBuffer, tOrderDescriptor *pDesc, SColumnModel *pFinalModel, SColumnModel* pFFModel,
int32_t numOfVnodes);
void tscLocalReducerEnvDestroy(tExtMemBuffer **pMemBuffer, tOrderDescriptor *pDesc, int32_t numOfVnodes);
int32_t saveToBuffer(tExtMemBuffer *pMemoryBuf, tOrderDescriptor *pDesc, tFilePage *pPage, void *data,
int32_t numOfRows, int32_t orderType);
......@@ -81,12 +71,10 @@ int32_t tscFlushTmpBuffer(tExtMemBuffer *pMemoryBuf, tOrderDescriptor *pDesc, tF
/*
* create local reducer to launch the second-stage reduce process at client site
*/
void tscCreateLocalMerger(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrderDescriptor *pDesc,
SColumnModel *finalModel, SColumnModel *pFFModel, SSqlObj* pSql);
int32_t tscCreateLocalMerger(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrderDescriptor *pDesc,
SQueryInfo *pQueryInfo, SLocalMerger **pMerger, int64_t id);
void tscDestroyLocalMerger(SSqlObj *pSql);
//int32_t tscDoLocalMerge(SSqlObj *pSql);
void tscDestroyLocalMerger(SLocalMerger* pLocalMerger);
#ifdef __cplusplus
}
......
......@@ -14,13 +14,11 @@
*/
#include "tscLocalMerge.h"
#include "tscSubquery.h"
//#include "tscSubquery.h"
#include "os.h"
#include "texpr.h"
#include "tlosertree.h"
#include "tscLog.h"
#include "tscUtil.h"
#include "tschemautil.h"
#include "tsclient.h"
#include "qUtil.h"
......@@ -59,77 +57,25 @@ int32_t treeComparator(const void *pLeft, const void *pRight, void *param) {
}
}
// todo merge with vnode side function
void tsCreateSQLFunctionCtx(SQueryInfo* pQueryInfo, SQLFunctionCtx* pCtx, SSchema* pSchema) {
size_t size = tscSqlExprNumOfExprs(pQueryInfo);
for (int32_t i = 0; i < size; ++i) {
SExprInfo *pExpr = tscSqlExprGet(pQueryInfo, i);
pCtx[i].order = pQueryInfo->order.order;
pCtx[i].functionId = pExpr->base.functionId;
pCtx[i].order = pQueryInfo->order.order;
pCtx[i].functionId = pExpr->base.functionId;
// input data format comes from pModel
pCtx[i].inputType = pSchema[i].type;
pCtx[i].inputBytes = pSchema[i].bytes;
pCtx[i].outputBytes = pExpr->base.resBytes;
pCtx[i].outputType = pExpr->base.resType;
// input buffer hold only one point data
pCtx[i].size = 1;
pCtx[i].hasNull = true;
pCtx[i].currentStage = MERGE_STAGE;
// for top/bottom function, the output of timestamp is the first column
int32_t functionId = pExpr->base.functionId;
if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM || functionId == TSDB_FUNC_DIFF) {
pCtx[i].ptsOutputBuf = pCtx[0].pOutput;
pCtx[i].param[2].i64 = pQueryInfo->order.order;
pCtx[i].param[2].nType = TSDB_DATA_TYPE_BIGINT;
pCtx[i].param[1].i64 = pQueryInfo->order.orderColId;
pCtx[i].param[0].i64 = pExpr->base.param[0].i64; // top/bot parameter
} else if (functionId == TSDB_FUNC_APERCT) {
pCtx[i].param[0].i64 = pExpr->base.param[0].i64;
pCtx[i].param[0].nType = pExpr->base.param[0].nType;
} else if (functionId == TSDB_FUNC_BLKINFO) {
pCtx[i].param[0].i64 = pExpr->base.param[0].i64;
pCtx[i].param[0].nType = pExpr->base.param[0].nType;
pCtx[i].numOfParams = 1;
}
pCtx[i].interBufBytes = pExpr->base.interBytes;
pCtx[i].stableQuery = true;
}
}
void tscCreateLocalMerger(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrderDescriptor *pDesc,
SColumnModel *finalmodel, SColumnModel *pFFModel, SSqlObj *pSql) {
SSqlCmd* pCmd = &pSql->cmd;
SSqlRes* pRes = &pSql->res;
int32_t tscCreateLocalMerger(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrderDescriptor *pDesc,
SQueryInfo* pQueryInfo, SLocalMerger **pMerger, int64_t id) {
if (pMemBuffer == NULL) {
tscLocalReducerEnvDestroy(pMemBuffer, pDesc, finalmodel, pFFModel, numOfBuffer);
tscError("pMemBuffer:%p is NULL", pMemBuffer);
pRes->code = TSDB_CODE_TSC_APP_ERROR;
return;
tscLocalReducerEnvDestroy(pMemBuffer, pDesc, numOfBuffer);
tscError("0x%"PRIx64" %p pMemBuffer is NULL", id, pMemBuffer);
return TSDB_CODE_TSC_APP_ERROR;
}
if (pDesc->pColumnModel == NULL) {
tscLocalReducerEnvDestroy(pMemBuffer, pDesc, finalmodel, pFFModel, numOfBuffer);
tscError("0x%"PRIx64" no local buffer or intermediate result format model", pSql->self);
pRes->code = TSDB_CODE_TSC_APP_ERROR;
return;
tscLocalReducerEnvDestroy(pMemBuffer, pDesc, numOfBuffer);
tscError("0x%"PRIx64" no local buffer or intermediate result format model", id);
return TSDB_CODE_TSC_APP_ERROR;
}
int32_t numOfFlush = 0;
for (int32_t i = 0; i < numOfBuffer; ++i) {
int32_t len = pMemBuffer[i]->fileMeta.flushoutData.nLength;
if (len == 0) {
tscDebug("0x%"PRIx64" no data retrieved from orderOfVnode:%d", pSql->self, i + 1);
tscDebug("0x%"PRIx64" no data retrieved from orderOfVnode:%d", id, i + 1);
continue;
}
......@@ -137,41 +83,38 @@ void tscCreateLocalMerger(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrde
}
if (numOfFlush == 0 || numOfBuffer == 0) {
tscLocalReducerEnvDestroy(pMemBuffer, pDesc, finalmodel, pFFModel, numOfBuffer);
pCmd->command = TSDB_SQL_RETRIEVE_EMPTY_RESULT; // no result, set the result empty
tscDebug("0x%"PRIx64" retrieved no data", pSql->self);
return;
tscLocalReducerEnvDestroy(pMemBuffer, pDesc, numOfBuffer);
tscDebug("0x%"PRIx64" no data to retrieve", id);
return TSDB_CODE_SUCCESS;
}
if (pDesc->pColumnModel->capacity >= pMemBuffer[0]->pageSize) {
tscError("0x%"PRIx64" Invalid value of buffer capacity %d and page size %d ", pSql->self, pDesc->pColumnModel->capacity,
tscError("0x%"PRIx64" Invalid value of buffer capacity %d and page size %d ", id, pDesc->pColumnModel->capacity,
pMemBuffer[0]->pageSize);
tscLocalReducerEnvDestroy(pMemBuffer, pDesc, finalmodel, pFFModel, numOfBuffer);
pRes->code = TSDB_CODE_TSC_APP_ERROR;
return;
tscLocalReducerEnvDestroy(pMemBuffer, pDesc, numOfBuffer);
return TSDB_CODE_TSC_APP_ERROR;
}
size_t size = sizeof(SLocalMerger) + POINTER_BYTES * numOfFlush;
SLocalMerger *pMerger = (SLocalMerger *) calloc(1, size);
if (pMerger == NULL) {
tscError("0x%"PRIx64" failed to create local merge structure, out of memory", pSql->self);
*pMerger = (SLocalMerger *) calloc(1, size);
if ((*pMerger) == NULL) {
tscError("0x%"PRIx64" failed to create local merge structure, out of memory", id);
tscLocalReducerEnvDestroy(pMemBuffer, pDesc, finalmodel, pFFModel, numOfBuffer);
pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
return;
tscLocalReducerEnvDestroy(pMemBuffer, pDesc, numOfBuffer);
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
pMerger->pExtMemBuffer = pMemBuffer;
pMerger->pLocalDataSrc = (SLocalDataSource **)&pMerger[1];
assert(pMerger->pLocalDataSrc != NULL);
(*pMerger)->pExtMemBuffer = pMemBuffer;
(*pMerger)->pLocalDataSrc = (SLocalDataSource **)&pMerger[1];
assert((*pMerger)->pLocalDataSrc != NULL);
pMerger->numOfBuffer = numOfFlush;
pMerger->numOfVnode = numOfBuffer;
(*pMerger)->numOfBuffer = numOfFlush;
(*pMerger)->numOfVnode = numOfBuffer;
pMerger->pDesc = pDesc;
tscDebug("0x%"PRIx64" the number of merged leaves is: %d", pSql->self, pMerger->numOfBuffer);
(*pMerger)->pDesc = pDesc;
tscDebug("0x%"PRIx64" the number of merged leaves is: %d", id, (*pMerger)->numOfBuffer);
int32_t idx = 0;
for (int32_t i = 0; i < numOfBuffer; ++i) {
......@@ -180,13 +123,12 @@ void tscCreateLocalMerger(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrde
for (int32_t j = 0; j < numOfFlushoutInFile; ++j) {
SLocalDataSource *ds = (SLocalDataSource *)malloc(sizeof(SLocalDataSource) + pMemBuffer[0]->pageSize);
if (ds == NULL) {
tscError("0x%"PRIx64" failed to create merge structure", pSql->self);
pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
tscError("0x%"PRIx64" failed to create merge structure", id);
tfree(pMerger);
return;
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
pMerger->pLocalDataSrc[idx] = ds;
(*pMerger)->pLocalDataSrc[idx] = ds;
ds->pMemBuffer = pMemBuffer[i];
ds->flushoutIdx = j;
......@@ -194,7 +136,7 @@ void tscCreateLocalMerger(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrde
ds->pageId = 0;
ds->rowIdx = 0;
tscDebug("0x%"PRIx64" load data from disk into memory, orderOfVnode:%d, total:%d", pSql->self, i + 1, idx + 1);
tscDebug("0x%"PRIx64" load data from disk into memory, orderOfVnode:%d, total:%d", id, i + 1, idx + 1);
tExtMemBufferLoadData(pMemBuffer[i], &(ds->filePage), j, 0);
#ifdef _DEBUG_VIEW
printf("load data page into mem for build loser tree: %" PRIu64 " rows\n", ds->filePage.num);
......@@ -208,7 +150,7 @@ void tscCreateLocalMerger(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrde
#endif
if (ds->filePage.num == 0) { // no data in this flush, the index does not increase
tscDebug("0x%"PRIx64" flush data is empty, ignore %d flush record", pSql->self, idx);
tscDebug("0x%"PRIx64" flush data is empty, ignore %d flush record", id, idx);
tfree(ds);
continue;
}
......@@ -219,115 +161,54 @@ void tscCreateLocalMerger(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrde
// no data actually, no need to merge result.
if (idx == 0) {
tfree(pMerger);
return;
tscDebug("0x%"PRIx64" retrieved no data", id);
tscLocalReducerEnvDestroy(pMemBuffer, pDesc, numOfBuffer);
return TSDB_CODE_SUCCESS;
}
pMerger->numOfBuffer = idx;
(*pMerger)->numOfBuffer = idx;
SCompareParam *param = malloc(sizeof(SCompareParam));
if (param == NULL) {
tfree(pMerger);
return;
tfree((*pMerger));
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
param->pLocalData = pMerger->pLocalDataSrc;
param->pDesc = pMerger->pDesc;
param->num = pMerger->pLocalDataSrc[0]->pMemBuffer->numOfElemsPerPage;
SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd, pCmd->clauseIndex);
param->pLocalData = (*pMerger)->pLocalDataSrc;
param->pDesc = (*pMerger)->pDesc;
param->num = (*pMerger)->pLocalDataSrc[0]->pMemBuffer->numOfElemsPerPage;
param->groupOrderType = pQueryInfo->groupbyExpr.orderType;
pMerger->orderPrjOnSTable = tscOrderedProjectionQueryOnSTable(pQueryInfo, 0);
pRes->code = tLoserTreeCreate(&pMerger->pLoserTree, pMerger->numOfBuffer, param, treeComparator);
if (pMerger->pLoserTree == NULL || pRes->code != 0) {
int32_t code = tLoserTreeCreate(&(*pMerger)->pLoserTree, (*pMerger)->numOfBuffer, param, treeComparator);
if ((*pMerger)->pLoserTree == NULL || code != TSDB_CODE_SUCCESS) {
tfree(param);
tfree(pMerger);
return;
}
// the input data format follows the old format, but output in a new format.
// so, all the input must be parsed as old format
pMerger->pCtx = (SQLFunctionCtx *)calloc(tscSqlExprNumOfExprs(pQueryInfo), sizeof(SQLFunctionCtx));
pMerger->rowSize = pMemBuffer[0]->nElemSize;
tscFieldInfoUpdateOffset(pQueryInfo);
if (pMerger->rowSize > pMemBuffer[0]->pageSize) {
assert(false); // todo fixed row size is larger than the minimum page size;
tfree((*pMerger));
return code;
}
// used to keep the latest input row
pMerger->pTempBuffer = (tFilePage *)calloc(1, pMerger->rowSize + sizeof(tFilePage));
pMerger->nResultBufSize = pMemBuffer[0]->pageSize * 16;
pMerger->pResultBuf = (tFilePage *)calloc(1, pMerger->nResultBufSize + sizeof(tFilePage));
(*pMerger)->rowSize = pMemBuffer[0]->nElemSize;
pMerger->resColModel = finalmodel;
pMerger->resColModel->capacity = pMerger->nResultBufSize;
pMerger->finalModel = pFFModel;
// todo fixed row size is larger than the minimum page size;
assert((*pMerger)->rowSize <= pMemBuffer[0]->pageSize);
if (finalmodel->rowSize > 0) {
pMerger->resColModel->capacity /= finalmodel->rowSize;
}
assert(finalmodel->rowSize > 0 && finalmodel->rowSize <= pMerger->rowSize);
if (pMerger->pTempBuffer == NULL || pMerger->pLoserTree == NULL) {
tfree(pMerger->pTempBuffer);
tfree(pMerger->pLoserTree);
if ((*pMerger)->pLoserTree == NULL) {
tfree((*pMerger)->pLoserTree);
tfree(param);
tfree(pMerger);
pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
return;
}
pMerger->pTempBuffer->num = 0;
tscCreateResPointerInfo(pRes, pQueryInfo);
SSchema* pschema = calloc(pDesc->pColumnModel->numOfCols, sizeof(SSchema));
for(int32_t i = 0; i < pDesc->pColumnModel->numOfCols; ++i) {
pschema[i] = pDesc->pColumnModel->pFields[i].field;
}
tsCreateSQLFunctionCtx(pQueryInfo, pMerger->pCtx, pschema);
// setCtxInputOutputBuffer(pQueryInfo, pMerger->pCtx, pMerger, pDesc);
tfree(pschema);
int32_t maxBufSize = 0;
for (int32_t k = 0; k < tscSqlExprNumOfExprs(pQueryInfo); ++k) {
SExprInfo *pExpr = tscSqlExprGet(pQueryInfo, k);
if (maxBufSize < pExpr->base.resBytes && pExpr->base.functionId == TSDB_FUNC_TAG) {
maxBufSize = pExpr->base.resBytes;
}
tfree((*pMerger));
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
// we change the capacity of schema to denote that there is only one row in temp buffer
pMerger->pDesc->pColumnModel->capacity = 1;
// restore the limitation value at the last stage
if (tscOrderedProjectionQueryOnSTable(pQueryInfo, 0)) {
pQueryInfo->limit.limit = pQueryInfo->clauseLimit;
pQueryInfo->limit.offset = pQueryInfo->prjOffset;
}
pRes->pLocalMerger = pMerger;
pRes->numOfGroups = 0;
// we change the capacity of schema to denote that there is only one row in temp buffer
(*pMerger)->pDesc->pColumnModel->capacity = 1;
// STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
// STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta);
// TSKEY stime = (pQueryInfo->order.order == TSDB_ORDER_ASC)? pQueryInfo->window.skey : pQueryInfo->window.ekey;
// int64_t revisedSTime = taosTimeTruncate(stime, &pQueryInfo->interval, tinfo.precision);
// if (pQueryInfo->fillType != TSDB_FILL_NONE) {
// SFillColInfo* pFillCol = createFillColInfo(pQueryInfo);
// pMerger->pFillInfo =
// taosCreateFillInfo(pQueryInfo->order.order, revisedSTime, pQueryInfo->groupbyExpr.numOfGroupCols, 4096,
// (int32_t)pQueryInfo->fieldsInfo.numOfOutput, pQueryInfo->interval.sliding,
// pQueryInfo->interval.slidingUnit, tinfo.precision, pQueryInfo->fillType, pFillCol, pSql);
// }
return TSDB_CODE_SUCCESS;
}
static int32_t tscFlushTmpBufferImpl(tExtMemBuffer *pMemoryBuf, tOrderDescriptor *pDesc, tFilePage *pPage,
......@@ -418,44 +299,29 @@ int32_t saveToBuffer(tExtMemBuffer *pMemoryBuf, tOrderDescriptor *pDesc, tFilePa
return 0;
}
void tscDestroyLocalMerger(SSqlObj *pSql) {
if (pSql == NULL) {
return;
}
SSqlRes *pRes = &(pSql->res);
if (pRes->pLocalMerger == NULL) {
void tscDestroyLocalMerger(SLocalMerger* pLocalMerger) {
if (pLocalMerger == NULL) {
return;
}
// there is no more result, so we release all allocated resource
SLocalMerger *pLocalMerge = (SLocalMerger *)atomic_exchange_ptr(&pRes->pLocalMerger, NULL);
tfree(pLocalMerge->pResultBuf);
tfree(pLocalMerge->pCtx);
if (pLocalMerge->pLoserTree) {
tfree(pLocalMerge->pLoserTree->param);
tfree(pLocalMerge->pLoserTree);
if (pLocalMerger->pLoserTree) {
tfree(pLocalMerger->pLoserTree->param);
tfree(pLocalMerger->pLoserTree);
}
tscLocalReducerEnvDestroy(pLocalMerge->pExtMemBuffer, pLocalMerge->pDesc, pLocalMerge->resColModel,
pLocalMerge->finalModel, pLocalMerge->numOfVnode);
for (int32_t i = 0; i < pLocalMerge->numOfBuffer; ++i) {
tfree(pLocalMerge->pLocalDataSrc[i]);
tscLocalReducerEnvDestroy(pLocalMerger->pExtMemBuffer, pLocalMerger->pDesc, pLocalMerger->numOfVnode);
for (int32_t i = 0; i < pLocalMerger->numOfBuffer; ++i) {
tfree(pLocalMerger->pLocalDataSrc[i]);
}
pLocalMerge->numOfBuffer = 0;
pLocalMerge->numOfCompleted = 0;
tfree(pLocalMerge->pTempBuffer);
free(pLocalMerge);
tscDebug("0x%"PRIx64" free local reducer finished", pSql->self);
pLocalMerger->numOfBuffer = 0;
pLocalMerger->numOfCompleted = 0;
tfree(pLocalMerger->buf);
free(pLocalMerger);
}
static int32_t createOrderDescriptor(tOrderDescriptor **pOrderDesc, SSqlCmd *pCmd, SColumnModel *pModel) {
int32_t numOfGroupByCols = 0;
SQueryInfo *pQueryInfo = tscGetActiveQueryInfo(pCmd);
static int32_t createOrderDescriptor(tOrderDescriptor **pOrderDesc, SQueryInfo* pQueryInfo, SColumnModel *pModel) {
int32_t numOfGroupByCols = 0;
if (pQueryInfo->groupbyExpr.numOfGroupCols > 0) {
numOfGroupByCols = pQueryInfo->groupbyExpr.numOfGroupCols;
......@@ -525,32 +391,25 @@ static int32_t createOrderDescriptor(tOrderDescriptor **pOrderDesc, SSqlCmd *pCm
}
}
int32_t tscLocalReducerEnvCreate(SSqlObj *pSql, tExtMemBuffer ***pMemBuffer, tOrderDescriptor **pOrderDesc,
SColumnModel **pFinalModel, SColumnModel** pFFModel, uint32_t nBufferSizes) {
SSqlCmd *pCmd = &pSql->cmd;
SSqlRes *pRes = &pSql->res;
SSchema * pSchema = NULL;
int32_t tscLocalReducerEnvCreate(SQueryInfo *pQueryInfo, tExtMemBuffer ***pMemBuffer, int32_t numOfSub,
tOrderDescriptor **pOrderDesc, uint32_t nBufferSizes, int64_t id) {
SSchema *pSchema = NULL;
SColumnModel *pModel = NULL;
*pFinalModel = NULL;
SQueryInfo * pQueryInfo = tscGetActiveQueryInfo(pCmd);
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
(*pMemBuffer) = (tExtMemBuffer **)malloc(POINTER_BYTES * pSql->subState.numOfSub);
(*pMemBuffer) = (tExtMemBuffer **)malloc(POINTER_BYTES * numOfSub);
if (*pMemBuffer == NULL) {
tscError("0x%"PRIx64" failed to allocate memory", pSql->self);
pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
return pRes->code;
tscError("0x%"PRIx64" failed to allocate memory", id);
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
size_t size = tscSqlExprNumOfExprs(pQueryInfo);
pSchema = (SSchema *)calloc(1, sizeof(SSchema) * size);
if (pSchema == NULL) {
tscError("0x%"PRIx64" failed to allocate memory", pSql->self);
pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
return pRes->code;
tscError("0x%"PRIx64" failed to allocate memory", id);
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
int32_t rlen = 0;
......@@ -570,6 +429,7 @@ int32_t tscLocalReducerEnvCreate(SSqlObj *pSql, tExtMemBuffer ***pMemBuffer, tOr
}
pModel = createColumnModel(pSchema, (int32_t)size, capacity);
tfree(pSchema);
int32_t pg = DEFAULT_PAGE_SIZE;
int32_t overhead = sizeof(tFilePage);
......@@ -577,95 +437,26 @@ int32_t tscLocalReducerEnvCreate(SSqlObj *pSql, tExtMemBuffer ***pMemBuffer, tOr
pg *= 2;
}
size_t numOfSubs = pSql->subState.numOfSub;
assert(numOfSubs <= pTableMetaInfo->vgroupList->numOfVgroups);
for (int32_t i = 0; i < numOfSubs; ++i) {
assert(numOfSub <= pTableMetaInfo->vgroupList->numOfVgroups);
for (int32_t i = 0; i < numOfSub; ++i) {
(*pMemBuffer)[i] = createExtMemBuffer(nBufferSizes, rlen, pg, pModel);
(*pMemBuffer)[i]->flushModel = MULTIPLE_APPEND_MODEL;
}
if (createOrderDescriptor(pOrderDesc, pCmd, pModel) != TSDB_CODE_SUCCESS) {
pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
tfree(pSchema);
return pRes->code;
}
// final result depends on the fields number
memset(pSchema, 0, sizeof(SSchema) * size);
for (int32_t i = 0; i < size; ++i) {
SExprInfo *pExpr = tscSqlExprGet(pQueryInfo, i);
SSchema p1 = {0};
if (pExpr->base.colInfo.colIndex == TSDB_TBNAME_COLUMN_INDEX) {
p1 = *tGetTbnameColumnSchema();
} else if (TSDB_COL_IS_UD_COL(pExpr->base.colInfo.flag)) {
p1.bytes = pExpr->base.resBytes;
p1.type = (uint8_t) pExpr->base.resType;
tstrncpy(p1.name, pExpr->base.aliasName, tListLen(p1.name));
} else {
p1 = *tscGetTableColumnSchema(pTableMetaInfo->pTableMeta, pExpr->base.colInfo.colIndex);
}
int32_t inter = 0;
int16_t type = -1;
int16_t bytes = 0;
// the final result size and type in the same as query on single table.
// so here, set the flag to be false;
int32_t functionId = pExpr->base.functionId;
if (functionId >= TSDB_FUNC_TS && functionId <= TSDB_FUNC_DIFF) {
type = pModel->pFields[i].field.type;
bytes = pModel->pFields[i].field.bytes;
} else {
if (functionId == TSDB_FUNC_FIRST_DST) {
functionId = TSDB_FUNC_FIRST;
} else if (functionId == TSDB_FUNC_LAST_DST) {
functionId = TSDB_FUNC_LAST;
} else if (functionId == TSDB_FUNC_STDDEV_DST) {
functionId = TSDB_FUNC_STDDEV;
}
int32_t ret = getResultDataInfo(p1.type, p1.bytes, functionId, 0, &type, &bytes, &inter, 0, false);
assert(ret == TSDB_CODE_SUCCESS);
}
pSchema[i].type = (uint8_t)type;
pSchema[i].bytes = bytes;
strcpy(pSchema[i].name, pModel->pFields[i].field.name);
}
*pFinalModel = createColumnModel(pSchema, (int32_t)size, capacity);
memset(pSchema, 0, sizeof(SSchema) * size);
size = tscNumOfFields(pQueryInfo);
for(int32_t i = 0; i < size; ++i) {
SInternalField* pField = tscFieldInfoGetInternalField(&pQueryInfo->fieldsInfo, i);
pSchema[i].bytes = pField->field.bytes;
pSchema[i].type = pField->field.type;
tstrncpy(pSchema[i].name, pField->field.name, tListLen(pSchema[i].name));
if (createOrderDescriptor(pOrderDesc, pQueryInfo, pModel) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
*pFFModel = createColumnModel(pSchema, (int32_t) size, capacity);
tfree(pSchema);
return TSDB_CODE_SUCCESS;
}
/**
* @param pMemBuffer
* @param pDesc
* @param pFinalModel
* @param numOfVnodes
*/
void tscLocalReducerEnvDestroy(tExtMemBuffer **pMemBuffer, tOrderDescriptor *pDesc, SColumnModel *pFinalModel, SColumnModel *pFFModel,
int32_t numOfVnodes) {
destroyColumnModel(pFinalModel);
destroyColumnModel(pFFModel);
void tscLocalReducerEnvDestroy(tExtMemBuffer **pMemBuffer, tOrderDescriptor *pDesc, int32_t numOfVnodes) {
tOrderDescDestroy(pDesc);
for (int32_t i = 0; i < numOfVnodes; ++i) {
pMemBuffer[i] = destoryExtMemBuffer(pMemBuffer[i]);
}
......@@ -877,10 +668,12 @@ static bool isAllSourcesCompleted(SLocalMerger *pLocalMerge) {
return (pLocalMerge->numOfBuffer == pLocalMerge->numOfCompleted);
}
void tscInitResObjForLocalQuery(SSqlObj *pObj, int32_t numOfRes, int32_t rowLen) {
SSqlRes *pRes = &pObj->res;
void tscInitResObjForLocalQuery(SSqlObj *pSql, int32_t numOfRes, int32_t rowLen) {
SSqlRes *pRes = &pSql->res;
if (pRes->pLocalMerger != NULL) {
tscDestroyLocalMerger(pObj);
tscDestroyLocalMerger(pRes->pLocalMerger);
pRes->pLocalMerger = NULL;
tscDebug("0x%"PRIx64" free local reducer finished", pSql->self);
}
pRes->qId = 1; // hack to pass the safety check in fetch_row function
......@@ -891,14 +684,12 @@ void tscInitResObjForLocalQuery(SSqlObj *pObj, int32_t numOfRes, int32_t rowLen)
pRes->pLocalMerger = (SLocalMerger *)calloc(1, sizeof(SLocalMerger));
/*
* we need one additional byte space
* the sprintf function needs one additional space to put '\0' at the end of string
* One more byte space is required, since the sprintf function needs one additional space to put '\0' at
* the end of string
*/
size_t allocSize = numOfRes * rowLen + sizeof(tFilePage) + 1;
pRes->pLocalMerger->pResultBuf = (tFilePage *)calloc(1, allocSize);
pRes->pLocalMerger->pResultBuf->num = numOfRes;
pRes->data = pRes->pLocalMerger->pResultBuf->data;
size_t size = numOfRes * rowLen + 1;
pRes->pLocalMerger->buf = calloc(1, size);
pRes->data = pRes->pLocalMerger->buf;
}
int32_t doArithmeticCalculate(SQueryInfo* pQueryInfo, tFilePage* pOutput, int32_t rowSize, int32_t finalRowSize) {
......@@ -944,8 +735,8 @@ int32_t doArithmeticCalculate(SQueryInfo* pQueryInfo, tFilePage* pOutput, int32_
return offset;
}
#define COLMODEL_GET_VAL(data, schema, allrow, rowId, colId) \
(data + (schema)->pFields[colId].offset * (allrow) + (rowId) * (schema)->pFields[colId].field.bytes)
#define COLMODEL_GET_VAL(data, schema, rowId, colId) \
(data + (schema)->pFields[colId].offset * ((schema)->capacity) + (rowId) * (schema)->pFields[colId].field.bytes)
static void appendOneRowToDataBlock(SSDataBlock *pBlock, char *buf, SColumnModel *pModel, int32_t rowIndex,
int32_t maxRows) {
......@@ -953,7 +744,7 @@ static void appendOneRowToDataBlock(SSDataBlock *pBlock, char *buf, SColumnModel
SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, i);
char* p = pColInfo->pData + pBlock->info.rows * pColInfo->info.bytes;
char *src = COLMODEL_GET_VAL(buf, pModel, maxRows, rowIndex, i);
char *src = COLMODEL_GET_VAL(buf, pModel, rowIndex, i);
memmove(p, src, pColInfo->info.bytes);
}
......@@ -970,8 +761,6 @@ SSDataBlock* doMultiwayMergeSort(void* param, bool* newgroup) {
SLocalMerger *pMerger = pInfo->pMerge;
SLoserTreeInfo *pTree = pMerger->pLoserTree;
SColumnModel *pModel = pMerger->pDesc->pColumnModel;
tFilePage *tmpBuffer = pMerger->pTempBuffer;
pInfo->binfo.pRes->info.rows = 0;
......@@ -984,7 +773,7 @@ SSDataBlock* doMultiwayMergeSort(void* param, bool* newgroup) {
printf("chosen data in pTree[0] = %d\n", pTree->pNode[0].index);
#endif
assert((pTree->pNode[0].index < pMerger->numOfBuffer) && (pTree->pNode[0].index >= 0) && tmpBuffer->num == 0);
assert((pTree->pNode[0].index < pMerger->numOfBuffer) && (pTree->pNode[0].index >= 0));
// chosen from loser tree
SLocalDataSource *pOneDataSrc = pMerger->pLocalDataSrc[pTree->pNode[0].index];
......@@ -998,7 +787,7 @@ SSDataBlock* doMultiwayMergeSort(void* param, bool* newgroup) {
SColumnInfoData *pColInfo = taosArrayGet(pInfo->binfo.pRes->pDataBlock, pIndex->colIndex);
char *newRow =
COLMODEL_GET_VAL(pOneDataSrc->filePage.data, pModel, pOneDataSrc->pMemBuffer->pColumnModel->capacity,
COLMODEL_GET_VAL(pOneDataSrc->filePage.data, pOneDataSrc->pMemBuffer->pColumnModel,
pOneDataSrc->rowIdx, pIndex->colIndex);
char * data = pInfo->prevRow[i];
......@@ -1021,7 +810,7 @@ SSDataBlock* doMultiwayMergeSort(void* param, bool* newgroup) {
SColumnInfoData *pColInfo = taosArrayGet(pInfo->binfo.pRes->pDataBlock, pIndex->colIndex);
char *curCol =
COLMODEL_GET_VAL(pOneDataSrc->filePage.data, pModel, pOneDataSrc->pMemBuffer->pColumnModel->capacity,
COLMODEL_GET_VAL(pOneDataSrc->filePage.data, pOneDataSrc->pMemBuffer->pColumnModel,
pOneDataSrc->rowIdx, pIndex->colIndex);
memcpy(pInfo->prevRow[i], curCol, pColInfo->info.bytes);
}
......@@ -1033,7 +822,8 @@ SSDataBlock* doMultiwayMergeSort(void* param, bool* newgroup) {
return pInfo->binfo.pRes;
}
appendOneRowToDataBlock(pInfo->binfo.pRes, pOneDataSrc->filePage.data, pModel, pOneDataSrc->rowIdx, pOneDataSrc->pMemBuffer->pColumnModel->capacity);
appendOneRowToDataBlock(pInfo->binfo.pRes, pOneDataSrc->filePage.data, pOneDataSrc->pMemBuffer->pColumnModel,
pOneDataSrc->rowIdx, pOneDataSrc->pMemBuffer->pColumnModel->capacity);
#if defined(_DEBUG_VIEW)
printf("chosen row:\t");
......
......@@ -1615,6 +1615,11 @@ int tscProcessRetrieveLocalMergeRsp(SSqlObj *pSql) {
return code;
}
if (pRes->pLocalMerger == NULL) { // no result from subquery, so abort here directly.
(*pSql->fp)(pSql->param, pSql, pRes->numOfRows);
return code;
}
// global aggregation may be the upstream for parent query
SQueryInfo *pQueryInfo = tscGetActiveQueryInfo(pCmd);
if (pQueryInfo->pQInfo == NULL) {
......
......@@ -2422,8 +2422,6 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
tExtMemBuffer **pMemoryBuf = NULL;
tOrderDescriptor *pDesc = NULL;
SColumnModel *pModel = NULL;
SColumnModel *pFinalModel = NULL;
pRes->qId = 0x1; // hack the qhandle check
......@@ -2442,9 +2440,9 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
assert(pState->numOfSub > 0);
int32_t ret = tscLocalReducerEnvCreate(pSql, &pMemoryBuf, &pDesc, &pModel, &pFinalModel, nBufferSize);
int32_t ret = tscLocalReducerEnvCreate(pQueryInfo, &pMemoryBuf, pSql->subState.numOfSub, &pDesc, nBufferSize, pSql->self);
if (ret != 0) {
pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
pRes->code = ret;
tscAsyncResultOnError(pSql);
tfree(pMemoryBuf);
return ret;
......@@ -2455,7 +2453,7 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
if (pSql->pSubs == NULL) {
tfree(pSql->pSubs);
pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
tscLocalReducerEnvDestroy(pMemoryBuf, pDesc, pModel, pFinalModel,pState->numOfSub);
tscLocalReducerEnvDestroy(pMemoryBuf, pDesc,pState->numOfSub);
tscAsyncResultOnError(pSql);
return ret;
......@@ -2498,8 +2496,6 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
trs->subqueryIndex = i;
trs->pParentSql = pSql;
trs->pFinalColModel = pModel;
trs->pFFColModel = pFinalModel;
SSqlObj *pNew = tscCreateSTableSubquery(pSql, trs, NULL);
if (pNew == NULL) {
......@@ -2524,13 +2520,13 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
tscError("0x%"PRIx64" failed to prepare subquery structure and launch subqueries", pSql->self);
pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
tscLocalReducerEnvDestroy(pMemoryBuf, pDesc, pModel, pFinalModel, pState->numOfSub);
tscLocalReducerEnvDestroy(pMemoryBuf, pDesc, pState->numOfSub);
doCleanupSubqueries(pSql, i);
return pRes->code; // free all allocated resource
}
if (pRes->code == TSDB_CODE_TSC_QUERY_CANCELLED) {
tscLocalReducerEnvDestroy(pMemoryBuf, pDesc, pModel, pFinalModel, pState->numOfSub);
tscLocalReducerEnvDestroy(pMemoryBuf, pDesc, pState->numOfSub);
doCleanupSubqueries(pSql, i);
return pRes->code;
}
......@@ -2695,7 +2691,7 @@ void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numO
tstrerror(pParentSql->res.code));
// release allocated resource
tscLocalReducerEnvDestroy(trsupport->pExtMemBuffer, trsupport->pOrderDescriptor, trsupport->pFinalColModel, trsupport->pFFColModel,
tscLocalReducerEnvDestroy(trsupport->pExtMemBuffer, trsupport->pOrderDescriptor,
pState->numOfSub);
tscFreeRetrieveSup(pSql);
......@@ -2770,19 +2766,25 @@ static void tscAllDataRetrievedFromDnode(SRetrieveSupport *trsupport, SSqlObj* p
SQueryInfo *pPQueryInfo = tscGetQueryInfo(&pParentSql->cmd, 0);
tscClearInterpInfo(pPQueryInfo);
tscCreateLocalMerger(trsupport->pExtMemBuffer, pState->numOfSub, pDesc, trsupport->pFinalColModel, trsupport->pFFColModel, pParentSql);
code = tscCreateLocalMerger(trsupport->pExtMemBuffer, pState->numOfSub, pDesc, pPQueryInfo, &pParentSql->res.pLocalMerger, pParentSql->self);
pParentSql->res.code = code;
if (code == TSDB_CODE_SUCCESS && trsupport->pExtMemBuffer == NULL) {
pParentSql->cmd.command = TSDB_SQL_RETRIEVE_EMPTY_RESULT; // no result, set the result empty
} else {
pParentSql->cmd.command = TSDB_SQL_RETRIEVE_LOCALMERGE;
}
tscCreateResPointerInfo(&pParentSql->res, pPQueryInfo);
tscDebug("0x%"PRIx64" build loser tree completed", pParentSql->self);
pParentSql->res.precision = pSql->res.precision;
pParentSql->res.numOfRows = 0;
pParentSql->res.row = 0;
tscFreeRetrieveSup(pSql);
pParentSql->res.numOfGroups = 0;
// set the command flag must be after the semaphore been correctly set.
if (pParentSql->cmd.command != TSDB_SQL_RETRIEVE_EMPTY_RESULT) {
pParentSql->cmd.command = TSDB_SQL_RETRIEVE_LOCALMERGE;
}
tscFreeRetrieveSup(pSql);
if (pParentSql->res.code == TSDB_CODE_SUCCESS) {
(*pParentSql->fp)(pParentSql->param, pParentSql, 0);
......
......@@ -828,11 +828,10 @@ void tscResetSqlCmd(SSqlCmd* pCmd, bool removeMeta) {
}
void tscFreeSqlResult(SSqlObj* pSql) {
tscDestroyLocalMerger(pSql);
SSqlRes* pRes = &pSql->res;
tscDestroyResPointerInfo(pRes);
tscDestroyLocalMerger(pRes->pLocalMerger);
tscDestroyResPointerInfo(pRes);
memset(&pSql->res, 0, sizeof(SSqlRes));
}
......
......@@ -354,7 +354,7 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SVReadMsg *pRead) {
// kill current query and free corresponding resources.
if (pRetrieve->free == 1) {
vWarn("vgId:%d, QInfo:%"PRIu64 "-%p, retrieve msg received to kill query and free qhandle", pVnode->vgId, pRetrieve->qId, *handle);
vWarn("vgId:%d, QInfo:%"PRIx64 "-%p, retrieve msg received to kill query and free qhandle", pVnode->vgId, pRetrieve->qId, *handle);
qKillQuery(*handle);
qReleaseQInfo(pVnode->qMgmt, (void **)&handle, true);
......
......@@ -5,6 +5,8 @@ IF (TD_LINUX)
AUX_SOURCE_DIRECTORY(. SRC)
ADD_EXECUTABLE(demo apitest.c)
TARGET_LINK_LIBRARIES(demo taos_static trpc tutil pthread )
ADD_EXECUTABLE(subscribe subscribe.c)
TARGET_LINK_LIBRARIES(subscribe taos_static trpc tutil pthread )
ADD_EXECUTABLE(epoll epoll.c)
TARGET_LINK_LIBRARIES(epoll taos_static trpc tutil pthread )
ENDIF ()
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册