提交 5f55bf25 编写于 作者: S Shengliang Guan

Merge from develop

...@@ -28,7 +28,7 @@ void tscFetchDatablockFromSubquery(SSqlObj* pSql); ...@@ -28,7 +28,7 @@ void tscFetchDatablockFromSubquery(SSqlObj* pSql);
void tscSetupOutputColumnIndex(SSqlObj* pSql); void tscSetupOutputColumnIndex(SSqlObj* pSql);
void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code); void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code);
SJoinSupporter* tscCreateJoinSupporter(SSqlObj* pSql, SSubqueryState* pState, int32_t index); SJoinSupporter* tscCreateJoinSupporter(SSqlObj* pSql, int32_t index);
void tscHandleMasterJoinQuery(SSqlObj* pSql); void tscHandleMasterJoinQuery(SSqlObj* pSql);
......
...@@ -109,7 +109,6 @@ SParamInfo* tscAddParamToDataBlock(STableDataBlocks* pDataBlock, char type, uint ...@@ -109,7 +109,6 @@ SParamInfo* tscAddParamToDataBlock(STableDataBlocks* pDataBlock, char type, uint
void* tscDestroyBlockArrayList(SArray* pDataBlockList); void* tscDestroyBlockArrayList(SArray* pDataBlockList);
int32_t tscCopyDataBlockToPayload(SSqlObj* pSql, STableDataBlocks* pDataBlock); int32_t tscCopyDataBlockToPayload(SSqlObj* pSql, STableDataBlocks* pDataBlock);
void tscFreeUnusedDataBlocks(SArray* pDataBlockList);
int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SArray* pDataList); int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SArray* pDataList);
int32_t tscGetDataBlockFromList(void* pHashList, SArray* pDataBlockList, int64_t id, int32_t size, int32_t tscGetDataBlockFromList(void* pHashList, SArray* pDataBlockList, int64_t id, int32_t size,
int32_t startOffset, int32_t rowSize, const char* tableId, STableMeta* pTableMeta, int32_t startOffset, int32_t rowSize, const char* tableId, STableMeta* pTableMeta,
......
...@@ -554,27 +554,48 @@ static int32_t createOrderDescriptor(tOrderDescriptor **pOrderDesc, SSqlCmd *pCm ...@@ -554,27 +554,48 @@ static int32_t createOrderDescriptor(tOrderDescriptor **pOrderDesc, SSqlCmd *pCm
numOfGroupByCols++; numOfGroupByCols++;
} }
int32_t *orderIdx = (int32_t *)calloc(numOfGroupByCols, sizeof(int32_t)); int32_t *orderColIndexList = (int32_t *)calloc(numOfGroupByCols, sizeof(int32_t));
if (orderIdx == NULL) { if (orderColIndexList == NULL) {
return TSDB_CODE_TSC_OUT_OF_MEMORY; return TSDB_CODE_TSC_OUT_OF_MEMORY;
} }
if (numOfGroupByCols > 0) { if (numOfGroupByCols > 0) {
int32_t startCols = pQueryInfo->fieldsInfo.numOfOutput - pQueryInfo->groupbyExpr.numOfGroupCols;
// tags value locate at the last columns if (pQueryInfo->groupbyExpr.numOfGroupCols > 0) {
for (int32_t i = 0; i < pQueryInfo->groupbyExpr.numOfGroupCols; ++i) { int32_t startCols = pQueryInfo->fieldsInfo.numOfOutput - pQueryInfo->groupbyExpr.numOfGroupCols;
orderIdx[i] = startCols++;
} // the last "pQueryInfo->groupbyExpr.numOfGroupCols" columns are order-by columns
for (int32_t i = 0; i < pQueryInfo->groupbyExpr.numOfGroupCols; ++i) {
orderColIndexList[i] = startCols++;
}
if (pQueryInfo->interval.interval != 0) {
// the first column is the timestamp, handles queries like "interval(10m) group by tags"
orderColIndexList[numOfGroupByCols - 1] = PRIMARYKEY_TIMESTAMP_COL_INDEX; //TODO ???
}
} else {
/*
* 1. the orderby ts asc/desc projection query for the super table
* 2. interval query without groupby clause
*/
if (pQueryInfo->interval.interval != 0) {
orderColIndexList[0] = PRIMARYKEY_TIMESTAMP_COL_INDEX;
} else {
size_t size = tscSqlExprNumOfExprs(pQueryInfo);
for (int32_t i = 0; i < size; ++i) {
SSqlExpr *pExpr = tscSqlExprGet(pQueryInfo, i);
if (pExpr->functionId == TSDB_FUNC_PRJ && pExpr->colInfo.colId == PRIMARYKEY_TIMESTAMP_COL_INDEX) {
orderColIndexList[0] = i;
}
}
}
if (pQueryInfo->interval.interval != 0) { assert(pQueryInfo->order.orderColId == PRIMARYKEY_TIMESTAMP_COL_INDEX);
// the first column is the timestamp, handles queries like "interval(10m) group by tags"
orderIdx[numOfGroupByCols - 1] = PRIMARYKEY_TIMESTAMP_COL_INDEX;
} }
} }
*pOrderDesc = tOrderDesCreate(orderIdx, numOfGroupByCols, pModel, pQueryInfo->order.order); *pOrderDesc = tOrderDesCreate(orderColIndexList, numOfGroupByCols, pModel, pQueryInfo->order.order);
taosTFree(orderIdx); taosTFree(orderColIndexList);
if (*pOrderDesc == NULL) { if (*pOrderDesc == NULL) {
return TSDB_CODE_TSC_OUT_OF_MEMORY; return TSDB_CODE_TSC_OUT_OF_MEMORY;
...@@ -588,7 +609,6 @@ bool isSameGroup(SSqlCmd *pCmd, SLocalReducer *pReducer, char *pPrev, tFilePage ...@@ -588,7 +609,6 @@ bool isSameGroup(SSqlCmd *pCmd, SLocalReducer *pReducer, char *pPrev, tFilePage
// disable merge procedure for column projection query // disable merge procedure for column projection query
int16_t functionId = pReducer->pCtx[0].functionId; int16_t functionId = pReducer->pCtx[0].functionId;
assert(functionId != TSDB_FUNC_ARITHM);
if (pReducer->orderPrjOnSTable) { if (pReducer->orderPrjOnSTable) {
return true; return true;
} }
...@@ -606,7 +626,7 @@ bool isSameGroup(SSqlCmd *pCmd, SLocalReducer *pReducer, char *pPrev, tFilePage ...@@ -606,7 +626,7 @@ bool isSameGroup(SSqlCmd *pCmd, SLocalReducer *pReducer, char *pPrev, tFilePage
return true; return true;
} }
if (orderInfo->pData[numOfCols - 1] == PRIMARYKEY_TIMESTAMP_COL_INDEX) { if (orderInfo->colIndex[numOfCols - 1] == PRIMARYKEY_TIMESTAMP_COL_INDEX) {
/* /*
* super table interval query * super table interval query
* if the order columns is the primary timestamp, all result data belongs to one group * if the order columns is the primary timestamp, all result data belongs to one group
...@@ -620,7 +640,7 @@ bool isSameGroup(SSqlCmd *pCmd, SLocalReducer *pReducer, char *pPrev, tFilePage ...@@ -620,7 +640,7 @@ bool isSameGroup(SSqlCmd *pCmd, SLocalReducer *pReducer, char *pPrev, tFilePage
} }
// only one row exists // only one row exists
int32_t index = orderInfo->pData[0]; int32_t index = orderInfo->colIndex[0];
int32_t offset = (pOrderDesc->pColumnModel)->pFields[index].offset; int32_t offset = (pOrderDesc->pColumnModel)->pFields[index].offset;
int32_t ret = memcmp(pPrev + offset, tmpBuffer->data + offset, pOrderDesc->pColumnModel->rowSize - offset); int32_t ret = memcmp(pPrev + offset, tmpBuffer->data + offset, pOrderDesc->pColumnModel->rowSize - offset);
...@@ -661,7 +681,6 @@ int32_t tscLocalReducerEnvCreate(SSqlObj *pSql, tExtMemBuffer ***pMemBuffer, tOr ...@@ -661,7 +681,6 @@ int32_t tscLocalReducerEnvCreate(SSqlObj *pSql, tExtMemBuffer ***pMemBuffer, tOr
pSchema[i].bytes = pExpr->resBytes; pSchema[i].bytes = pExpr->resBytes;
pSchema[i].type = (int8_t)pExpr->resType; pSchema[i].type = (int8_t)pExpr->resType;
rlen += pExpr->resBytes; rlen += pExpr->resBytes;
} }
...@@ -701,12 +720,8 @@ int32_t tscLocalReducerEnvCreate(SSqlObj *pSql, tExtMemBuffer ***pMemBuffer, tOr ...@@ -701,12 +720,8 @@ int32_t tscLocalReducerEnvCreate(SSqlObj *pSql, tExtMemBuffer ***pMemBuffer, tOr
int16_t type = -1; int16_t type = -1;
int16_t bytes = 0; int16_t bytes = 0;
// if ((pExpr->functionId >= TSDB_FUNC_FIRST_DST && pExpr->functionId <= TSDB_FUNC_LAST_DST) ||
// (pExpr->functionId >= TSDB_FUNC_SUM && pExpr->functionId <= TSDB_FUNC_MAX) ||
// pExpr->functionId == TSDB_FUNC_LAST_ROW) {
// the final result size and type in the same as query on single table. // the final result size and type in the same as query on single table.
// so here, set the flag to be false; // so here, set the flag to be false;
int32_t functionId = pExpr->functionId; int32_t functionId = pExpr->functionId;
if (functionId >= TSDB_FUNC_TS && functionId <= TSDB_FUNC_DIFF) { if (functionId >= TSDB_FUNC_TS && functionId <= TSDB_FUNC_DIFF) {
type = pModel->pFields[i].field.type; type = pModel->pFields[i].field.type;
......
...@@ -626,6 +626,11 @@ int32_t parseIntervalClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SQuerySQL* pQ ...@@ -626,6 +626,11 @@ int32_t parseIntervalClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SQuerySQL* pQ
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
// orderby column not set yet, set it to be the primary timestamp column
if (pQueryInfo->order.orderColId == INT32_MIN) {
pQueryInfo->order.orderColId = PRIMARYKEY_TIMESTAMP_COL_INDEX;
}
// interval is not null // interval is not null
SStrToken* t = &pQuerySql->interval; SStrToken* t = &pQuerySql->interval;
if (parseNatualDuration(t->z, t->n, &pQueryInfo->interval.interval, &pQueryInfo->interval.intervalUnit) != TSDB_CODE_SUCCESS) { if (parseNatualDuration(t->z, t->n, &pQueryInfo->interval.interval, &pQueryInfo->interval.intervalUnit) != TSDB_CODE_SUCCESS) {
...@@ -1347,6 +1352,32 @@ static void addProjectQueryCol(SQueryInfo* pQueryInfo, int32_t startPos, SColumn ...@@ -1347,6 +1352,32 @@ static void addProjectQueryCol(SQueryInfo* pQueryInfo, int32_t startPos, SColumn
insertResultField(pQueryInfo, startPos, &ids, pExpr->resBytes, (int8_t)pExpr->resType, pExpr->aliasName, pExpr); insertResultField(pQueryInfo, startPos, &ids, pExpr->resBytes, (int8_t)pExpr->resType, pExpr->aliasName, pExpr);
} }
static void addPrimaryTsColIntoResult(SQueryInfo* pQueryInfo) {
// primary timestamp column has been added already
size_t size = tscSqlExprNumOfExprs(pQueryInfo);
for (int32_t i = 0; i < size; ++i) {
SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, i);
if (pExpr->functionId == TSDB_FUNC_PRJ && pExpr->colInfo.colId == PRIMARYKEY_TIMESTAMP_COL_INDEX) {
return;
}
}
SColumnIndex index = {0};
// set the constant column value always attached to first table.
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
SSchema* pSchema = tscGetTableColumnSchema(pTableMetaInfo->pTableMeta, PRIMARYKEY_TIMESTAMP_COL_INDEX);
// add the timestamp column into the output columns
int32_t numOfCols = (int32_t)tscSqlExprNumOfExprs(pQueryInfo);
tscAddSpecialColumnForSelect(pQueryInfo, numOfCols, TSDB_FUNC_PRJ, &index, pSchema, TSDB_COL_NORMAL);
SFieldSupInfo* pSupInfo = tscFieldInfoGetSupp(&pQueryInfo->fieldsInfo, numOfCols);
pSupInfo->visible = false;
pQueryInfo->type |= TSDB_QUERY_TYPE_PROJECTION_QUERY;
}
int32_t parseSelectClause(SSqlCmd* pCmd, int32_t clauseIndex, tSQLExprList* pSelection, bool isSTable, bool joinQuery) { int32_t parseSelectClause(SSqlCmd* pCmd, int32_t clauseIndex, tSQLExprList* pSelection, bool isSTable, bool joinQuery) {
assert(pSelection != NULL && pCmd != NULL); assert(pSelection != NULL && pCmd != NULL);
...@@ -1400,20 +1431,7 @@ int32_t parseSelectClause(SSqlCmd* pCmd, int32_t clauseIndex, tSQLExprList* pSel ...@@ -1400,20 +1431,7 @@ int32_t parseSelectClause(SSqlCmd* pCmd, int32_t clauseIndex, tSQLExprList* pSel
// there is only one user-defined column in the final result field, add the timestamp column. // there is only one user-defined column in the final result field, add the timestamp column.
size_t numOfSrcCols = taosArrayGetSize(pQueryInfo->colList); size_t numOfSrcCols = taosArrayGetSize(pQueryInfo->colList);
if (numOfSrcCols <= 0 && !tscQueryTags(pQueryInfo)) { if (numOfSrcCols <= 0 && !tscQueryTags(pQueryInfo)) {
SColumnIndex index = {0}; addPrimaryTsColIntoResult(pQueryInfo);
// set the constant column value always attached to first table.
STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, clauseIndex, 0);
SSchema* pSchema = tscGetTableColumnSchema(pTableMetaInfo->pTableMeta, PRIMARYKEY_TIMESTAMP_COL_INDEX);
// add the timestamp column into the output columns
int32_t numOfCols = (int32_t)tscSqlExprNumOfExprs(pQueryInfo);
tscAddSpecialColumnForSelect(pQueryInfo, numOfCols, TSDB_FUNC_PRJ, &index, pSchema, TSDB_COL_NORMAL);
SFieldSupInfo* pSupInfo = tscFieldInfoGetSupp(&pQueryInfo->fieldsInfo, numOfCols);
pSupInfo->visible = false;
pQueryInfo->type |= TSDB_QUERY_TYPE_PROJECTION_QUERY;
} }
if (!functionCompatibleCheck(pQueryInfo, joinQuery)) { if (!functionCompatibleCheck(pQueryInfo, joinQuery)) {
...@@ -4371,14 +4389,13 @@ int32_t parseFillClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SQuerySQL* pQuery ...@@ -4371,14 +4389,13 @@ int32_t parseFillClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SQuerySQL* pQuery
static void setDefaultOrderInfo(SQueryInfo* pQueryInfo) { static void setDefaultOrderInfo(SQueryInfo* pQueryInfo) {
/* set default timestamp order information for all queries */ /* set default timestamp order information for all queries */
pQueryInfo->order.order = TSDB_ORDER_ASC;
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
pQueryInfo->order.order = TSDB_ORDER_ASC;
if (isTopBottomQuery(pQueryInfo)) { if (isTopBottomQuery(pQueryInfo)) {
pQueryInfo->order.order = TSDB_ORDER_ASC;
pQueryInfo->order.orderColId = PRIMARYKEY_TIMESTAMP_COL_INDEX; pQueryInfo->order.orderColId = PRIMARYKEY_TIMESTAMP_COL_INDEX;
} else { } else { // in case of select tbname from super_table, the defualt order column can not be the primary ts column
pQueryInfo->order.orderColId = -1; pQueryInfo->order.orderColId = INT32_MIN;
} }
/* for super table query, set default ascending order for group output */ /* for super table query, set default ascending order for group output */
...@@ -4482,6 +4499,11 @@ int32_t parseOrderbyClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SQuerySQL* pQu ...@@ -4482,6 +4499,11 @@ int32_t parseOrderbyClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SQuerySQL* pQu
} else { } else {
pQueryInfo->order.order = pSortorder->a[0].sortOrder; pQueryInfo->order.order = pSortorder->a[0].sortOrder;
pQueryInfo->order.orderColId = PRIMARYKEY_TIMESTAMP_COL_INDEX; pQueryInfo->order.orderColId = PRIMARYKEY_TIMESTAMP_COL_INDEX;
// orderby ts query on super table
if (tscOrderedProjectionQueryOnSTable(pQueryInfo, 0)) {
addPrimaryTsColIntoResult(pQueryInfo);
}
} }
} }
...@@ -6277,6 +6299,11 @@ int32_t doCheckForQuery(SSqlObj* pSql, SQuerySQL* pQuerySql, int32_t index) { ...@@ -6277,6 +6299,11 @@ int32_t doCheckForQuery(SSqlObj* pSql, SQuerySQL* pQuerySql, int32_t index) {
return TSDB_CODE_TSC_INVALID_SQL; return TSDB_CODE_TSC_INVALID_SQL;
} }
// set order by info
if (parseOrderbyClause(pCmd, pQueryInfo, pQuerySql, tscGetTableSchema(pTableMetaInfo->pTableMeta)) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_SQL;
}
// set interval value // set interval value
if (parseIntervalClause(pCmd, pQueryInfo, pQuerySql) != TSDB_CODE_SUCCESS) { if (parseIntervalClause(pCmd, pQueryInfo, pQuerySql) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_SQL; return TSDB_CODE_TSC_INVALID_SQL;
...@@ -6287,11 +6314,6 @@ int32_t doCheckForQuery(SSqlObj* pSql, SQuerySQL* pQuerySql, int32_t index) { ...@@ -6287,11 +6314,6 @@ int32_t doCheckForQuery(SSqlObj* pSql, SQuerySQL* pQuerySql, int32_t index) {
} }
} }
// set order by info
if (parseOrderbyClause(pCmd, pQueryInfo, pQuerySql, tscGetTableSchema(pTableMetaInfo->pTableMeta)) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_SQL;
}
// user does not specified the query time window, twa is not allowed in such case. // user does not specified the query time window, twa is not allowed in such case.
if ((pQueryInfo->window.skey == INT64_MIN || pQueryInfo->window.ekey == INT64_MAX || if ((pQueryInfo->window.skey == INT64_MIN || pQueryInfo->window.ekey == INT64_MAX ||
(pQueryInfo->window.ekey == INT64_MAX / 1000 && tinfo.precision == TSDB_TIME_PRECISION_MILLI)) && tscIsTWAQuery(pQueryInfo)) { (pQueryInfo->window.ekey == INT64_MAX / 1000 && tinfo.precision == TSDB_TIME_PRECISION_MILLI)) && tscIsTWAQuery(pQueryInfo)) {
......
...@@ -166,7 +166,7 @@ static int64_t doTSBlockIntersect(SSqlObj* pSql, SJoinSupporter* pSupporter1, SJ ...@@ -166,7 +166,7 @@ static int64_t doTSBlockIntersect(SSqlObj* pSql, SJoinSupporter* pSupporter1, SJ
} }
// todo handle failed to create sub query // todo handle failed to create sub query
SJoinSupporter* tscCreateJoinSupporter(SSqlObj* pSql, SSubqueryState* pState, int32_t index) { SJoinSupporter* tscCreateJoinSupporter(SSqlObj* pSql, int32_t index) {
SJoinSupporter* pSupporter = calloc(1, sizeof(SJoinSupporter)); SJoinSupporter* pSupporter = calloc(1, sizeof(SJoinSupporter));
if (pSupporter == NULL) { if (pSupporter == NULL) {
return NULL; return NULL;
...@@ -1300,11 +1300,11 @@ void tscHandleMasterJoinQuery(SSqlObj* pSql) { ...@@ -1300,11 +1300,11 @@ void tscHandleMasterJoinQuery(SSqlObj* pSql) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
// todo add test // todo add test
SSubqueryState *pState = calloc(1, sizeof(SSubqueryState)); // SSubqueryState *pState = calloc(1, sizeof(SSubqueryState));
if (pState == NULL) { // if (pState == NULL) {
code = TSDB_CODE_TSC_OUT_OF_MEMORY; // code = TSDB_CODE_TSC_OUT_OF_MEMORY;
goto _error; // goto _error;
} // }
pSql->subState.numOfSub = pQueryInfo->numOfTables; pSql->subState.numOfSub = pQueryInfo->numOfTables;
...@@ -1312,7 +1312,7 @@ void tscHandleMasterJoinQuery(SSqlObj* pSql) { ...@@ -1312,7 +1312,7 @@ void tscHandleMasterJoinQuery(SSqlObj* pSql) {
tscDebug("%p start subquery, total:%d", pSql, pQueryInfo->numOfTables); tscDebug("%p start subquery, total:%d", pSql, pQueryInfo->numOfTables);
for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) { for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
SJoinSupporter *pSupporter = tscCreateJoinSupporter(pSql, pState, i); SJoinSupporter *pSupporter = tscCreateJoinSupporter(pSql, i);
if (pSupporter == NULL) { // failed to create support struct, abort current query if (pSupporter == NULL) { // failed to create support struct, abort current query
tscError("%p tableIndex:%d, failed to allocate join support object, abort further query", pSql, i); tscError("%p tableIndex:%d, failed to allocate join support object, abort further query", pSql, i);
...@@ -1357,8 +1357,8 @@ void tscHandleMasterJoinQuery(SSqlObj* pSql) { ...@@ -1357,8 +1357,8 @@ void tscHandleMasterJoinQuery(SSqlObj* pSql) {
tscQueueAsyncRes(pSql); tscQueueAsyncRes(pSql);
} }
static void doCleanupSubqueries(SSqlObj *pSql, int32_t numOfSubs, SSubqueryState* pState) { static void doCleanupSubqueries(SSqlObj *pSql, int32_t numOfSubs) {
assert(numOfSubs <= pSql->subState.numOfSub && numOfSubs >= 0 && pState != NULL); assert(numOfSubs <= pSql->subState.numOfSub && numOfSubs >= 0);
for(int32_t i = 0; i < numOfSubs; ++i) { for(int32_t i = 0; i < numOfSubs; ++i) {
SSqlObj* pSub = pSql->pSubs[i]; SSqlObj* pSub = pSql->pSubs[i];
...@@ -1371,8 +1371,6 @@ static void doCleanupSubqueries(SSqlObj *pSql, int32_t numOfSubs, SSubqueryState ...@@ -1371,8 +1371,6 @@ static void doCleanupSubqueries(SSqlObj *pSql, int32_t numOfSubs, SSubqueryState
taos_free_result(pSub); taos_free_result(pSub);
} }
free(pState);
} }
int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) { int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
...@@ -1395,9 +1393,10 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) { ...@@ -1395,9 +1393,10 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
SSubqueryState *pState = &pSql->subState;
pSql->subState.numOfSub = pTableMetaInfo->vgroupList->numOfVgroups;
assert(pSql->subState.numOfSub > 0); pState->numOfSub = pTableMetaInfo->vgroupList->numOfVgroups;
assert(pState->numOfSub > 0);
int32_t ret = tscLocalReducerEnvCreate(pSql, &pMemoryBuf, &pDesc, &pModel, nBufferSize); int32_t ret = tscLocalReducerEnvCreate(pSql, &pMemoryBuf, &pDesc, &pModel, nBufferSize);
if (ret != 0) { if (ret != 0) {
...@@ -1407,26 +1406,24 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) { ...@@ -1407,26 +1406,24 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
return ret; return ret;
} }
pSql->pSubs = calloc(pSql->subState.numOfSub, POINTER_BYTES); pSql->pSubs = calloc(pState->numOfSub, POINTER_BYTES);
tscDebug("%p retrieved query data from %d vnode(s)", pSql, pSql->subState.numOfSub); tscDebug("%p retrieved query data from %d vnode(s)", pSql, pState->numOfSub);
SSubqueryState *pState = calloc(1, sizeof(SSubqueryState));
if (pSql->pSubs == NULL || pState == NULL) { if (pSql->pSubs == NULL) {
taosTFree(pState);
taosTFree(pSql->pSubs); taosTFree(pSql->pSubs);
pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY; pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
tscLocalReducerEnvDestroy(pMemoryBuf, pDesc, pModel, pSql->subState.numOfSub); tscLocalReducerEnvDestroy(pMemoryBuf, pDesc, pModel, pState->numOfSub);
tscQueueAsyncRes(pSql); tscQueueAsyncRes(pSql);
return ret; return ret;
} }
pSql->subState.numOfRemain = pSql->subState.numOfSub; pState->numOfRemain = pState->numOfSub;
pRes->code = TSDB_CODE_SUCCESS; pRes->code = TSDB_CODE_SUCCESS;
int32_t i = 0; int32_t i = 0;
for (; i < pSql->subState.numOfSub; ++i) { for (; i < pState->numOfSub; ++i) {
SRetrieveSupport *trs = (SRetrieveSupport *)calloc(1, sizeof(SRetrieveSupport)); SRetrieveSupport *trs = (SRetrieveSupport *)calloc(1, sizeof(SRetrieveSupport));
if (trs == NULL) { if (trs == NULL) {
tscError("%p failed to malloc buffer for SRetrieveSupport, orderOfSub:%d, reason:%s", pSql, i, strerror(errno)); tscError("%p failed to malloc buffer for SRetrieveSupport, orderOfSub:%d, reason:%s", pSql, i, strerror(errno));
...@@ -1465,22 +1462,22 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) { ...@@ -1465,22 +1462,22 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
tscDebug("%p sub:%p create subquery success. orderOfSub:%d", pSql, pNew, trs->subqueryIndex); tscDebug("%p sub:%p create subquery success. orderOfSub:%d", pSql, pNew, trs->subqueryIndex);
} }
if (i < pSql->subState.numOfSub) { if (i < pState->numOfSub) {
tscError("%p failed to prepare subquery structure and launch subqueries", pSql); tscError("%p failed to prepare subquery structure and launch subqueries", pSql);
pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY; pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
tscLocalReducerEnvDestroy(pMemoryBuf, pDesc, pModel, pSql->subState.numOfSub); tscLocalReducerEnvDestroy(pMemoryBuf, pDesc, pModel, pState->numOfSub);
doCleanupSubqueries(pSql, i, pState); doCleanupSubqueries(pSql, i);
return pRes->code; // free all allocated resource return pRes->code; // free all allocated resource
} }
if (pRes->code == TSDB_CODE_TSC_QUERY_CANCELLED) { if (pRes->code == TSDB_CODE_TSC_QUERY_CANCELLED) {
tscLocalReducerEnvDestroy(pMemoryBuf, pDesc, pModel, pSql->subState.numOfSub); tscLocalReducerEnvDestroy(pMemoryBuf, pDesc, pModel, pState->numOfSub);
doCleanupSubqueries(pSql, i, pState); doCleanupSubqueries(pSql, i);
return pRes->code; return pRes->code;
} }
for(int32_t j = 0; j < pSql->subState.numOfSub; ++j) { for(int32_t j = 0; j < pState->numOfSub; ++j) {
SSqlObj* pSub = pSql->pSubs[j]; SSqlObj* pSub = pSql->pSubs[j];
SRetrieveSupport* pSupport = pSub->param; SRetrieveSupport* pSupport = pSub->param;
......
...@@ -4,6 +4,7 @@ PROJECT(TDengine) ...@@ -4,6 +4,7 @@ PROJECT(TDengine)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/query/inc) INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/query/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/mnode/inc) INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/mnode/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/tsdb/inc) INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/tsdb/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/sync/inc)
INCLUDE_DIRECTORIES(${TD_ENTERPRISE_DIR}/src/inc) INCLUDE_DIRECTORIES(${TD_ENTERPRISE_DIR}/src/inc)
INCLUDE_DIRECTORIES(inc) INCLUDE_DIRECTORIES(inc)
......
...@@ -24,8 +24,10 @@ ...@@ -24,8 +24,10 @@
#include "twal.h" #include "twal.h"
#include "tdataformat.h" #include "tdataformat.h"
#include "tglobal.h" #include "tglobal.h"
#include "tsync.h"
#include "vnode.h" #include "vnode.h"
#include "dnodeInt.h" #include "dnodeInt.h"
#include "syncInt.h"
#include "dnodeVWrite.h" #include "dnodeVWrite.h"
#include "dnodeMgmt.h" #include "dnodeMgmt.h"
...@@ -239,6 +241,10 @@ static void *dnodeProcessWriteQueue(void *param) { ...@@ -239,6 +241,10 @@ static void *dnodeProcessWriteQueue(void *param) {
pHead->len = pWrite->contLen; pHead->len = pWrite->contLen;
dDebug("%p, rpc msg:%s will be processed in vwrite queue", pWrite->rpcMsg.ahandle, dDebug("%p, rpc msg:%s will be processed in vwrite queue", pWrite->rpcMsg.ahandle,
taosMsg[pWrite->rpcMsg.msgType]); taosMsg[pWrite->rpcMsg.msgType]);
} else if (type == TAOS_QTYPE_CQ) {
pHead = (SWalHead *)((char*)item + sizeof(SSyncHead));
dTrace("%p, CQ wal msg:%s will be processed in vwrite queue, version:%" PRIu64, pHead, taosMsg[pHead->msgType],
pHead->version);
} else { } else {
pHead = (SWalHead *)item; pHead = (SWalHead *)item;
dTrace("%p, wal msg:%s will be processed in vwrite queue, version:%" PRIu64, pHead, taosMsg[pHead->msgType], dTrace("%p, wal msg:%s will be processed in vwrite queue, version:%" PRIu64, pHead, taosMsg[pHead->msgType],
......
...@@ -89,7 +89,7 @@ typedef struct SColumnModel { ...@@ -89,7 +89,7 @@ typedef struct SColumnModel {
typedef struct SColumnOrderInfo { typedef struct SColumnOrderInfo {
int32_t numOfCols; int32_t numOfCols;
int16_t pData[]; int16_t colIndex[];
} SColumnOrderInfo; } SColumnOrderInfo;
typedef struct tOrderDescriptor { typedef struct tOrderDescriptor {
......
...@@ -1537,7 +1537,7 @@ void setExecParams(SQuery *pQuery, SQLFunctionCtx *pCtx, void* inputData, TSKEY ...@@ -1537,7 +1537,7 @@ void setExecParams(SQuery *pQuery, SQLFunctionCtx *pCtx, void* inputData, TSKEY
if (isNull((const char*) &pQuery->fillVal[colIndex], pCtx->inputType)) { if (isNull((const char*) &pQuery->fillVal[colIndex], pCtx->inputType)) {
pCtx->param[1].nType = TSDB_DATA_TYPE_NULL; pCtx->param[1].nType = TSDB_DATA_TYPE_NULL;
} else { // todo refactor, tVariantCreateFromBinary should handle the NULL value } else { // todo refactor, tVariantCreateFromBinary should handle the NULL value
if(pCtx->inputType != TSDB_DATA_TYPE_BINARY && pCtx->inputType != TSDB_DATA_TYPE_NCHAR) { if (pCtx->inputType != TSDB_DATA_TYPE_BINARY && pCtx->inputType != TSDB_DATA_TYPE_NCHAR) {
tVariantCreateFromBinary(&pCtx->param[1], (char*) &pQuery->fillVal[colIndex], pCtx->inputBytes, pCtx->inputType); tVariantCreateFromBinary(&pCtx->param[1], (char*) &pQuery->fillVal[colIndex], pCtx->inputBytes, pCtx->inputType);
} }
} }
......
...@@ -343,8 +343,10 @@ static FORCE_INLINE int32_t primaryKeyComparator(int64_t f1, int64_t f2, int32_t ...@@ -343,8 +343,10 @@ static FORCE_INLINE int32_t primaryKeyComparator(int64_t f1, int64_t f2, int32_t
if (f1 == f2) { if (f1 == f2) {
return 0; return 0;
} }
if (colIdx == 0 && tsOrder == TSDB_ORDER_DESC) { // primary column desc order assert(colIdx == 0);
if (tsOrder == TSDB_ORDER_DESC) { // primary column desc order
return (f1 < f2) ? 1 : -1; return (f1 < f2) ? 1 : -1;
} else { // asc } else { // asc
return (f1 < f2) ? -1 : 1; return (f1 < f2) ? -1 : 1;
...@@ -435,7 +437,7 @@ int32_t compare_a(tOrderDescriptor *pDescriptor, int32_t numOfRows1, int32_t s1, ...@@ -435,7 +437,7 @@ int32_t compare_a(tOrderDescriptor *pDescriptor, int32_t numOfRows1, int32_t s1,
int32_t cmpCnt = pDescriptor->orderInfo.numOfCols; int32_t cmpCnt = pDescriptor->orderInfo.numOfCols;
for (int32_t i = 0; i < cmpCnt; ++i) { for (int32_t i = 0; i < cmpCnt; ++i) {
int32_t colIdx = pDescriptor->orderInfo.pData[i]; int32_t colIdx = pDescriptor->orderInfo.colIndex[i];
char *f1 = COLMODEL_GET_VAL(data1, pDescriptor->pColumnModel, numOfRows1, s1, colIdx); char *f1 = COLMODEL_GET_VAL(data1, pDescriptor->pColumnModel, numOfRows1, s1, colIdx);
char *f2 = COLMODEL_GET_VAL(data2, pDescriptor->pColumnModel, numOfRows2, s2, colIdx); char *f2 = COLMODEL_GET_VAL(data2, pDescriptor->pColumnModel, numOfRows2, s2, colIdx);
...@@ -467,7 +469,7 @@ int32_t compare_d(tOrderDescriptor *pDescriptor, int32_t numOfRows1, int32_t s1, ...@@ -467,7 +469,7 @@ int32_t compare_d(tOrderDescriptor *pDescriptor, int32_t numOfRows1, int32_t s1,
int32_t cmpCnt = pDescriptor->orderInfo.numOfCols; int32_t cmpCnt = pDescriptor->orderInfo.numOfCols;
for (int32_t i = 0; i < cmpCnt; ++i) { for (int32_t i = 0; i < cmpCnt; ++i) {
int32_t colIdx = pDescriptor->orderInfo.pData[i]; int32_t colIdx = pDescriptor->orderInfo.colIndex[i];
char *f1 = COLMODEL_GET_VAL(data1, pDescriptor->pColumnModel, numOfRows1, s1, colIdx); char *f1 = COLMODEL_GET_VAL(data1, pDescriptor->pColumnModel, numOfRows1, s1, colIdx);
char *f2 = COLMODEL_GET_VAL(data2, pDescriptor->pColumnModel, numOfRows2, s2, colIdx); char *f2 = COLMODEL_GET_VAL(data2, pDescriptor->pColumnModel, numOfRows2, s2, colIdx);
...@@ -557,13 +559,13 @@ static void median(tOrderDescriptor *pDescriptor, int32_t numOfRows, int32_t sta ...@@ -557,13 +559,13 @@ static void median(tOrderDescriptor *pDescriptor, int32_t numOfRows, int32_t sta
int32_t midIdx = ((end - start) >> 1) + start; int32_t midIdx = ((end - start) >> 1) + start;
#if defined(_DEBUG_VIEW) #if defined(_DEBUG_VIEW)
int32_t f = pDescriptor->orderInfo.pData[0]; int32_t f = pDescriptor->orderInfo.colIndex[0];
char *midx = COLMODEL_GET_VAL(data, pDescriptor->pColumnModel, numOfRows, midIdx, f); char *midx = COLMODEL_GET_VAL(data, pDescriptor->pColumnModel, numOfRows, midIdx, f);
char *startx = COLMODEL_GET_VAL(data, pDescriptor->pColumnModel, numOfRows, start, f); char *startx = COLMODEL_GET_VAL(data, pDescriptor->pColumnModel, numOfRows, start, f);
char *endx = COLMODEL_GET_VAL(data, pDescriptor->pColumnModel, numOfRows, end, f); char *endx = COLMODEL_GET_VAL(data, pDescriptor->pColumnModel, numOfRows, end, f);
int32_t colIdx = pDescriptor->orderInfo.pData[0]; int32_t colIdx = pDescriptor->orderInfo.colIndex[0];
tSortDataPrint(pDescriptor->pColumnModel->pFields[colIdx].field.type, "before", startx, midx, endx); tSortDataPrint(pDescriptor->pColumnModel->pFields[colIdx].field.type, "before", startx, midx, endx);
#endif #endif
...@@ -591,7 +593,7 @@ static void median(tOrderDescriptor *pDescriptor, int32_t numOfRows, int32_t sta ...@@ -591,7 +593,7 @@ static void median(tOrderDescriptor *pDescriptor, int32_t numOfRows, int32_t sta
} }
static UNUSED_FUNC void tRowModelDisplay(tOrderDescriptor *pDescriptor, int32_t numOfRows, char *d, int32_t len) { static UNUSED_FUNC void tRowModelDisplay(tOrderDescriptor *pDescriptor, int32_t numOfRows, char *d, int32_t len) {
int32_t colIdx = pDescriptor->orderInfo.pData[0]; int32_t colIdx = pDescriptor->orderInfo.colIndex[0];
for (int32_t i = 0; i < len; ++i) { for (int32_t i = 0; i < len; ++i) {
char *startx = COLMODEL_GET_VAL(d, pDescriptor->pColumnModel, numOfRows, i, colIdx); char *startx = COLMODEL_GET_VAL(d, pDescriptor->pColumnModel, numOfRows, i, colIdx);
...@@ -1075,7 +1077,7 @@ tOrderDescriptor *tOrderDesCreate(const int32_t *orderColIdx, int32_t numOfOrder ...@@ -1075,7 +1077,7 @@ tOrderDescriptor *tOrderDesCreate(const int32_t *orderColIdx, int32_t numOfOrder
desc->orderInfo.numOfCols = numOfOrderCols; desc->orderInfo.numOfCols = numOfOrderCols;
for (int32_t i = 0; i < numOfOrderCols; ++i) { for (int32_t i = 0; i < numOfOrderCols; ++i) {
desc->orderInfo.pData[i] = orderColIdx[i]; desc->orderInfo.colIndex[i] = orderColIdx[i];
} }
return desc; return desc;
......
...@@ -72,7 +72,6 @@ typedef struct STableCheckInfo { ...@@ -72,7 +72,6 @@ typedef struct STableCheckInfo {
SCompInfo* pCompInfo; SCompInfo* pCompInfo;
int32_t compSize; int32_t compSize;
int32_t numOfBlocks; // number of qualified data blocks not the original blocks int32_t numOfBlocks; // number of qualified data blocks not the original blocks
SDataCols* pDataCols;
int32_t chosen; // indicate which iterator should move forward int32_t chosen; // indicate which iterator should move forward
bool initBuf; // whether to initialize the in-memory skip list iterator or not bool initBuf; // whether to initialize the in-memory skip list iterator or not
SSkipListIterator* iter; // mem buffer skip list iterator SSkipListIterator* iter; // mem buffer skip list iterator
...@@ -117,10 +116,12 @@ typedef struct STsdbQueryHandle { ...@@ -117,10 +116,12 @@ typedef struct STsdbQueryHandle {
SFileGroupIter fileIter; SFileGroupIter fileIter;
SRWHelper rhelper; SRWHelper rhelper;
STableBlockInfo* pDataBlockInfo; STableBlockInfo* pDataBlockInfo;
SDataCols *pDataCols; // in order to hold current file data block
int32_t allocSize; // allocated data block size int32_t allocSize; // allocated data block size
SMemTable* mem; // mem-table SMemTable *mem; // mem-table
SMemTable* imem; // imem-table, acquired from snapshot SMemTable *imem; // imem-table, acquired from snapshot
SArray* defaultLoadColumn;// default load column SArray *defaultLoadColumn;// default load column
SDataBlockLoadInfo dataBlockLoadInfo; /* record current block load information */ SDataBlockLoadInfo dataBlockLoadInfo; /* record current block load information */
SLoadCompBlockInfo compBlockLoadInfo; /* record current compblock information in SQuery */ SLoadCompBlockInfo compBlockLoadInfo; /* record current compblock information in SQuery */
...@@ -181,7 +182,72 @@ static SArray* getDefaultLoadColumns(STsdbQueryHandle* pQueryHandle, bool loadTS ...@@ -181,7 +182,72 @@ static SArray* getDefaultLoadColumns(STsdbQueryHandle* pQueryHandle, bool loadTS
return pLocalIdList; return pLocalIdList;
} }
TsdbQueryHandleT* tsdbQueryTables(TSDB_REPO_T* tsdb, STsdbQueryCond* pCond, STableGroupInfo* groupList, void* qinfo) { static SArray* createCheckInfoFromTableGroup(STsdbQueryHandle* pQueryHandle, STableGroupInfo* pGroupList, STsdbMeta* pMeta) {
size_t sizeOfGroup = taosArrayGetSize(pGroupList->pGroupList);
assert(sizeOfGroup >= 1 && pMeta != NULL);
// allocate buffer in order to load data blocks from file
SArray* pTableCheckInfo = taosArrayInit(pGroupList->numOfTables, sizeof(STableCheckInfo));
if (pTableCheckInfo == NULL) {
return NULL;
}
// todo apply the lastkey of table check to avoid to load header file
for (int32_t i = 0; i < sizeOfGroup; ++i) {
SArray* group = *(SArray**) taosArrayGet(pGroupList->pGroupList, i);
size_t gsize = taosArrayGetSize(group);
assert(gsize > 0);
for (int32_t j = 0; j < gsize; ++j) {
STableKeyInfo* pKeyInfo = (STableKeyInfo*) taosArrayGet(group, j);
STableCheckInfo info = { .lastKey = pKeyInfo->lastKey, .pTableObj = pKeyInfo->pTable };
info.tableId = ((STable*)(pKeyInfo->pTable))->tableId;
assert(info.pTableObj != NULL && (info.pTableObj->type == TSDB_NORMAL_TABLE ||
info.pTableObj->type == TSDB_CHILD_TABLE || info.pTableObj->type == TSDB_STREAM_TABLE));
info.tableId.tid = info.pTableObj->tableId.tid;
info.tableId.uid = info.pTableObj->tableId.uid;
if (ASCENDING_TRAVERSE(pQueryHandle->order)) {
assert(info.lastKey >= pQueryHandle->window.skey);
} else {
assert(info.lastKey <= pQueryHandle->window.skey);
}
taosArrayPush(pTableCheckInfo, &info);
tsdbDebug("%p check table uid:%"PRId64", tid:%d from lastKey:%"PRId64" %p", pQueryHandle, info.tableId.uid,
info.tableId.tid, info.lastKey, pQueryHandle->qinfo);
}
}
taosArraySort(pTableCheckInfo, tsdbCheckInfoCompar);
return pTableCheckInfo;
}
static SArray* createCheckInfoFromCheckInfo(SArray* pTableCheckInfo, TSKEY skey) {
size_t si = taosArrayGetSize(pTableCheckInfo);
SArray* pNew = taosArrayInit(si, sizeof(STableCheckInfo));
if (pNew == NULL) {
return NULL;
}
for (int32_t j = 0; j < si; ++j) {
STableCheckInfo* pCheckInfo = (STableCheckInfo*) taosArrayGet(pTableCheckInfo, j);
STableCheckInfo info = { .lastKey = skey, .pTableObj = pCheckInfo->pTableObj};
info.tableId = pCheckInfo->tableId;
taosArrayPush(pNew, &info);
}
// it is ordered already, no need to sort again.
taosArraySort(pNew, tsdbCheckInfoCompar);
return pNew;
}
static STsdbQueryHandle* tsdbQueryTablesImpl(TSDB_REPO_T* tsdb, STsdbQueryCond* pCond, void* qinfo) {
STsdbQueryHandle* pQueryHandle = calloc(1, sizeof(STsdbQueryHandle)); STsdbQueryHandle* pQueryHandle = calloc(1, sizeof(STsdbQueryHandle));
if (pQueryHandle == NULL) { if (pQueryHandle == NULL) {
goto out_of_memory; goto out_of_memory;
...@@ -205,9 +271,7 @@ TsdbQueryHandleT* tsdbQueryTables(TSDB_REPO_T* tsdb, STsdbQueryCond* pCond, STab ...@@ -205,9 +271,7 @@ TsdbQueryHandleT* tsdbQueryTables(TSDB_REPO_T* tsdb, STsdbQueryCond* pCond, STab
} }
tsdbTakeMemSnapshot(pQueryHandle->pTsdb, &pQueryHandle->mem, &pQueryHandle->imem); tsdbTakeMemSnapshot(pQueryHandle->pTsdb, &pQueryHandle->mem, &pQueryHandle->imem);
assert(pCond != NULL && pCond->numOfCols > 0);
size_t sizeOfGroup = taosArrayGetSize(groupList->pGroupList);
assert(sizeOfGroup >= 1 && pCond != NULL && pCond->numOfCols > 0);
if (ASCENDING_TRAVERSE(pCond->order)) { if (ASCENDING_TRAVERSE(pCond->order)) {
assert(pQueryHandle->window.skey <= pQueryHandle->window.ekey); assert(pQueryHandle->window.skey <= pQueryHandle->window.ekey);
...@@ -216,21 +280,19 @@ TsdbQueryHandleT* tsdbQueryTables(TSDB_REPO_T* tsdb, STsdbQueryCond* pCond, STab ...@@ -216,21 +280,19 @@ TsdbQueryHandleT* tsdbQueryTables(TSDB_REPO_T* tsdb, STsdbQueryCond* pCond, STab
} }
// allocate buffer in order to load data blocks from file // allocate buffer in order to load data blocks from file
int32_t numOfCols = pCond->numOfCols; pQueryHandle->statis = calloc(pCond->numOfCols, sizeof(SDataStatis));
pQueryHandle->statis = calloc(numOfCols, sizeof(SDataStatis));
if (pQueryHandle->statis == NULL) { if (pQueryHandle->statis == NULL) {
goto out_of_memory; goto out_of_memory;
} }
pQueryHandle->pColumns = taosArrayInit(numOfCols, sizeof(SColumnInfoData)); // todo: use list instead of array? pQueryHandle->pColumns = taosArrayInit(pCond->numOfCols, sizeof(SColumnInfoData)); // todo: use list instead of array?
if (pQueryHandle->pColumns == NULL) { if (pQueryHandle->pColumns == NULL) {
goto out_of_memory; goto out_of_memory;
} }
for (int32_t i = 0; i < numOfCols; ++i) { for (int32_t i = 0; i < pCond->numOfCols; ++i) {
SColumnInfoData colInfo = {{0}, 0}; SColumnInfoData colInfo = {{0}, 0};
colInfo.info = pCond->colList[i]; colInfo.info = pCond->colList[i];
colInfo.pData = calloc(1, EXTRA_BYTES + pQueryHandle->outputCapacity * pCond->colList[i].bytes); colInfo.pData = calloc(1, EXTRA_BYTES + pQueryHandle->outputCapacity * pCond->colList[i].bytes);
if (colInfo.pData == NULL) { if (colInfo.pData == NULL) {
...@@ -240,61 +302,47 @@ TsdbQueryHandleT* tsdbQueryTables(TSDB_REPO_T* tsdb, STsdbQueryCond* pCond, STab ...@@ -240,61 +302,47 @@ TsdbQueryHandleT* tsdbQueryTables(TSDB_REPO_T* tsdb, STsdbQueryCond* pCond, STab
pQueryHandle->statis[i].colId = colInfo.info.colId; pQueryHandle->statis[i].colId = colInfo.info.colId;
} }
pQueryHandle->pTableCheckInfo = taosArrayInit(groupList->numOfTables, sizeof(STableCheckInfo)); pQueryHandle->defaultLoadColumn = getDefaultLoadColumns(pQueryHandle, true);
if (pQueryHandle->pTableCheckInfo == NULL) {
goto out_of_memory;
}
STsdbMeta* pMeta = tsdbGetMeta(tsdb); STsdbMeta* pMeta = tsdbGetMeta(tsdb);
assert(pMeta != NULL && sizeOfGroup >= 1 && pCond != NULL && pCond->numOfCols > 0); assert(pMeta != NULL);
// todo apply the lastkey of table check to avoid to load header file
for (int32_t i = 0; i < sizeOfGroup; ++i) {
SArray* group = *(SArray**) taosArrayGet(groupList->pGroupList, i);
size_t gsize = taosArrayGetSize(group);
assert(gsize > 0);
for (int32_t j = 0; j < gsize; ++j) {
STableKeyInfo* pKeyInfo = (STableKeyInfo*) taosArrayGet(group, j);
STableCheckInfo info = { .lastKey = pKeyInfo->lastKey, .pTableObj = pKeyInfo->pTable };
info.tableId = ((STable*)(pKeyInfo->pTable))->tableId;
assert(info.pTableObj != NULL && (info.pTableObj->type == TSDB_NORMAL_TABLE || pQueryHandle->pDataCols = tdNewDataCols(pMeta->maxRowBytes, pMeta->maxCols, pQueryHandle->pTsdb->config.maxRowsPerFileBlock);
info.pTableObj->type == TSDB_CHILD_TABLE || info.pTableObj->type == TSDB_STREAM_TABLE)); if (pQueryHandle->pDataCols == NULL) {
tsdbError("%p failed to malloc buf for pDataCols, %p", pQueryHandle, pQueryHandle->qinfo);
info.tableId.tid = info.pTableObj->tableId.tid; terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
info.tableId.uid = info.pTableObj->tableId.uid; goto out_of_memory;
if (ASCENDING_TRAVERSE(pQueryHandle->order)) {
assert(info.lastKey >= pQueryHandle->window.skey);
} else {
assert(info.lastKey <= pQueryHandle->window.skey);
}
taosArrayPush(pQueryHandle->pTableCheckInfo, &info);
tsdbDebug("%p check table uid:%"PRId64", tid:%d from lastKey:%"PRId64" %p", pQueryHandle, info.tableId.uid,
info.tableId.tid, info.lastKey, qinfo);
}
} }
taosArraySort(pQueryHandle->pTableCheckInfo, tsdbCheckInfoCompar);
pQueryHandle->defaultLoadColumn = getDefaultLoadColumns(pQueryHandle, true);
tsdbDebug("%p total numOfTable:%" PRIzu " in query, %p", pQueryHandle, taosArrayGetSize(pQueryHandle->pTableCheckInfo), pQueryHandle->qinfo);
tsdbInitDataBlockLoadInfo(&pQueryHandle->dataBlockLoadInfo); tsdbInitDataBlockLoadInfo(&pQueryHandle->dataBlockLoadInfo);
tsdbInitCompBlockLoadInfo(&pQueryHandle->compBlockLoadInfo); tsdbInitCompBlockLoadInfo(&pQueryHandle->compBlockLoadInfo);
return (TsdbQueryHandleT) pQueryHandle; return (TsdbQueryHandleT) pQueryHandle;
out_of_memory: out_of_memory:
tsdbCleanupQueryHandle(pQueryHandle); tsdbCleanupQueryHandle(pQueryHandle);
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return NULL; return NULL;
} }
TsdbQueryHandleT* tsdbQueryTables(TSDB_REPO_T* tsdb, STsdbQueryCond* pCond, STableGroupInfo* groupList, void* qinfo) {
STsdbQueryHandle* pQueryHandle = tsdbQueryTablesImpl(tsdb, pCond, qinfo);
STsdbMeta* pMeta = tsdbGetMeta(tsdb);
assert(pMeta != NULL);
// todo apply the lastkey of table check to avoid to load header file
pQueryHandle->pTableCheckInfo = createCheckInfoFromTableGroup(pQueryHandle, groupList, pMeta);
if (pQueryHandle->pTableCheckInfo == NULL) {
tsdbCleanupQueryHandle(pQueryHandle);
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return NULL;
}
tsdbDebug("%p total numOfTable:%" PRIzu " in query, %p", pQueryHandle, taosArrayGetSize(pQueryHandle->pTableCheckInfo), pQueryHandle->qinfo);
return (TsdbQueryHandleT) pQueryHandle;
}
TsdbQueryHandleT tsdbQueryLastRow(TSDB_REPO_T *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupList, void* qinfo) { TsdbQueryHandleT tsdbQueryLastRow(TSDB_REPO_T *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupList, void* qinfo) {
pCond->twindow = changeTableGroupByLastrow(groupList); pCond->twindow = changeTableGroupByLastrow(groupList);
...@@ -689,22 +737,10 @@ static int32_t getFileCompInfo(STsdbQueryHandle* pQueryHandle, int32_t* numOfBlo ...@@ -689,22 +737,10 @@ static int32_t getFileCompInfo(STsdbQueryHandle* pQueryHandle, int32_t* numOfBlo
} }
static int32_t doLoadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock, STableCheckInfo* pCheckInfo, int32_t slotIndex) { static int32_t doLoadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock, STableCheckInfo* pCheckInfo, int32_t slotIndex) {
STsdbRepo *pRepo = pQueryHandle->pTsdb; int64_t st = taosGetTimestampUs();
int64_t st = taosGetTimestampUs();
if (pCheckInfo->pDataCols == NULL) {
STsdbMeta* pMeta = tsdbGetMeta(pRepo);
pCheckInfo->pDataCols = tdNewDataCols(pMeta->maxRowBytes, pMeta->maxCols, pRepo->config.maxRowsPerFileBlock);
if (pCheckInfo->pDataCols == NULL) {
tsdbError("%p failed to malloc buf for pDataCols, %p", pQueryHandle, pQueryHandle->qinfo);
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
goto _error;
}
}
STSchema* pSchema = tsdbGetTableSchema(pCheckInfo->pTableObj); STSchema *pSchema = tsdbGetTableSchema(pCheckInfo->pTableObj);
int32_t code = tdInitDataCols(pCheckInfo->pDataCols, pSchema); int32_t code = tdInitDataCols(pQueryHandle->pDataCols, pSchema);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
tsdbError("%p failed to malloc buf for pDataCols, %p", pQueryHandle, pQueryHandle->qinfo); tsdbError("%p failed to malloc buf for pDataCols, %p", pQueryHandle, pQueryHandle->qinfo);
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
...@@ -1924,77 +1960,33 @@ bool tsdbNextDataBlock(TsdbQueryHandleT* pHandle) { ...@@ -1924,77 +1960,33 @@ bool tsdbNextDataBlock(TsdbQueryHandleT* pHandle) {
pQueryHandle->type = TSDB_QUERY_TYPE_ALL; pQueryHandle->type = TSDB_QUERY_TYPE_ALL;
return true; return true;
} else { } else {
STsdbQueryHandle* pSecQueryHandle = calloc(1, sizeof(STsdbQueryHandle)); STimeWindow win = (STimeWindow) {pQueryHandle->window.skey, INT64_MAX};
if (pSecQueryHandle == NULL) { STsdbQueryCond cond = {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; .order = TSDB_ORDER_ASC,
.numOfCols = (int32_t)(QH_GET_NUM_OF_COLS(pQueryHandle)),
.twindow = win};
cond.colList = calloc(cond.numOfCols, sizeof(SColumnInfo));
if (cond.colList == NULL) {
terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
return false; return false;
} }
pSecQueryHandle->order = TSDB_ORDER_ASC; for(int32_t i = 0; i < cond.numOfCols; ++i) {
pSecQueryHandle->window = (STimeWindow) {pQueryHandle->window.skey, INT64_MAX}; SColumnInfoData* pColInfoData = taosArrayGet(pQueryHandle->pColumns, i);
pSecQueryHandle->pTsdb = pQueryHandle->pTsdb; memcpy(&cond.colList[i], &pColInfoData->info, sizeof(SColumnInfo));
pSecQueryHandle->type = TSDB_QUERY_TYPE_ALL;
pSecQueryHandle->cur.fid = -1;
pSecQueryHandle->cur.win = TSWINDOW_INITIALIZER;
pSecQueryHandle->checkFiles = true;
pSecQueryHandle->activeIndex = 0;
pSecQueryHandle->outputCapacity = ((STsdbRepo*)pSecQueryHandle->pTsdb)->config.maxRowsPerFileBlock;
if (tsdbInitReadHelper(&pSecQueryHandle->rhelper, (STsdbRepo*) pSecQueryHandle->pTsdb) != 0) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
free(pSecQueryHandle);
return false;
} }
tsdbTakeMemSnapshot(pSecQueryHandle->pTsdb, &pSecQueryHandle->mem, &pSecQueryHandle->imem); STsdbQueryHandle* pSecQueryHandle = tsdbQueryTablesImpl(pQueryHandle->pTsdb, &cond, pQueryHandle->qinfo);
// allocate buffer in order to load data blocks from file taosTFree(cond.colList);
int32_t numOfCols = (int32_t)(QH_GET_NUM_OF_COLS(pQueryHandle));
pSecQueryHandle->statis = calloc(numOfCols, sizeof(SDataStatis)); pSecQueryHandle->pTableCheckInfo = createCheckInfoFromCheckInfo(pQueryHandle->pTableCheckInfo, pSecQueryHandle->window.skey);
pSecQueryHandle->pColumns = taosArrayInit(numOfCols, sizeof(SColumnInfoData)); if (pSecQueryHandle->pTableCheckInfo == NULL) {
if (pSecQueryHandle->statis == NULL || pSecQueryHandle->pColumns == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
tsdbCleanupQueryHandle(pSecQueryHandle); tsdbCleanupQueryHandle(pSecQueryHandle);
return false; return false;
} }
for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData colInfo = {{0}, 0};
SColumnInfoData* pCol = taosArrayGet(pQueryHandle->pColumns, i);
colInfo.info = pCol->info;
colInfo.pData = calloc(1, EXTRA_BYTES + pQueryHandle->outputCapacity * pCol->info.bytes);
if (colInfo.pData == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
tsdbCleanupQueryHandle(pSecQueryHandle);
return false;
}
taosArrayPush(pSecQueryHandle->pColumns, &colInfo);
}
size_t si = taosArrayGetSize(pQueryHandle->pTableCheckInfo);
pSecQueryHandle->pTableCheckInfo = taosArrayInit(si, sizeof(STableCheckInfo));
STsdbMeta* pMeta = tsdbGetMeta(pQueryHandle->pTsdb);
assert(pMeta != NULL);
for (int32_t j = 0; j < si; ++j) {
STableCheckInfo* pCheckInfo = (STableCheckInfo*) taosArrayGet(pQueryHandle->pTableCheckInfo, j);
STableCheckInfo info = {
.lastKey = pSecQueryHandle->window.skey,
.pTableObj = pCheckInfo->pTableObj,
};
info.tableId = pCheckInfo->tableId;
taosArrayPush(pSecQueryHandle->pTableCheckInfo, &info);
}
tsdbInitDataBlockLoadInfo(&pSecQueryHandle->dataBlockLoadInfo);
tsdbInitCompBlockLoadInfo(&pSecQueryHandle->compBlockLoadInfo);
pSecQueryHandle->defaultLoadColumn = taosArrayClone(pQueryHandle->defaultLoadColumn);
if (!tsdbNextDataBlock((void*) pSecQueryHandle)) { if (!tsdbNextDataBlock((void*) pSecQueryHandle)) {
tsdbCleanupQueryHandle(pSecQueryHandle); tsdbCleanupQueryHandle(pSecQueryHandle);
return false; return false;
...@@ -2003,6 +1995,9 @@ bool tsdbNextDataBlock(TsdbQueryHandleT* pHandle) { ...@@ -2003,6 +1995,9 @@ bool tsdbNextDataBlock(TsdbQueryHandleT* pHandle) {
tsdbRetrieveDataBlockInfo((void*) pSecQueryHandle, &blockInfo); tsdbRetrieveDataBlockInfo((void*) pSecQueryHandle, &blockInfo);
tsdbRetrieveDataBlock((void*) pSecQueryHandle, pSecQueryHandle->defaultLoadColumn); tsdbRetrieveDataBlock((void*) pSecQueryHandle, pSecQueryHandle->defaultLoadColumn);
int32_t numOfCols = (int32_t)(QH_GET_NUM_OF_COLS(pSecQueryHandle));
size_t si = taosArrayGetSize(pSecQueryHandle->pTableCheckInfo);
for (int32_t i = 0; i < numOfCols; ++i) { for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* pCol = taosArrayGet(pQueryHandle->pColumns, i); SColumnInfoData* pCol = taosArrayGet(pQueryHandle->pColumns, i);
memcpy((char*)pCol->pData, (char*)pCol->pData + pCol->info.bytes * (pQueryHandle->cur.rows - 1), pCol->info.bytes); memcpy((char*)pCol->pData, (char*)pCol->pData + pCol->info.bytes * (pQueryHandle->cur.rows - 1), pCol->info.bytes);
...@@ -2016,11 +2011,11 @@ bool tsdbNextDataBlock(TsdbQueryHandleT* pHandle) { ...@@ -2016,11 +2011,11 @@ bool tsdbNextDataBlock(TsdbQueryHandleT* pHandle) {
SColumnInfoData* pTSCol = taosArrayGet(pQueryHandle->pColumns, 0); SColumnInfoData* pTSCol = taosArrayGet(pQueryHandle->pColumns, 0);
// it is ascending order // it is ascending order
pQueryHandle->cur.win = (STimeWindow){((TSKEY*)pTSCol->pData)[0], ((TSKEY*)pTSCol->pData)[1]}; pQueryHandle->order = TSDB_ORDER_DESC;
pQueryHandle->window = pQueryHandle->cur.win; pQueryHandle->window = pQueryHandle->cur.win;
pQueryHandle->cur.win = (STimeWindow){((TSKEY*)pTSCol->pData)[0], ((TSKEY*)pTSCol->pData)[1]};
pQueryHandle->cur.rows = 2; pQueryHandle->cur.rows = 2;
pQueryHandle->cur.mixBlock = true; pQueryHandle->cur.mixBlock = true;
pQueryHandle->order = TSDB_ORDER_DESC;
int32_t step = -1;// one step for ascending order traverse int32_t step = -1;// one step for ascending order traverse
for (int32_t j = 0; j < si; ++j) { for (int32_t j = 0; j < si; ++j) {
...@@ -2686,8 +2681,6 @@ void tsdbCleanupQueryHandle(TsdbQueryHandleT queryHandle) { ...@@ -2686,8 +2681,6 @@ void tsdbCleanupQueryHandle(TsdbQueryHandleT queryHandle) {
STableCheckInfo* pTableCheckInfo = taosArrayGet(pQueryHandle->pTableCheckInfo, i); STableCheckInfo* pTableCheckInfo = taosArrayGet(pQueryHandle->pTableCheckInfo, i);
destroyTableMemIterator(pTableCheckInfo); destroyTableMemIterator(pTableCheckInfo);
tdFreeDataCols(pTableCheckInfo->pDataCols);
pTableCheckInfo->pDataCols = NULL;
taosTFree(pTableCheckInfo->pCompInfo); taosTFree(pTableCheckInfo->pCompInfo);
} }
taosArrayDestroy(pQueryHandle->pTableCheckInfo); taosArrayDestroy(pQueryHandle->pTableCheckInfo);
...@@ -2711,6 +2704,9 @@ void tsdbCleanupQueryHandle(TsdbQueryHandleT queryHandle) { ...@@ -2711,6 +2704,9 @@ void tsdbCleanupQueryHandle(TsdbQueryHandleT queryHandle) {
tsdbDestroyHelper(&pQueryHandle->rhelper); tsdbDestroyHelper(&pQueryHandle->rhelper);
tdFreeDataCols(pQueryHandle->pDataCols);
pQueryHandle->pDataCols = NULL;
SIOCostSummary* pCost = &pQueryHandle->cost; SIOCostSummary* pCost = &pQueryHandle->cost;
tsdbDebug("%p :io-cost summary: statis-info:%"PRId64" us, datablock:%" PRId64" us, check data:%"PRId64" us, %p", tsdbDebug("%p :io-cost summary: statis-info:%"PRId64" us, datablock:%" PRId64" us, check data:%"PRId64" us, %p",
pQueryHandle, pCost->statisInfoLoadTime, pCost->blockLoadTime, pCost->checkForNextTime, pQueryHandle->qinfo); pQueryHandle, pCost->statisInfoLoadTime, pCost->blockLoadTime, pCost->checkForNextTime, pQueryHandle->qinfo);
......
...@@ -4,6 +4,7 @@ PROJECT(TDengine) ...@@ -4,6 +4,7 @@ PROJECT(TDengine)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/query/inc) INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/query/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/tsdb/inc) INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/tsdb/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/dnode/inc) INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/dnode/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/sync/inc)
INCLUDE_DIRECTORIES(${TD_ENTERPRISE_DIR}/src/inc) INCLUDE_DIRECTORIES(${TD_ENTERPRISE_DIR}/src/inc)
INCLUDE_DIRECTORIES(inc) INCLUDE_DIRECTORIES(inc)
AUX_SOURCE_DIRECTORY(src SRC) AUX_SOURCE_DIRECTORY(src SRC)
......
...@@ -62,6 +62,7 @@ typedef struct { ...@@ -62,6 +62,7 @@ typedef struct {
} SVnodeObj; } SVnodeObj;
int vnodeWriteToQueue(void *param, void *pHead, int type); int vnodeWriteToQueue(void *param, void *pHead, int type);
int vnodeWriteCqMsgToQueue(void *param, void *pHead, int type);
void vnodeInitWriteFp(void); void vnodeInitWriteFp(void);
void vnodeInitReadFp(void); void vnodeInitReadFp(void);
......
...@@ -259,7 +259,7 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) { ...@@ -259,7 +259,7 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
strcpy(cqCfg.pass, tsInternalPass); strcpy(cqCfg.pass, tsInternalPass);
strcpy(cqCfg.db, pVnode->db); strcpy(cqCfg.db, pVnode->db);
cqCfg.vgId = vnode; cqCfg.vgId = vnode;
cqCfg.cqWrite = vnodeWriteToQueue; cqCfg.cqWrite = vnodeWriteCqMsgToQueue;
pVnode->cq = cqOpen(pVnode, &cqCfg); pVnode->cq = cqOpen(pVnode, &cqCfg);
if (pVnode->cq == NULL) { if (pVnode->cq == NULL) {
vnodeCleanUp(pVnode); vnodeCleanUp(pVnode);
......
...@@ -22,9 +22,11 @@ ...@@ -22,9 +22,11 @@
#include "tutil.h" #include "tutil.h"
#include "tsdb.h" #include "tsdb.h"
#include "twal.h" #include "twal.h"
#include "tsync.h"
#include "tdataformat.h" #include "tdataformat.h"
#include "vnode.h" #include "vnode.h"
#include "vnodeInt.h" #include "vnodeInt.h"
#include "syncInt.h"
#include "tcq.h" #include "tcq.h"
static int32_t (*vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MAX])(SVnodeObj *, void *, SRspRet *); static int32_t (*vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MAX])(SVnodeObj *, void *, SRspRet *);
...@@ -189,6 +191,25 @@ static int32_t vnodeProcessUpdateTagValMsg(SVnodeObj *pVnode, void *pCont, SRspR ...@@ -189,6 +191,25 @@ static int32_t vnodeProcessUpdateTagValMsg(SVnodeObj *pVnode, void *pCont, SRspR
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int vnodeWriteCqMsgToQueue(void *param, void *data, int type) {
SVnodeObj *pVnode = param;
SWalHead * pHead = data;
int size = sizeof(SWalHead) + pHead->len;
SSyncHead *pSync = (SSyncHead*) taosAllocateQitem(size + sizeof(SSyncHead));
SWalHead *pWal = (SWalHead *)(pSync + 1);
memcpy(pWal, pHead, size);
atomic_add_fetch_32(&pVnode->refCount, 1);
vDebug("CQ: vgId:%d, get vnode wqueue, refCount:%d", pVnode->vgId, pVnode->refCount);
taosWriteQitem(pVnode->wqueue, type, pSync);
return 0;
}
int vnodeWriteToQueue(void *param, void *data, int type) { int vnodeWriteToQueue(void *param, void *data, int type) {
SVnodeObj *pVnode = param; SVnodeObj *pVnode = param;
SWalHead * pHead = data; SWalHead * pHead = data;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册