提交 f232dd2a 编写于 作者: H hjxilinx

support the arithmetic calculation of aggregation between different super table in join query.

上级 75b497d3
...@@ -251,7 +251,13 @@ void tscTryQueryNextVnode(SSqlObj *pSql, __async_cb_func_t fp); ...@@ -251,7 +251,13 @@ void tscTryQueryNextVnode(SSqlObj *pSql, __async_cb_func_t fp);
void tscAsyncQuerySingleRowForNextVnode(void *param, TAOS_RES *tres, int numOfRows); void tscAsyncQuerySingleRowForNextVnode(void *param, TAOS_RES *tres, int numOfRows);
void tscTryQueryNextClause(SSqlObj* pSql, void (*queryFp)()); void tscTryQueryNextClause(SSqlObj* pSql, void (*queryFp)());
typedef struct SColumnList {
int32_t num;
SColumnIndex ids[TSDB_MAX_COLUMNS];
} SColumnList;
int32_t insertResultField(SQueryInfo* pQueryInfo, int32_t outputIndex, SColumnList* pIdList, int16_t bytes,
int8_t type, char* fieldName, SSqlExpr* pSqlExpr);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -685,9 +685,9 @@ void tscSetupOutputColumnIndex(SSqlObj* pSql) { ...@@ -685,9 +685,9 @@ void tscSetupOutputColumnIndex(SSqlObj* pSql) {
} }
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
pRes->pColumnIndex = calloc(1, sizeof(SColumnIndex) * pQueryInfo->fieldsInfo.numOfOutputCols); pRes->pColumnIndex = calloc(1, sizeof(SColumnIndex) * pQueryInfo->exprsInfo.numOfExprs);
for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutputCols; ++i) { for (int32_t i = 0; i < pQueryInfo->exprsInfo.numOfExprs; ++i) {
SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, i); SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, i);
int32_t tableIndexOfSub = -1; int32_t tableIndexOfSub = -1;
......
...@@ -38,11 +38,6 @@ ...@@ -38,11 +38,6 @@
#define COLUMN_INDEX_VALIDE(index) (((index).tableIndex >= 0) && ((index).columnIndex >= TSDB_TBNAME_COLUMN_INDEX)) #define COLUMN_INDEX_VALIDE(index) (((index).tableIndex >= 0) && ((index).columnIndex >= TSDB_TBNAME_COLUMN_INDEX))
#define TBNAME_LIST_SEP "," #define TBNAME_LIST_SEP ","
typedef struct SColumnList {
int32_t num;
SColumnIndex ids[TSDB_MAX_COLUMNS];
} SColumnList;
static SSqlExpr* doAddProjectCol(SQueryInfo* pQueryInfo, int32_t outputIndex, int32_t colIdx, int32_t tableIndex); static SSqlExpr* doAddProjectCol(SQueryInfo* pQueryInfo, int32_t outputIndex, int32_t colIdx, int32_t tableIndex);
static int32_t setShowInfo(SSqlObj* pSql, SSqlInfo* pInfo); static int32_t setShowInfo(SSqlObj* pSql, SSqlInfo* pInfo);
...@@ -60,8 +55,6 @@ static void getColumnName(tSQLExprItem* pItem, char* resultFieldName, int32_t na ...@@ -60,8 +55,6 @@ static void getColumnName(tSQLExprItem* pItem, char* resultFieldName, int32_t na
static void getRevisedName(char* resultFieldName, int32_t functionId, int32_t maxLen, char* columnName); static void getRevisedName(char* resultFieldName, int32_t functionId, int32_t maxLen, char* columnName);
static int32_t addExprAndResultField(SQueryInfo* pQueryInfo, int32_t colIdx, tSQLExprItem* pItem, bool isResultColumn); static int32_t addExprAndResultField(SQueryInfo* pQueryInfo, int32_t colIdx, tSQLExprItem* pItem, bool isResultColumn);
static int32_t insertResultField(SQueryInfo* pQueryInfo, int32_t outputIndex, SColumnList* pIdList, int16_t bytes,
int8_t type, char* fieldName, SSqlExpr* pSqlExpr);
static int32_t changeFunctionID(int32_t optr, int16_t* functionId); static int32_t changeFunctionID(int32_t optr, int16_t* functionId);
static int32_t parseSelectClause(SSqlCmd* pCmd, int32_t clauseIndex, tSQLExprList* pSelection, bool isSTable); static int32_t parseSelectClause(SSqlCmd* pCmd, int32_t clauseIndex, tSQLExprList* pSelection, bool isSTable);
......
...@@ -694,13 +694,6 @@ int32_t tscLaunchJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSubquerySu ...@@ -694,13 +694,6 @@ int32_t tscLaunchJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSubquerySu
pNewQueryInfo->exprsInfo.numOfExprs, pNewQueryInfo->colList.numOfCols, pNewQueryInfo->exprsInfo.numOfExprs, pNewQueryInfo->colList.numOfCols,
pNewQueryInfo->fieldsInfo.numOfOutputCols, pNewQueryInfo->pMeterInfo[0]->name); pNewQueryInfo->fieldsInfo.numOfOutputCols, pNewQueryInfo->pMeterInfo[0]->name);
tscPrintSelectClause(pNew, 0); tscPrintSelectClause(pNew, 0);
tscTrace("%p subquery:%p tableIndex:%d, vnodeIdx:%d, type:%d, transfer to ts_comp query to retrieve timestamps, "
"exprInfo:%d, colList:%d, fieldsInfo:%d, name:%s",
pSql, pNew, tableIndex, pMeterMetaInfo->vnodeIndex, pNewQueryInfo->type,
pNewQueryInfo->exprsInfo.numOfExprs, pNewQueryInfo->colList.numOfCols,
pNewQueryInfo->fieldsInfo.numOfOutputCols, pNewQueryInfo->pMeterInfo[0]->name);
tscPrintSelectClause(pNew, 0);
} else { } else {
SQueryInfo *pNewQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0); SQueryInfo *pNewQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0);
pNewQueryInfo->type |= TSDB_QUERY_TYPE_SUBQUERY; pNewQueryInfo->type |= TSDB_QUERY_TYPE_SUBQUERY;
......
...@@ -74,7 +74,10 @@ TAOS *taos_connect_imp(const char *ip, const char *user, const char *pass, const ...@@ -74,7 +74,10 @@ TAOS *taos_connect_imp(const char *ip, const char *user, const char *pass, const
tscMgmtIpList.ip[2] = inet_addr(tsMasterIp); tscMgmtIpList.ip[2] = inet_addr(tsMasterIp);
strcpy(tscMgmtIpList.ipstr[3], tsSecondIp); strcpy(tscMgmtIpList.ipstr[3], tsSecondIp);
tscMgmtIpList.ip[3] = inet_addr(tsSecondIp); tscMgmtIpList.ip[3] = inet_addr(tsSecondIp);
strcpy(tsMasterIp, ip);
if (tsMasterIp != ip) {
strcpy(tsMasterIp, ip);
}
} }
pObj = (STscObj *)malloc(sizeof(STscObj)); pObj = (STscObj *)malloc(sizeof(STscObj));
...@@ -416,7 +419,7 @@ static char *getArithemicInputSrc(void *param, char *name, int32_t colId) { ...@@ -416,7 +419,7 @@ static char *getArithemicInputSrc(void *param, char *name, int32_t colId) {
return pSupport->data[index] + pSupport->offset * pSupport->elemSize[index]; return pSupport->data[index] + pSupport->offset * pSupport->elemSize[index];
} }
static void **doSetResultRowData(SSqlObj *pSql) { static void **doSetResultRowData(SSqlObj *pSql, bool finalResult) {
SSqlCmd *pCmd = &pSql->cmd; SSqlCmd *pCmd = &pSql->cmd;
SSqlRes *pRes = &pSql->res; SSqlRes *pRes = &pSql->res;
...@@ -429,7 +432,6 @@ static void **doSetResultRowData(SSqlObj *pSql) { ...@@ -429,7 +432,6 @@ static void **doSetResultRowData(SSqlObj *pSql) {
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
int32_t num = 0;
for (int i = 0; i < tscNumOfFields(pQueryInfo); ++i) { for (int i = 0; i < tscNumOfFields(pQueryInfo); ++i) {
if (pQueryInfo->fieldsInfo.pSqlExpr[i] != NULL) { if (pQueryInfo->fieldsInfo.pSqlExpr[i] != NULL) {
SSqlExpr* pExpr = pQueryInfo->fieldsInfo.pSqlExpr[i]; SSqlExpr* pExpr = pQueryInfo->fieldsInfo.pSqlExpr[i];
...@@ -444,7 +446,7 @@ static void **doSetResultRowData(SSqlObj *pSql) { ...@@ -444,7 +446,7 @@ static void **doSetResultRowData(SSqlObj *pSql) {
TAOS_FIELD *pField = tscFieldInfoGetField(pQueryInfo, i); TAOS_FIELD *pField = tscFieldInfoGetField(pQueryInfo, i);
transferNcharData(pSql, i, pField); transferNcharData(pSql, i, pField);
// calculate the result from serveral other columns // calculate the result from several other columns
if (pQueryInfo->fieldsInfo.pExpr != NULL && pQueryInfo->fieldsInfo.pExpr[i] != NULL) { if (pQueryInfo->fieldsInfo.pExpr != NULL && pQueryInfo->fieldsInfo.pExpr[i] != NULL) {
SArithmeticSupport *sas = (SArithmeticSupport *)calloc(1, sizeof(SArithmeticSupport)); SArithmeticSupport *sas = (SArithmeticSupport *)calloc(1, sizeof(SArithmeticSupport));
sas->offset = 0; sas->offset = 0;
...@@ -471,8 +473,6 @@ static void **doSetResultRowData(SSqlObj *pSql) { ...@@ -471,8 +473,6 @@ static void **doSetResultRowData(SSqlObj *pSql) {
} }
} }
assert(num <= pQueryInfo->fieldsInfo.numOfOutputCols);
pRes->row++; // index increase one-step pRes->row++; // index increase one-step
return pRes->tsrow; return pRes->tsrow;
} }
...@@ -536,9 +536,7 @@ static void **tscBuildResFromSubqueries(SSqlObj *pSql) { ...@@ -536,9 +536,7 @@ static void **tscBuildResFromSubqueries(SSqlObj *pSql) {
while (1) { while (1) {
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, pSql->cmd.clauseIndex); SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, pSql->cmd.clauseIndex);
if (pRes->tsrow == NULL) { tscCreateResPointerInfo(pRes, pQueryInfo);
pRes->tsrow = calloc(pQueryInfo->exprsInfo.numOfExprs, POINTER_BYTES);
}
bool success = false; bool success = false;
...@@ -550,10 +548,8 @@ static void **tscBuildResFromSubqueries(SSqlObj *pSql) { ...@@ -550,10 +548,8 @@ static void **tscBuildResFromSubqueries(SSqlObj *pSql) {
} }
if (numOfTableHasRes >= 2) { // do merge result if (numOfTableHasRes >= 2) { // do merge result
success = (doSetResultRowData(pSql->pSubs[0], false) != NULL) &&
success = (doSetResultRowData(pSql->pSubs[0]) != NULL) && (doSetResultRowData(pSql->pSubs[1]) != NULL); (doSetResultRowData(pSql->pSubs[1], false) != NULL);
// TSKEY key1 = *(TSKEY *)pRes1->tsrow[0];
// TSKEY key2 = *(TSKEY *)pRes2->tsrow[0];
// printf("first:%" PRId64 ", second:%" PRId64 "\n", key1, key2); // printf("first:%" PRId64 ", second:%" PRId64 "\n", key1, key2);
} else { // only one subquery } else { // only one subquery
SSqlObj *pSub = pSql->pSubs[0]; SSqlObj *pSub = pSql->pSubs[0];
...@@ -561,7 +557,7 @@ static void **tscBuildResFromSubqueries(SSqlObj *pSql) { ...@@ -561,7 +557,7 @@ static void **tscBuildResFromSubqueries(SSqlObj *pSql) {
pSub = pSql->pSubs[1]; pSub = pSql->pSubs[1];
} }
success = (doSetResultRowData(pSub) != NULL); success = (doSetResultRowData(pSub, false) != NULL);
} }
if (success) { // current row of final output has been built, return to app if (success) { // current row of final output has been built, return to app
...@@ -572,6 +568,41 @@ static void **tscBuildResFromSubqueries(SSqlObj *pSql) { ...@@ -572,6 +568,41 @@ static void **tscBuildResFromSubqueries(SSqlObj *pSql) {
SSqlRes *pRes1 = &pSql->pSubs[tableIndex]->res; SSqlRes *pRes1 = &pSql->pSubs[tableIndex]->res;
pRes->tsrow[i] = pRes1->tsrow[columnIndex]; pRes->tsrow[i] = pRes1->tsrow[columnIndex];
} }
int32_t numOfOutputCols = tscNumOfFields(pQueryInfo);
assert(pRes->numOfCols >= numOfOutputCols);
for(int32_t i = 0; i < numOfOutputCols; ++i) {
if (pQueryInfo->fieldsInfo.pSqlExpr[i] != NULL) {
continue; // no arithmetic expression exists, continue
}
assert(pQueryInfo->fieldsInfo.pExpr[i] != NULL);
SArithmeticSupport *sas = (SArithmeticSupport *)calloc(1, sizeof(SArithmeticSupport));
sas->offset = 0;
sas->pExpr = pQueryInfo->fieldsInfo.pExpr[i];
sas->numOfCols = sas->pExpr->binExprInfo.numOfCols;
if (pRes->buffer[i] == NULL) {
pRes->buffer[i] = malloc(tscFieldInfoGetField(pQueryInfo, i)->bytes);
}
for (int32_t k = 0; k < sas->numOfCols; ++k) {
int32_t columnIndex = sas->pExpr->binExprInfo.pReqColumns[k].colIdxInBuf;
assert(columnIndex < pQueryInfo->exprsInfo.numOfExprs);
SSqlExpr *pExpr = tscSqlExprGet(pQueryInfo, columnIndex);
sas->elemSize[k] = pExpr->resBytes;
sas->data[k] = pRes->tsrow[columnIndex];
}
tSQLBinaryExprCalcTraverse(sas->pExpr->binExprInfo.pBinExpr, 1, pRes->buffer[i], sas, TSQL_SO_ASC,
getArithemicInputSrc);
pRes->tsrow[i] = pRes->buffer[i];
free(sas); // todo optimization
}
pRes->numOfTotalInCurrentClause++; pRes->numOfTotalInCurrentClause++;
...@@ -662,7 +693,7 @@ TAOS_ROW taos_fetch_row_impl(TAOS_RES *res) { ...@@ -662,7 +693,7 @@ TAOS_ROW taos_fetch_row_impl(TAOS_RES *res) {
} }
} }
return doSetResultRowData(pSql); return doSetResultRowData(pSql, true);
} }
TAOS_ROW taos_fetch_row(TAOS_RES *res) { TAOS_ROW taos_fetch_row(TAOS_RES *res) {
......
...@@ -350,11 +350,13 @@ void tscClearInterpInfo(SQueryInfo* pQueryInfo) { ...@@ -350,11 +350,13 @@ void tscClearInterpInfo(SQueryInfo* pQueryInfo) {
int32_t tscCreateResPointerInfo(SSqlRes* pRes, SQueryInfo* pQueryInfo) { int32_t tscCreateResPointerInfo(SSqlRes* pRes, SQueryInfo* pQueryInfo) {
if (pRes->tsrow == NULL) { if (pRes->tsrow == NULL) {
int32_t numOfOutputCols = pQueryInfo->fieldsInfo.numOfOutputCols; int32_t numOfColumns = pQueryInfo->exprsInfo.numOfExprs;
pRes->numOfCols = numOfOutputCols; assert(numOfColumns >= pQueryInfo->fieldsInfo.numOfOutputCols);
pRes->numOfCols = numOfColumns;
pRes->tsrow = calloc(POINTER_BYTES, numOfOutputCols); pRes->tsrow = calloc(POINTER_BYTES, numOfColumns);
pRes->buffer = calloc(POINTER_BYTES, numOfOutputCols); pRes->buffer = calloc(POINTER_BYTES, numOfColumns);
// not enough memory // not enough memory
if (pRes->tsrow == NULL || (pRes->buffer == NULL && pRes->numOfCols > 0)) { if (pRes->tsrow == NULL || (pRes->buffer == NULL && pRes->numOfCols > 0)) {
...@@ -370,8 +372,8 @@ int32_t tscCreateResPointerInfo(SSqlRes* pRes, SQueryInfo* pQueryInfo) { ...@@ -370,8 +372,8 @@ int32_t tscCreateResPointerInfo(SSqlRes* pRes, SQueryInfo* pQueryInfo) {
} }
void tscDestroyResPointerInfo(SSqlRes* pRes) { void tscDestroyResPointerInfo(SSqlRes* pRes) {
// free all buffers containing the multibyte string
if (pRes->buffer != NULL) { if (pRes->buffer != NULL) {
// free all buffers containing the multibyte string
for (int i = 0; i < pRes->numOfCols; i++) { for (int i = 0; i < pRes->numOfCols; i++) {
tfree(pRes->buffer[i]); tfree(pRes->buffer[i]);
} }
...@@ -946,6 +948,7 @@ void tscFieldInfoCopy(SFieldInfo* src, SFieldInfo* dst, const int32_t* indexList ...@@ -946,6 +948,7 @@ void tscFieldInfoCopy(SFieldInfo* src, SFieldInfo* dst, const int32_t* indexList
tscFieldInfoSetValFromField(dst, i, &src->pFields[indexList[i]]); tscFieldInfoSetValFromField(dst, i, &src->pFields[indexList[i]]);
dst->pVisibleCols[i] = src->pVisibleCols[indexList[i]]; dst->pVisibleCols[i] = src->pVisibleCols[indexList[i]];
dst->pSqlExpr[i] = src->pSqlExpr[indexList[i]]; dst->pSqlExpr[i] = src->pSqlExpr[indexList[i]];
dst->pExpr[i] = src->pExpr[indexList[i]];
} }
} }
} }
...@@ -2004,8 +2007,16 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void ...@@ -2004,8 +2007,16 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void
indexList[j++] = i; indexList[j++] = i;
} }
} }
// create the fields info from the sql functions
SColumnList columnList = {.num = 1};
for(int32_t k = 0; k < numOfOutputCols; ++k) {
SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, indexList[k]);
columnList.ids[0] = (SColumnIndex){.tableIndex = tableIndex, .columnIndex = pExpr->colInfo.colIdx};
insertResultField(pNewQueryInfo, k, &columnList, pExpr->resBytes, pExpr->resType, pExpr->aliasName, pExpr);
}
tscFieldInfoCopy(&pQueryInfo->fieldsInfo, &pNewQueryInfo->fieldsInfo, indexList, numOfOutputCols);
free(indexList); free(indexList);
// make sure the the sqlExpr for each fields is correct // make sure the the sqlExpr for each fields is correct
......
...@@ -1776,6 +1776,11 @@ static int32_t getNextQualifiedWindow(SQueryRuntimeEnv *pRuntimeEnv, STimeWindow ...@@ -1776,6 +1776,11 @@ static int32_t getNextQualifiedWindow(SQueryRuntimeEnv *pRuntimeEnv, STimeWindow
SQuery *pQuery = pRuntimeEnv->pQuery; SQuery *pQuery = pRuntimeEnv->pQuery;
while (1) { while (1) {
if ((pNextWin->ekey > pQuery->ekey && QUERY_IS_ASC_QUERY(pQuery)) ||
(pNextWin->skey < pQuery->ekey && !QUERY_IS_ASC_QUERY(pQuery))) {
return -1;
}
getNextTimeWindow(pQuery, pNextWin); getNextTimeWindow(pQuery, pNextWin);
if (pWindowResInfo->startTime > pNextWin->skey || (pNextWin->skey > pQuery->ekey && QUERY_IS_ASC_QUERY(pQuery)) || if (pWindowResInfo->startTime > pNextWin->skey || (pNextWin->skey > pQuery->ekey && QUERY_IS_ASC_QUERY(pQuery)) ||
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册