提交 a33e3062 编写于 作者: H Haojun Liao

[TD-225]

上级 88e6b831
......@@ -84,7 +84,7 @@ typedef struct SRetrieveSupport {
} SRetrieveSupport;
int32_t tscLocalReducerEnvCreate(SSqlObj *pSql, tExtMemBuffer ***pMemBuffer, tOrderDescriptor **pDesc,
SColumnModel **pFinalModel, SColumnModel** pFFModel, uint32_t nBufferSize);
SColumnModel **pFinalModel, uint32_t nBufferSize);
void tscLocalReducerEnvDestroy(tExtMemBuffer **pMemBuffer, tOrderDescriptor *pDesc, SColumnModel *pFinalModel,
int32_t numOfVnodes);
......@@ -98,7 +98,7 @@ int32_t tscFlushTmpBuffer(tExtMemBuffer *pMemoryBuf, tOrderDescriptor *pDesc, tF
* create local reducer to launch the second-stage reduce process at client site
*/
void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrderDescriptor *pDesc,
SColumnModel *finalModel, SSqlObj* pSql);
SColumnModel *finalModel, SColumnModel *pFFModel, SSqlObj* pSql);
void tscDestroyLocalReducer(SSqlObj *pSql);
......
......@@ -30,7 +30,7 @@ typedef struct SCompareParam {
int32_t groupOrderType;
} SCompareParam;
static void doArithmeticCalculate(SQueryInfo* pQueryInfo, char* pBuf, char* src, int32_t numOfRows);
static void doArithmeticCalculate(SQueryInfo* pQueryInfo, tFilePage* pOutput, int32_t rowSize, int32_t finalRowSize);
int32_t treeComparator(const void *pLeft, const void *pRight, void *param) {
int32_t pLeftIdx = *(int32_t *)pLeft;
......@@ -169,7 +169,7 @@ static SFillColInfo* createFillColInfo(SQueryInfo* pQueryInfo) {
}
void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrderDescriptor *pDesc,
SColumnModel *finalmodel, SSqlObj* pSql) {
SColumnModel *finalmodel, SColumnModel *pFFModel, SSqlObj* pSql) {
SSqlCmd* pCmd = &pSql->cmd;
SSqlRes* pRes = &pSql->res;
......@@ -505,7 +505,8 @@ void tscDestroyLocalReducer(SSqlObj *pSql) {
pLocalReducer->pFillInfo = taosDestroyFillInfo(pLocalReducer->pFillInfo);
if (pLocalReducer->pCtx != NULL) {
for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) {
int32_t numOfExprs = tscSqlExprNumOfExprs(pQueryInfo);
for (int32_t i = 0; i < numOfExprs; ++i) {
SQLFunctionCtx *pCtx = &pLocalReducer->pCtx[i];
tVariantDestroy(&pCtx->tag);
......@@ -569,7 +570,8 @@ static int32_t createOrderDescriptor(tOrderDescriptor **pOrderDesc, SSqlCmd *pCm
if (numOfGroupByCols > 0) {
if (pQueryInfo->groupbyExpr.numOfGroupCols > 0) {
int32_t startCols = pQueryInfo->fieldsInfo.numOfOutput - pQueryInfo->groupbyExpr.numOfGroupCols;
int32_t numOfInternalOutput = tscSqlExprNumOfExprs(pQueryInfo);
int32_t startCols = numOfInternalOutput - pQueryInfo->groupbyExpr.numOfGroupCols;
// the last "pQueryInfo->groupbyExpr.numOfGroupCols" columns are order-by columns
for (int32_t i = 0; i < pQueryInfo->groupbyExpr.numOfGroupCols; ++i) {
......@@ -655,7 +657,7 @@ bool isSameGroup(SSqlCmd *pCmd, SLocalReducer *pReducer, char *pPrev, tFilePage
}
int32_t tscLocalReducerEnvCreate(SSqlObj *pSql, tExtMemBuffer ***pMemBuffer, tOrderDescriptor **pOrderDesc,
SColumnModel **pFinalModel, SColumnModel **pFFModel, uint32_t nBufferSizes) {
SColumnModel **pFinalModel, uint32_t nBufferSizes) {
SSqlCmd *pCmd = &pSql->cmd;
SSqlRes *pRes = &pSql->res;
......@@ -688,6 +690,8 @@ int32_t tscLocalReducerEnvCreate(SSqlObj *pSql, tExtMemBuffer ***pMemBuffer, tOr
pSchema[i].bytes = pExpr->resBytes;
pSchema[i].type = (int8_t)pExpr->resType;
tstrncpy(pSchema[i].name, pExpr->aliasName, tListLen(pSchema[i].name));
rlen += pExpr->resBytes;
}
......@@ -751,18 +755,7 @@ int32_t tscLocalReducerEnvCreate(SSqlObj *pSql, tExtMemBuffer ***pMemBuffer, tOr
*pFinalModel = createColumnModel(pSchema, (int32_t)size, capacity);
memset(pSchema, 0, sizeof(SSchema) * pQueryInfo->fieldsInfo.numOfOutput);
for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) {
SInternalField* pIField = taosArrayGet(pQueryInfo->fieldsInfo.internalField, i);
TAOS_FIELD* pField = &pIField->field;
pSchema[i].type = pField->type;
pSchema[i].bytes = pField->bytes;
strncpy(pSchema[i].name, pField->name, tListLen(pField->name));
}
*pFFModel = createColumnModel(pSchema, pQueryInfo->fieldsInfo.numOfOutput, capacity);
tfree(pSchema);
return TSDB_CODE_SUCCESS;
}
......@@ -1249,11 +1242,7 @@ bool genFinalResults(SSqlObj *pSql, SLocalReducer *pLocalReducer, bool noMoreCur
tColModelCompact(pModel, pResBuf, pModel->capacity);
if (tscIsSecondStageQuery(pQueryInfo)) {
char* pbuf = calloc(1,pResBuf->num * pModel->rowSize);
doArithmeticCalculate(pQueryInfo, pbuf, pResBuf->data, (int32_t)pResBuf->num);
memcpy(pResBuf->data, pbuf, pResBuf->num * pModel->rowSize);
free(pbuf);
doArithmeticCalculate(pQueryInfo, pResBuf, pModel->rowSize, pLocalReducer->finalRowSize);
}
#ifdef _DEBUG_VIEW
......@@ -1623,11 +1612,23 @@ void tscInitResObjForLocalQuery(SSqlObj *pObj, int32_t numOfRes, int32_t rowLen)
pRes->data = pRes->pLocalReducer->pResultBuf->data;
}
void doArithmeticCalculate(SQueryInfo* pQueryInfo, char* outputBuf, char* src, int32_t numOfRows) {
void doArithmeticCalculate(SQueryInfo* pQueryInfo, tFilePage* pOutput, int32_t rowSize, int32_t finalRowSize) {
char* pbuf = calloc(1, pOutput->num * rowSize);
size_t size = tscNumOfFields(pQueryInfo);
SArithmeticSupport* pArithSup = (SArithmeticSupport*)calloc(1, sizeof(SArithmeticSupport));
SArithmeticSupport arithSup = {0};
// todo refactor
arithSup.offset = 0;
arithSup.numOfCols = (int32_t) tscSqlExprNumOfExprs(pQueryInfo);
arithSup.exprList = pQueryInfo->exprList;
arithSup.data = calloc(arithSup.numOfCols, POINTER_BYTES);
for(int32_t k = 0; k < arithSup.numOfCols; ++k) {
SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, k);
arithSup.data[k] = (pOutput->data + pOutput->num* pExpr->offset);
}
int32_t rowIndex = 0;
int32_t offset = 0;
for (int i = 0; i < size; ++i) {
......@@ -1635,25 +1636,19 @@ void doArithmeticCalculate(SQueryInfo* pQueryInfo, char* outputBuf, char* src, i
// calculate the result from several other columns
if (pSup->pArithExprInfo != NULL) {
// todo refactor
pArithSup->offset = 0;
pArithSup->pArithExpr = pSup->pArithExprInfo;
pArithSup->numOfCols = (int32_t)tscSqlExprNumOfExprs(pQueryInfo);
pArithSup->exprList = pQueryInfo->exprList;
pArithSup->data = calloc(pArithSup->numOfCols, POINTER_BYTES);
for(int32_t k = 0; k < pArithSup->numOfCols; ++k) {
SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, k);
pArithSup->data[k] = (src + numOfRows* pExpr->offset) + rowIndex*pExpr->resBytes;
}
tExprTreeCalcTraverse(pArithSup->pArithExpr->pExpr, numOfRows, outputBuf + numOfRows*offset, pArithSup,
TSDB_ORDER_ASC, getArithemicInputSrc);
arithSup.pArithExpr = pSup->pArithExprInfo;
tExprTreeCalcTraverse(arithSup.pArithExpr->pExpr, pOutput->num, pbuf + pOutput->num*offset, &arithSup, TSDB_ORDER_ASC, getArithemicInputSrc);
} else {
SSqlExpr* pExpr = pSup->pSqlExpr;
memcpy(outputBuf + numOfRows * offset, pExpr->offset * numOfRows + src, pExpr->resBytes * numOfRows);
memcpy(pbuf + pOutput->num * offset, pExpr->offset * pOutput->num + pOutput->data, pExpr->resBytes * pOutput->num);
}
offset += pSup->field.bytes;
}
assert(finalRowSize <= rowSize);
memcpy(pOutput->data, pbuf, pOutput->num * finalRowSize);
tfree(pbuf);
tfree(arithSup.data);
}
\ No newline at end of file
......@@ -1410,10 +1410,12 @@ static int32_t handleArithmeticExpr(SSqlCmd* pCmd, int32_t clauseIndex, int32_t
// TODO: other error handling
} END_TRY
pInfo->pArithExprInfo->base.arg[0].argBytes = (int16_t) tbufTell(&bw);
pInfo->pArithExprInfo->base.arg[0].argValue.pz = tbufGetData(&bw, true);
pInfo->pArithExprInfo->base.arg[0].argType = TSDB_DATA_TYPE_BINARY;
tbufCloseWriter(&bw);
SSqlFuncMsg* pFuncMsg = &pInfo->pArithExprInfo->base;
pFuncMsg->arg[0].argBytes = (int16_t) tbufTell(&bw);
pFuncMsg->arg[0].argValue.pz = tbufGetData(&bw, true);
pFuncMsg->arg[0].argType = TSDB_DATA_TYPE_BINARY;
// tbufCloseWriter(&bw); // TODO there is a memory leak
}
return TSDB_CODE_SUCCESS;
......
......@@ -1644,7 +1644,6 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
tExtMemBuffer ** pMemoryBuf = NULL;
tOrderDescriptor *pDesc = NULL;
SColumnModel *pModel = NULL;
SColumnModel *pFFModel = NULL;
pRes->qhandle = 0x1; // hack the qhandle check
......@@ -1663,7 +1662,7 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
assert(pState->numOfSub > 0);
int32_t ret = tscLocalReducerEnvCreate(pSql, &pMemoryBuf, &pDesc, &pModel, &pFFModel, nBufferSize);
int32_t ret = tscLocalReducerEnvCreate(pSql, &pMemoryBuf, &pDesc, &pModel, nBufferSize);
if (ret != 0) {
pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
tscQueueAsyncRes(pSql);
......@@ -1708,8 +1707,7 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
trs->subqueryIndex = i;
trs->pParentSql = pSql;
trs->pFinalColModel = pModel;
trs->pFFColModel = pFFModel;
SSqlObj *pNew = tscCreateSTableSubquery(pSql, trs, NULL);
if (pNew == NULL) {
tscError("%p failed to malloc buffer for subObj, orderOfSub:%d, reason:%s", pSql, i, strerror(errno));
......@@ -1764,10 +1762,6 @@ static void tscFreeRetrieveSup(SSqlObj *pSql) {
}
tscDebug("%p start to free subquery supp obj:%p", pSql, trsupport);
// int32_t index = trsupport->subqueryIndex;
// SSqlObj *pParentSql = trsupport->pParentSql;
// assert(pSql == pParentSql->pSubs[index]);
tfree(trsupport->localBuffer);
tfree(trsupport);
}
......@@ -1958,7 +1952,7 @@ static void tscAllDataRetrievedFromDnode(SRetrieveSupport *trsupport, SSqlObj* p
SQueryInfo *pPQueryInfo = tscGetQueryInfoDetail(&pParentSql->cmd, 0);
tscClearInterpInfo(pPQueryInfo);
tscCreateLocalReducer(trsupport->pExtMemBuffer, pState->numOfSub, pDesc, trsupport->pFinalColModel, pParentSql);
tscCreateLocalReducer(trsupport->pExtMemBuffer, pState->numOfSub, pDesc, trsupport->pFinalColModel, trsupport->pFFColModel, pParentSql);
tscDebug("%p build loser tree completed", pParentSql);
pParentSql->res.precision = pSql->res.precision;
......@@ -2467,33 +2461,6 @@ TAOS_ROW doSetResultRowData(SSqlObj *pSql, bool finalResult) {
if (pRes->tsrow[i] != NULL && pField->type == TSDB_DATA_TYPE_NCHAR) {
transferNcharData(pSql, i, pField);
}
// calculate the result from several other columns
// if (pSup->pArithExprInfo != NULL) {
// if (pRes->pArithSup == NULL) {
// pRes->pArithSup = (SArithmeticSupport*)calloc(1, sizeof(SArithmeticSupport));
// }
//
// pRes->pArithSup->offset = 0;
// pRes->pArithSup->pArithExpr = pSup->pArithExprInfo;
// pRes->pArithSup->numOfCols = (int32_t)tscSqlExprNumOfExprs(pQueryInfo);
// pRes->pArithSup->exprList = pQueryInfo->exprList;
// pRes->pArithSup->data = calloc(pRes->pArithSup->numOfCols, POINTER_BYTES);
//
// if (pRes->buffer[i] == NULL) {
// TAOS_FIELD* field = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i);
// pRes->buffer[i] = malloc(field->bytes);
// }
//
// for(int32_t k = 0; k < pRes->pArithSup->numOfCols; ++k) {
// SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, k);
// pRes->pArithSup->data[k] = (pRes->data + pRes->numOfRows* pExpr->offset) + pRes->row*pExpr->resBytes;
// }
//
// tExprTreeCalcTraverse(pRes->pArithSup->pArithExpr->pExpr, 1, pRes->buffer[i], pRes->pArithSup,
// TSDB_ORDER_ASC, getArithemicInputSrc);
// pRes->tsrow[i] = (unsigned char*)pRes->buffer[i];
// }
}
pRes->row++; // index increase one-step
......
......@@ -945,6 +945,14 @@ void tscFieldInfoClear(SFieldInfo* pFieldInfo) {
if (pInfo->pArithExprInfo != NULL) {
tExprTreeDestroy(&pInfo->pArithExprInfo->pExpr, NULL);
SSqlFuncMsg* pFuncMsg = &pInfo->pArithExprInfo->base;
for(int32_t j = 0; j < pFuncMsg->numOfParams; ++j) {
if (pFuncMsg->arg[j].argType == TSDB_DATA_TYPE_BINARY) {
tfree(pFuncMsg->arg[j].argValue.pz);
}
}
tfree(pInfo->pArithExprInfo);
}
}
......
......@@ -5336,7 +5336,7 @@ static char *getArithemicInputSrc(void *param, const char *name, int32_t colId)
}
static void doSecondaryArithmeticProcess(SQuery* pQuery) {
SArithmeticSupport *pArithSup = (SArithmeticSupport *)calloc(1, sizeof(SArithmeticSupport));
SArithmeticSupport arithSup = {0};
tFilePage **data = calloc(pQuery->numOfExpr2, POINTER_BYTES);
for (int32_t i = 0; i < pQuery->numOfExpr2; ++i) {
......@@ -5344,13 +5344,13 @@ static void doSecondaryArithmeticProcess(SQuery* pQuery) {
data[i] = (tFilePage *)malloc(bytes * pQuery->rec.rows + sizeof(tFilePage));
}
pArithSup->offset = 0;
pArithSup->numOfCols = (int32_t)pQuery->numOfOutput;
pArithSup->exprList = pQuery->pExpr1;
pArithSup->data = calloc(pArithSup->numOfCols, POINTER_BYTES);
arithSup.offset = 0;
arithSup.numOfCols = (int32_t)pQuery->numOfOutput;
arithSup.exprList = pQuery->pExpr1;
arithSup.data = calloc(arithSup.numOfCols, POINTER_BYTES);
for (int32_t k = 0; k < pArithSup->numOfCols; ++k) {
pArithSup->data[k] = pQuery->sdata[k]->data;
for (int32_t k = 0; k < arithSup.numOfCols; ++k) {
arithSup.data[k] = pQuery->sdata[k]->data;
}
for (int i = 0; i < pQuery->numOfExpr2; ++i) {
......@@ -5368,8 +5368,8 @@ static void doSecondaryArithmeticProcess(SQuery* pQuery) {
}
}
} else {
pArithSup->pArithExpr = pExpr;
tExprTreeCalcTraverse(pArithSup->pArithExpr->pExpr, (int32_t)pQuery->rec.rows, data[i]->data, pArithSup, TSDB_ORDER_ASC,
arithSup.pArithExpr = pExpr;
tExprTreeCalcTraverse(arithSup.pArithExpr->pExpr, (int32_t)pQuery->rec.rows, data[i]->data, &arithSup, TSDB_ORDER_ASC,
getArithemicInputSrc);
}
}
......@@ -5378,7 +5378,12 @@ static void doSecondaryArithmeticProcess(SQuery* pQuery) {
memcpy(pQuery->sdata[i]->data, data[i]->data, pQuery->pExpr2[i].bytes * pQuery->rec.rows);
}
tfree(pArithSup);
for (int32_t i = 0; i < pQuery->numOfExpr2; ++i) {
tfree(data[i]);
}
tfree(data);
tfree(arithSup.data);
}
/*
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册