提交 11c0f8ba 编写于 作者: H hjLiao

[td-225] fix bugs at client

上级 cf06a3c8
......@@ -116,7 +116,7 @@ int32_t tscFlushTmpBuffer(tExtMemBuffer *pMemoryBuf, tOrderDescriptor *pDesc, tF
* create local reducer to launch the second-stage reduce process at client site
*/
void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrderDescriptor *pDesc,
SColumnModel *finalModel, SSqlCmd *pSqlCmd, SSqlRes *pRes);
SColumnModel *finalModel, SSqlObj* pSql);
void tscDestroyLocalReducer(SSqlObj *pSql);
......
......@@ -55,7 +55,7 @@ int32_t treeComparator(const void *pLeft, const void *pRight, void *param) {
}
}
static void tscInitSqlContext(SSqlCmd *pCmd, SSqlRes *pRes, SLocalReducer *pReducer, tOrderDescriptor *pDesc) {
static void tscInitSqlContext(SSqlCmd *pCmd, SLocalReducer *pReducer, tOrderDescriptor *pDesc) {
/*
* the fields and offset attributes in pCmd and pModel may be different due to
* merge requirement. So, the final result in pRes structure is formatted in accordance with the pCmd object.
......@@ -96,13 +96,13 @@ static void tscInitSqlContext(SSqlCmd *pCmd, SSqlRes *pRes, SLocalReducer *pRedu
if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM) {
pCtx->ptsOutputBuf = pReducer->pCtx[0].aOutputBuf;
pCtx->param[2].i64Key = pQueryInfo->order.order;
pCtx->param[2].nType = TSDB_DATA_TYPE_BIGINT;
pCtx->param[2].nType = TSDB_DATA_TYPE_BIGINT;
pCtx->param[1].i64Key = pQueryInfo->order.orderColId;
}
SResultInfo *pResInfo = &pReducer->pResInfo[i];
pResInfo->bufLen = pExpr->interBytes;
pResInfo->interResultBuf = calloc(1, (size_t)pResInfo->bufLen);
pResInfo->interResultBuf = calloc(1, (size_t) pResInfo->bufLen);
pCtx->resultInfo = &pReducer->pResInfo[i];
pCtx->resultInfo->superTableQ = true;
......@@ -132,16 +132,15 @@ static void tscInitSqlContext(SSqlCmd *pCmd, SSqlRes *pRes, SLocalReducer *pRedu
}
}
/*
* todo release allocated memory process with async process
*/
void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrderDescriptor *pDesc,
SColumnModel *finalmodel, SSqlCmd *pCmd, SSqlRes *pRes) {
// offset of cmd in SSqlObj structure
char *pSqlObjAddr = (char *)pCmd - offsetof(SSqlObj, cmd);
SColumnModel *finalmodel, SSqlObj* pSql) {
SSqlCmd* pCmd = &pSql->cmd;
SSqlRes* pRes = &pSql->res;
if (pMemBuffer == NULL) {
tscError("%p pMemBuffer", pMemBuffer);
tscLocalReducerEnvDestroy(pMemBuffer, pDesc, finalmodel, numOfBuffer);
tscError("%p pMemBuffer is NULL", pMemBuffer);
pRes->code = TSDB_CODE_APP_ERROR;
return;
}
......@@ -149,7 +148,7 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd
if (pDesc->pColumnModel == NULL) {
tscLocalReducerEnvDestroy(pMemBuffer, pDesc, finalmodel, numOfBuffer);
tscError("%p no local buffer or intermediate result format model", pSqlObjAddr);
tscError("%p no local buffer or intermediate result format model", pSql);
pRes->code = TSDB_CODE_APP_ERROR;
return;
}
......@@ -158,7 +157,7 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd
for (int32_t i = 0; i < numOfBuffer; ++i) {
int32_t len = pMemBuffer[i]->fileMeta.flushoutData.nLength;
if (len == 0) {
tscTrace("%p no data retrieved from orderOfVnode:%d", pSqlObjAddr, i + 1);
tscTrace("%p no data retrieved from orderOfVnode:%d", pSql, i + 1);
continue;
}
......@@ -167,13 +166,13 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd
if (numOfFlush == 0 || numOfBuffer == 0) {
tscLocalReducerEnvDestroy(pMemBuffer, pDesc, finalmodel, numOfBuffer);
tscTrace("%p retrieved no data", pSqlObjAddr);
tscTrace("%p retrieved no data", pSql);
return;
}
if (pDesc->pColumnModel->capacity >= pMemBuffer[0]->pageSize) {
tscError("%p Invalid value of buffer capacity %d and page size %d ", pSqlObjAddr, pDesc->pColumnModel->capacity,
tscError("%p Invalid value of buffer capacity %d and page size %d ", pSql, pDesc->pColumnModel->capacity,
pMemBuffer[0]->pageSize);
tscLocalReducerEnvDestroy(pMemBuffer, pDesc, finalmodel, numOfBuffer);
......@@ -181,10 +180,11 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd
return;
}
size_t nReducerSize = sizeof(SLocalReducer) + sizeof(void *) * numOfFlush;
SLocalReducer *pReducer = (SLocalReducer *)calloc(1, nReducerSize);
size_t size = sizeof(SLocalReducer) + POINTER_BYTES * numOfFlush;
SLocalReducer *pReducer = (SLocalReducer *) calloc(1, size);
if (pReducer == NULL) {
tscError("%p failed to create merge structure", pSqlObjAddr);
tscError("%p failed to create local merge structure, out of memory", pSql);
tscLocalReducerEnvDestroy(pMemBuffer, pDesc, finalmodel, numOfBuffer);
pRes->code = TSDB_CODE_CLI_OUT_OF_MEMORY;
......@@ -199,48 +199,52 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd
pReducer->numOfVnode = numOfBuffer;
pReducer->pDesc = pDesc;
tscTrace("%p the number of merged leaves is: %d", pSqlObjAddr, pReducer->numOfBuffer);
tscTrace("%p the number of merged leaves is: %d", pSql, pReducer->numOfBuffer);
int32_t idx = 0;
for (int32_t i = 0; i < numOfBuffer; ++i) {
int32_t numOfFlushoutInFile = pMemBuffer[i]->fileMeta.flushoutData.nLength;
for (int32_t j = 0; j < numOfFlushoutInFile; ++j) {
SLocalDataSource *pDS = (SLocalDataSource *)malloc(sizeof(SLocalDataSource) + pMemBuffer[0]->pageSize);
if (pDS == NULL) {
tscError("%p failed to create merge structure", pSqlObjAddr);
SLocalDataSource *ds = (SLocalDataSource *)malloc(sizeof(SLocalDataSource) + pMemBuffer[0]->pageSize);
if (ds == NULL) {
tscError("%p failed to create merge structure", pSql);
pRes->code = TSDB_CODE_CLI_OUT_OF_MEMORY;
return;
}
pReducer->pLocalDataSrc[idx] = pDS;
pReducer->pLocalDataSrc[idx] = ds;
pDS->pMemBuffer = pMemBuffer[i];
pDS->flushoutIdx = j;
pDS->filePage.numOfElems = 0;
pDS->pageId = 0;
pDS->rowIdx = 0;
ds->pMemBuffer = pMemBuffer[i];
ds->flushoutIdx = j;
ds->filePage.numOfElems = 0;
ds->pageId = 0;
ds->rowIdx = 0;
tscTrace("%p load data from disk into memory, orderOfVnode:%d, total:%d", pSqlObjAddr, i + 1, idx + 1);
tExtMemBufferLoadData(pMemBuffer[i], &(pDS->filePage), j, 0);
tscTrace("%p load data from disk into memory, orderOfVnode:%d, total:%d", pSql, i + 1, idx + 1);
tExtMemBufferLoadData(pMemBuffer[i], &(ds->filePage), j, 0);
#ifdef _DEBUG_VIEW
printf("load data page into mem for build loser tree: %" PRIu64 " rows\n", pDS->filePage.numOfElems);
printf("load data page into mem for build loser tree: %" PRIu64 " rows\n", ds->filePage.numOfElems);
SSrcColumnInfo colInfo[256] = {0};
SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
tscGetSrcColumnInfo(colInfo, pQueryInfo);
tColModelDisplayEx(pDesc->pColumnModel, pDS->filePage.data, pDS->filePage.numOfElems,
tColModelDisplayEx(pDesc->pColumnModel, ds->filePage.data, ds->filePage.numOfElems,
pMemBuffer[0]->numOfElemsPerPage, colInfo);
#endif
if (pDS->filePage.numOfElems == 0) { // no data in this flush
tscTrace("%p flush data is empty, ignore %d flush record", pSqlObjAddr, idx);
tfree(pDS);
if (ds->filePage.numOfElems == 0) { // no data in this flush, the index does not increase
tscTrace("%p flush data is empty, ignore %d flush record", pSql, idx);
tfree(ds);
continue;
}
idx += 1;
}
}
assert(idx >= pReducer->numOfBuffer);
// no data actually, no need to merge result.
if (idx == 0) {
return;
}
......@@ -262,9 +266,7 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd
// the input data format follows the old format, but output in a new format.
// so, all the input must be parsed as old format
size_t size = tscSqlExprNumOfExprs(pQueryInfo);
pReducer->pCtx = (SQLFunctionCtx *)calloc(size, sizeof(SQLFunctionCtx));
pReducer->pCtx = (SQLFunctionCtx *)calloc(tscSqlExprNumOfExprs(pQueryInfo), sizeof(SQLFunctionCtx));
pReducer->rowSize = pMemBuffer[0]->nElemSize;
tscRestoreSQLFuncForSTableQuery(pQueryInfo);
......@@ -313,7 +315,7 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd
pReducer->pResInfo = calloc(size, sizeof(SResultInfo));
tscCreateResPointerInfo(pRes, pQueryInfo);
tscInitSqlContext(pCmd, pRes, pReducer, pDesc);
tscInitSqlContext(pCmd, pReducer, pDesc);
// we change the capacity of schema to denote that there is only one row in temp buffer
pReducer->pDesc->pColumnModel->capacity = 1;
......@@ -428,8 +430,7 @@ int32_t saveToBuffer(tExtMemBuffer *pMemoryBuf, tOrderDescriptor *pDesc, tFilePa
tColModelAppend(pModel, pPage, data, numOfRows - remain, numOfWriteElems, numOfRows);
if (pPage->numOfElems == pModel->capacity) {
int32_t ret = tscFlushTmpBuffer(pMemoryBuf, pDesc, pPage, orderType);
if (ret != 0) {
if (tscFlushTmpBuffer(pMemoryBuf, pDesc, pPage, orderType) != TSDB_CODE_SUCCESS) {
return -1;
}
} else {
......
......@@ -1533,8 +1533,7 @@ static void tscAllDataRetrievedFromDnode(SRetrieveSupport *trsupport, SSqlObj* p
SQueryInfo *pPQueryInfo = tscGetQueryInfoDetail(&pPObj->cmd, 0);
tscClearInterpInfo(pPQueryInfo);
tscCreateLocalReducer(trsupport->pExtMemBuffer, pState->numOfTotal, pDesc, trsupport->pFinalColModel,
&pPObj->cmd, &pPObj->res);
tscCreateLocalReducer(trsupport->pExtMemBuffer, pState->numOfTotal, pDesc, trsupport->pFinalColModel, pPObj);
tscTrace("%p build loser tree completed", pPObj);
pPObj->res.precision = pSql->res.precision;
......
......@@ -120,12 +120,6 @@ typedef struct tExtMemBuffer {
EXT_BUFFER_FLUSH_MODEL flushModel;
} tExtMemBuffer;
//typedef struct tTagSchema {
// struct SSchema *pSchema;
// int32_t numOfCols;
// int32_t colOffset[];
//} tTagSchema;
/**
*
* @param inMemSize
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册