未验证 提交 01f142d1 编写于 作者: H Haojun Liao 提交者: GitHub

Merge pull request #7886 from taosdata/fix/query

Fix/query
...@@ -285,7 +285,7 @@ void doExecuteQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo); ...@@ -285,7 +285,7 @@ void doExecuteQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo);
SVgroupsInfo* tscVgroupInfoClone(SVgroupsInfo *pInfo); SVgroupsInfo* tscVgroupInfoClone(SVgroupsInfo *pInfo);
void* tscVgroupInfoClear(SVgroupsInfo *pInfo); void* tscVgroupInfoClear(SVgroupsInfo *pInfo);
void tscSVgroupInfoCopy(SVgroupInfo* dst, const SVgroupInfo* src);
/** /**
* The create object function must be successful expect for the out of memory issue. * The create object function must be successful expect for the out of memory issue.
* *
......
...@@ -8443,10 +8443,7 @@ static int32_t doLoadAllTableMeta(SSqlObj* pSql, SQueryInfo* pQueryInfo, SSqlNod ...@@ -8443,10 +8443,7 @@ static int32_t doLoadAllTableMeta(SSqlObj* pSql, SQueryInfo* pQueryInfo, SSqlNod
pVgroup->numOfEps = existVgroupInfo.numOfEps; pVgroup->numOfEps = existVgroupInfo.numOfEps;
pVgroup->vgId = existVgroupInfo.vgId; pVgroup->vgId = existVgroupInfo.vgId;
for (int32_t k = 0; k < existVgroupInfo.numOfEps; ++k) { memcpy(&pVgroup->epAddr, &existVgroupInfo.ep, sizeof(pVgroup->epAddr));
pVgroup->epAddr[k].port = existVgroupInfo.ep[k].port;
pVgroup->epAddr[k].fqdn = strndup(existVgroupInfo.ep[k].fqdn, TSDB_FQDN_LEN);
}
} }
} }
} }
......
...@@ -1001,8 +1001,8 @@ static int32_t serializeSqlExpr(SSqlExpr* pExpr, STableMetaInfo* pTableMetaInfo, ...@@ -1001,8 +1001,8 @@ static int32_t serializeSqlExpr(SSqlExpr* pExpr, STableMetaInfo* pTableMetaInfo,
(*pMsg) += sizeof(SSqlExpr); (*pMsg) += sizeof(SSqlExpr);
for (int32_t j = 0; j < pExpr->numOfParams; ++j) { // todo add log for (int32_t j = 0; j < pExpr->numOfParams; ++j) { // todo add log
pSqlExpr->param[j].nType = htons((uint16_t)pExpr->param[j].nType); pSqlExpr->param[j].nType = htonl(pExpr->param[j].nType);
pSqlExpr->param[j].nLen = htons(pExpr->param[j].nLen); pSqlExpr->param[j].nLen = htonl(pExpr->param[j].nLen);
if (pExpr->param[j].nType == TSDB_DATA_TYPE_BINARY) { if (pExpr->param[j].nType == TSDB_DATA_TYPE_BINARY) {
memcpy((*pMsg), pExpr->param[j].pz, pExpr->param[j].nLen); memcpy((*pMsg), pExpr->param[j].pz, pExpr->param[j].nLen);
...@@ -1107,6 +1107,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -1107,6 +1107,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pQueryMsg->tableCols[i].bytes = htons(pCol->bytes); pQueryMsg->tableCols[i].bytes = htons(pCol->bytes);
pQueryMsg->tableCols[i].type = htons(pCol->type); pQueryMsg->tableCols[i].type = htons(pCol->type);
pQueryMsg->tableCols[i].flist.numOfFilters = htons(pCol->flist.numOfFilters); pQueryMsg->tableCols[i].flist.numOfFilters = htons(pCol->flist.numOfFilters);
pQueryMsg->tableCols[i].flist.filterInfo = 0;
// append the filter information after the basic column information // append the filter information after the basic column information
serializeColFilterInfo(pCol->flist.filterInfo, pCol->flist.numOfFilters, &pMsg); serializeColFilterInfo(pCol->flist.filterInfo, pCol->flist.numOfFilters, &pMsg);
...@@ -1189,6 +1190,8 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -1189,6 +1190,8 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pMsg += pCond->len; pMsg += pCond->len;
} }
} else {
pQueryMsg->tagCondLen = 0;
} }
if (pQueryInfo->bufLen > 0) { if (pQueryInfo->bufLen > 0) {
...@@ -1218,6 +1221,9 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -1218,6 +1221,9 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pQueryMsg->tsBuf.tsOrder = htonl(pQueryInfo->tsBuf->tsOrder); pQueryMsg->tsBuf.tsOrder = htonl(pQueryInfo->tsBuf->tsOrder);
pQueryMsg->tsBuf.tsLen = htonl(pQueryMsg->tsBuf.tsLen); pQueryMsg->tsBuf.tsLen = htonl(pQueryMsg->tsBuf.tsLen);
pQueryMsg->tsBuf.tsNumOfBlocks = htonl(pQueryMsg->tsBuf.tsNumOfBlocks); pQueryMsg->tsBuf.tsNumOfBlocks = htonl(pQueryMsg->tsBuf.tsNumOfBlocks);
} else {
pQueryMsg->tsBuf.tsLen = 0;
pQueryMsg->tsBuf.tsNumOfBlocks = 0;
} }
int32_t numOfOperator = (int32_t) taosArrayGetSize(queryOperator); int32_t numOfOperator = (int32_t) taosArrayGetSize(queryOperator);
...@@ -1255,6 +1261,8 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -1255,6 +1261,8 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pMsg += pUdfInfo->contLen; pMsg += pUdfInfo->contLen;
} }
} else {
pQueryMsg->udfContentOffset = 0;
} }
memcpy(pMsg, pSql->sqlstr, sqlLen); memcpy(pMsg, pSql->sqlstr, sqlLen);
...@@ -2297,7 +2305,7 @@ static SVgroupsInfo* createVgroupInfoFromMsg(char* pMsg, int32_t* size, uint64_t ...@@ -2297,7 +2305,7 @@ static SVgroupsInfo* createVgroupInfoFromMsg(char* pMsg, int32_t* size, uint64_t
pVgroup->vgId = vmsg->vgId; pVgroup->vgId = vmsg->vgId;
for (int32_t k = 0; k < vmsg->numOfEps; ++k) { for (int32_t k = 0; k < vmsg->numOfEps; ++k) {
pVgroup->epAddr[k].port = vmsg->epAddr[k].port; pVgroup->epAddr[k].port = vmsg->epAddr[k].port;
pVgroup->epAddr[k].fqdn = strndup(vmsg->epAddr[k].fqdn, TSDB_FQDN_LEN); tstrncpy(pVgroup->epAddr[k].fqdn, vmsg->epAddr[k].fqdn, TSDB_FQDN_LEN);
} }
doUpdateVgroupInfo(pVgroup->vgId, vmsg); doUpdateVgroupInfo(pVgroup->vgId, vmsg);
......
...@@ -748,7 +748,7 @@ void tscBuildVgroupTableInfo(SSqlObj* pSql, STableMetaInfo* pTableMetaInfo, SArr ...@@ -748,7 +748,7 @@ void tscBuildVgroupTableInfo(SSqlObj* pSql, STableMetaInfo* pTableMetaInfo, SArr
SVgroupTableInfo info = {{0}}; SVgroupTableInfo info = {{0}};
for (int32_t m = 0; m < pvg->numOfVgroups; ++m) { for (int32_t m = 0; m < pvg->numOfVgroups; ++m) {
if (tt->vgId == pvg->vgroups[m].vgId) { if (tt->vgId == pvg->vgroups[m].vgId) {
tscSVgroupInfoCopy(&info.vgInfo, &pvg->vgroups[m]); memcpy(&info.vgInfo, &pvg->vgroups[m], sizeof(info.vgInfo));
break; break;
} }
} }
......
...@@ -3282,11 +3282,6 @@ void tscFreeVgroupTableInfo(SArray* pVgroupTables) { ...@@ -3282,11 +3282,6 @@ void tscFreeVgroupTableInfo(SArray* pVgroupTables) {
size_t num = taosArrayGetSize(pVgroupTables); size_t num = taosArrayGetSize(pVgroupTables);
for (size_t i = 0; i < num; i++) { for (size_t i = 0; i < num; i++) {
SVgroupTableInfo* pInfo = taosArrayGet(pVgroupTables, i); SVgroupTableInfo* pInfo = taosArrayGet(pVgroupTables, i);
for(int32_t j = 0; j < pInfo->vgInfo.numOfEps; ++j) {
tfree(pInfo->vgInfo.epAddr[j].fqdn);
}
taosArrayDestroy(pInfo->itemList); taosArrayDestroy(pInfo->itemList);
} }
...@@ -3300,10 +3295,6 @@ void tscRemoveVgroupTableGroup(SArray* pVgroupTable, int32_t index) { ...@@ -3300,10 +3295,6 @@ void tscRemoveVgroupTableGroup(SArray* pVgroupTable, int32_t index) {
assert(size > index); assert(size > index);
SVgroupTableInfo* pInfo = taosArrayGet(pVgroupTable, index); SVgroupTableInfo* pInfo = taosArrayGet(pVgroupTable, index);
for(int32_t j = 0; j < pInfo->vgInfo.numOfEps; ++j) {
tfree(pInfo->vgInfo.epAddr[j].fqdn);
}
taosArrayDestroy(pInfo->itemList); taosArrayDestroy(pInfo->itemList);
taosArrayRemove(pVgroupTable, index); taosArrayRemove(pVgroupTable, index);
} }
...@@ -3312,10 +3303,6 @@ void tscVgroupTableCopy(SVgroupTableInfo* info, SVgroupTableInfo* pInfo) { ...@@ -3312,10 +3303,6 @@ void tscVgroupTableCopy(SVgroupTableInfo* info, SVgroupTableInfo* pInfo) {
memset(info, 0, sizeof(SVgroupTableInfo)); memset(info, 0, sizeof(SVgroupTableInfo));
info->vgInfo = pInfo->vgInfo; info->vgInfo = pInfo->vgInfo;
for(int32_t j = 0; j < pInfo->vgInfo.numOfEps; ++j) {
info->vgInfo.epAddr[j].fqdn = strdup(pInfo->vgInfo.epAddr[j].fqdn);
}
if (pInfo->itemList) { if (pInfo->itemList) {
info->itemList = taosArrayDup(pInfo->itemList); info->itemList = taosArrayDup(pInfo->itemList);
} }
...@@ -4354,7 +4341,7 @@ SVgroupsInfo* tscVgroupInfoClone(SVgroupsInfo *vgroupList) { ...@@ -4354,7 +4341,7 @@ SVgroupsInfo* tscVgroupInfoClone(SVgroupsInfo *vgroupList) {
pNewVInfo->numOfEps = pvInfo->numOfEps; pNewVInfo->numOfEps = pvInfo->numOfEps;
for(int32_t j = 0; j < pvInfo->numOfEps; ++j) { for(int32_t j = 0; j < pvInfo->numOfEps; ++j) {
pNewVInfo->epAddr[j].fqdn = strdup(pvInfo->epAddr[j].fqdn); tstrncpy(pNewVInfo->epAddr[j].fqdn, pvInfo->epAddr[j].fqdn, TSDB_FQDN_LEN);
pNewVInfo->epAddr[j].port = pvInfo->epAddr[j].port; pNewVInfo->epAddr[j].port = pvInfo->epAddr[j].port;
} }
} }
...@@ -4367,34 +4354,10 @@ void* tscVgroupInfoClear(SVgroupsInfo *vgroupList) { ...@@ -4367,34 +4354,10 @@ void* tscVgroupInfoClear(SVgroupsInfo *vgroupList) {
return NULL; return NULL;
} }
for(int32_t i = 0; i < vgroupList->numOfVgroups; ++i) {
SVgroupInfo* pVgroupInfo = &vgroupList->vgroups[i];
for(int32_t j = 0; j < pVgroupInfo->numOfEps; ++j) {
tfree(pVgroupInfo->epAddr[j].fqdn);
}
for(int32_t j = pVgroupInfo->numOfEps; j < TSDB_MAX_REPLICA; j++) {
assert( pVgroupInfo->epAddr[j].fqdn == NULL );
}
}
tfree(vgroupList); tfree(vgroupList);
return NULL; return NULL;
} }
void tscSVgroupInfoCopy(SVgroupInfo* dst, const SVgroupInfo* src) {
dst->vgId = src->vgId;
dst->numOfEps = src->numOfEps;
for(int32_t i = 0; i < dst->numOfEps; ++i) {
tfree(dst->epAddr[i].fqdn);
dst->epAddr[i].port = src->epAddr[i].port;
assert(dst->epAddr[i].fqdn == NULL);
dst->epAddr[i].fqdn = strdup(src->epAddr[i].fqdn);
}
}
char* serializeTagData(STagData* pTagData, char* pMsg) { char* serializeTagData(STagData* pTagData, char* pMsg) {
int32_t n = (int32_t) strlen(pTagData->name); int32_t n = (int32_t) strlen(pTagData->name);
*(int32_t*) pMsg = htonl(n); *(int32_t*) pMsg = htonl(n);
...@@ -4539,7 +4502,7 @@ SVgroupsInfo* tscVgroupsInfoDup(SVgroupsInfo* pVgroupsInfo) { ...@@ -4539,7 +4502,7 @@ SVgroupsInfo* tscVgroupsInfoDup(SVgroupsInfo* pVgroupsInfo) {
SVgroupsInfo* pInfo = calloc(1, size); SVgroupsInfo* pInfo = calloc(1, size);
pInfo->numOfVgroups = pVgroupsInfo->numOfVgroups; pInfo->numOfVgroups = pVgroupsInfo->numOfVgroups;
for (int32_t m = 0; m < pVgroupsInfo->numOfVgroups; ++m) { for (int32_t m = 0; m < pVgroupsInfo->numOfVgroups; ++m) {
tscSVgroupInfoCopy(&pInfo->vgroups[m], &pVgroupsInfo->vgroups[m]); memcpy(&pInfo->vgroups[m], &pVgroupsInfo->vgroups[m], sizeof(SVgroupMsg));
} }
return pInfo; return pInfo;
} }
......
...@@ -206,11 +206,6 @@ typedef struct { ...@@ -206,11 +206,6 @@ typedef struct {
uint16_t port; uint16_t port;
} SEpAddrMsg; } SEpAddrMsg;
typedef struct {
char* fqdn;
uint16_t port;
} SEpAddr1;
typedef struct { typedef struct {
int32_t numOfVnodes; int32_t numOfVnodes;
} SMsgDesc; } SMsgDesc;
...@@ -763,17 +758,11 @@ typedef struct SSTableVgroupMsg { ...@@ -763,17 +758,11 @@ typedef struct SSTableVgroupMsg {
int32_t numOfTables; int32_t numOfTables;
} SSTableVgroupMsg, SSTableVgroupRspMsg; } SSTableVgroupMsg, SSTableVgroupRspMsg;
typedef struct {
int32_t vgId;
int8_t numOfEps;
SEpAddr1 epAddr[TSDB_MAX_REPLICA];
} SVgroupInfo;
typedef struct { typedef struct {
int32_t vgId; int32_t vgId;
int8_t numOfEps; int8_t numOfEps;
SEpAddrMsg epAddr[TSDB_MAX_REPLICA]; SEpAddrMsg epAddr[TSDB_MAX_REPLICA];
} SVgroupMsg; } SVgroupMsg, SVgroupInfo;
typedef struct { typedef struct {
int32_t numOfVgroups; int32_t numOfVgroups;
......
...@@ -84,11 +84,18 @@ typedef struct SResultRow { ...@@ -84,11 +84,18 @@ typedef struct SResultRow {
char *key; // start key of current result row char *key; // start key of current result row
} SResultRow; } SResultRow;
typedef struct SResultRowCell {
uint64_t groupId;
SResultRow *pRow;
} SResultRowCell;
typedef struct SGroupResInfo { typedef struct SGroupResInfo {
int32_t totalGroup; int32_t totalGroup;
int32_t currentGroup; int32_t currentGroup;
int32_t index; int32_t index;
SArray* pRows; // SArray<SResultRow*> SArray* pRows; // SArray<SResultRow*>
bool ordered;
int32_t position;
} SGroupResInfo; } SGroupResInfo;
/** /**
...@@ -280,6 +287,7 @@ typedef struct SQueryRuntimeEnv { ...@@ -280,6 +287,7 @@ typedef struct SQueryRuntimeEnv {
SDiskbasedResultBuf* pResultBuf; // query result buffer based on blocked-wised disk file SDiskbasedResultBuf* pResultBuf; // query result buffer based on blocked-wised disk file
SHashObj* pResultRowHashTable; // quick locate the window object for each result SHashObj* pResultRowHashTable; // quick locate the window object for each result
SHashObj* pResultRowListSet; // used to check if current ResultRowInfo has ResultRow object or not SHashObj* pResultRowListSet; // used to check if current ResultRowInfo has ResultRow object or not
SArray* pResultRowArrayList; // The array list that contains the Result rows
char* keyBuf; // window key buffer char* keyBuf; // window key buffer
SResultRowPool* pool; // window result object pool SResultRowPool* pool; // window result object pool
char** prevRow; char** prevRow;
......
...@@ -546,6 +546,8 @@ static SResultRow* doSetResultOutBufByKey(SQueryRuntimeEnv* pRuntimeEnv, SResult ...@@ -546,6 +546,8 @@ static SResultRow* doSetResultOutBufByKey(SQueryRuntimeEnv* pRuntimeEnv, SResult
// add a new result set for a new group // add a new result set for a new group
taosHashPut(pRuntimeEnv->pResultRowHashTable, pRuntimeEnv->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes), &pResult, POINTER_BYTES); taosHashPut(pRuntimeEnv->pResultRowHashTable, pRuntimeEnv->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes), &pResult, POINTER_BYTES);
SResultRowCell cell = {.groupId = tableGroupId, .pRow = pResult};
taosArrayPush(pRuntimeEnv->pResultRowArrayList, &cell);
} else { } else {
pResult = *p1; pResult = *p1;
} }
...@@ -2136,7 +2138,8 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf ...@@ -2136,7 +2138,8 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
pRuntimeEnv->pQueryAttr = pQueryAttr; pRuntimeEnv->pQueryAttr = pQueryAttr;
pRuntimeEnv->pResultRowHashTable = taosHashInit(numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); pRuntimeEnv->pResultRowHashTable = taosHashInit(numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
pRuntimeEnv->pResultRowListSet = taosHashInit(numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); pRuntimeEnv->pResultRowListSet = taosHashInit(numOfTables * 10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
pRuntimeEnv->pResultRowArrayList = taosArrayInit(numOfTables, sizeof(SResultRowCell));
pRuntimeEnv->keyBuf = malloc(pQueryAttr->maxTableColumnWidth + sizeof(int64_t) + POINTER_BYTES); pRuntimeEnv->keyBuf = malloc(pQueryAttr->maxTableColumnWidth + sizeof(int64_t) + POINTER_BYTES);
pRuntimeEnv->pool = initResultRowPool(getResultRowSize(pRuntimeEnv)); pRuntimeEnv->pool = initResultRowPool(getResultRowSize(pRuntimeEnv));
...@@ -2412,6 +2415,7 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) { ...@@ -2412,6 +2415,7 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) {
destroyOperatorInfo(pRuntimeEnv->proot); destroyOperatorInfo(pRuntimeEnv->proot);
pRuntimeEnv->pool = destroyResultRowPool(pRuntimeEnv->pool); pRuntimeEnv->pool = destroyResultRowPool(pRuntimeEnv->pool);
taosArrayDestroy(pRuntimeEnv->pResultRowArrayList);
taosArrayDestroyEx(pRuntimeEnv->prevResult, freeInterResult); taosArrayDestroyEx(pRuntimeEnv->prevResult, freeInterResult);
pRuntimeEnv->prevResult = NULL; pRuntimeEnv->prevResult = NULL;
} }
...@@ -7586,8 +7590,8 @@ int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SQueryParam* param) { ...@@ -7586,8 +7590,8 @@ int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SQueryParam* param) {
pMsg += sizeof(SSqlExpr); pMsg += sizeof(SSqlExpr);
for (int32_t j = 0; j < pExprMsg->numOfParams; ++j) { for (int32_t j = 0; j < pExprMsg->numOfParams; ++j) {
pExprMsg->param[j].nType = htons(pExprMsg->param[j].nType); pExprMsg->param[j].nType = htonl(pExprMsg->param[j].nType);
pExprMsg->param[j].nLen = htons(pExprMsg->param[j].nLen); pExprMsg->param[j].nLen = htonl(pExprMsg->param[j].nLen);
if (pExprMsg->param[j].nType == TSDB_DATA_TYPE_BINARY) { if (pExprMsg->param[j].nType == TSDB_DATA_TYPE_BINARY) {
pExprMsg->param[j].pz = pMsg; pExprMsg->param[j].pz = pMsg;
...@@ -7634,8 +7638,8 @@ int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SQueryParam* param) { ...@@ -7634,8 +7638,8 @@ int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SQueryParam* param) {
pMsg += sizeof(SSqlExpr); pMsg += sizeof(SSqlExpr);
for (int32_t j = 0; j < pExprMsg->numOfParams; ++j) { for (int32_t j = 0; j < pExprMsg->numOfParams; ++j) {
pExprMsg->param[j].nType = htons(pExprMsg->param[j].nType); pExprMsg->param[j].nType = htonl(pExprMsg->param[j].nType);
pExprMsg->param[j].nLen = htons(pExprMsg->param[j].nLen); pExprMsg->param[j].nLen = htonl(pExprMsg->param[j].nLen);
if (pExprMsg->param[j].nType == TSDB_DATA_TYPE_BINARY) { if (pExprMsg->param[j].nType == TSDB_DATA_TYPE_BINARY) {
pExprMsg->param[j].pz = pMsg; pExprMsg->param[j].pz = pMsg;
......
...@@ -416,158 +416,83 @@ static int64_t getNumOfResultWindowRes(SQueryRuntimeEnv* pRuntimeEnv, SResultRow ...@@ -416,158 +416,83 @@ static int64_t getNumOfResultWindowRes(SQueryRuntimeEnv* pRuntimeEnv, SResultRow
return 0; return 0;
} }
static int32_t tableResultComparFn(const void *pLeft, const void *pRight, void *param) { int32_t tsAscOrder(const void* p1, const void* p2) {
int32_t left = *(int32_t *)pLeft; SResultRowCell* pc1 = (SResultRowCell*) p1;
int32_t right = *(int32_t *)pRight; SResultRowCell* pc2 = (SResultRowCell*) p2;
SCompSupporter * supporter = (SCompSupporter *)param; if (pc1->groupId == pc2->groupId) {
if (pc1->pRow->win.skey == pc2->pRow->win.skey) {
int32_t leftPos = supporter->rowIndex[left]; return 0;
int32_t rightPos = supporter->rowIndex[right]; } else {
return (pc1->pRow->win.skey < pc2->pRow->win.skey)? -1:1;
/* left source is exhausted */
if (leftPos == -1) {
return 1;
} }
} else {
/* right source is exhausted*/ return (pc1->groupId < pc2->groupId)? -1:1;
if (rightPos == -1) {
return -1;
} }
}
STableQueryInfo** pList = supporter->pTableQueryInfo; int32_t tsDescOrder(const void* p1, const void* p2) {
SResultRowCell* pc1 = (SResultRowCell*) p1;
SResultRowInfo *pWindowResInfo1 = &(pList[left]->resInfo); SResultRowCell* pc2 = (SResultRowCell*) p2;
SResultRow * pWindowRes1 = getResultRow(pWindowResInfo1, leftPos);
TSKEY leftTimestamp = pWindowRes1->win.skey;
SResultRowInfo *pWindowResInfo2 = &(pList[right]->resInfo);
SResultRow * pWindowRes2 = getResultRow(pWindowResInfo2, rightPos);
TSKEY rightTimestamp = pWindowRes2->win.skey;
if (leftTimestamp == rightTimestamp) { if (pc1->groupId == pc2->groupId) {
if (pc1->pRow->win.skey == pc2->pRow->win.skey) {
return 0; return 0;
} else {
return (pc1->pRow->win.skey < pc2->pRow->win.skey)? 1:-1;
} }
if (supporter->order == TSDB_ORDER_ASC) {
return (leftTimestamp > rightTimestamp)? 1:-1;
} else { } else {
return (leftTimestamp < rightTimestamp)? 1:-1; return (pc1->groupId < pc2->groupId)? -1:1;
} }
} }
static int32_t mergeIntoGroupResultImpl(SQueryRuntimeEnv *pRuntimeEnv, SGroupResInfo* pGroupResInfo, SArray *pTableList, void orderTheResultRows(SQueryRuntimeEnv* pRuntimeEnv) {
int32_t* rowCellInfoOffset) { __compar_fn_t fn = NULL;
bool ascQuery = QUERY_IS_ASC_QUERY(pRuntimeEnv->pQueryAttr); if (pRuntimeEnv->pQueryAttr->order.order == TSDB_ORDER_ASC) {
fn = tsAscOrder;
int32_t code = TSDB_CODE_SUCCESS; } else {
fn = tsDescOrder;
int32_t *posList = NULL;
SLoserTreeInfo *pTree = NULL;
STableQueryInfo **pTableQueryInfoList = NULL;
size_t size = taosArrayGetSize(pTableList);
if (pGroupResInfo->pRows == NULL) {
pGroupResInfo->pRows = taosArrayInit(100, POINTER_BYTES);
}
posList = calloc(size, sizeof(int32_t));
pTableQueryInfoList = malloc(POINTER_BYTES * size);
if (pTableQueryInfoList == NULL || posList == NULL || pGroupResInfo->pRows == NULL || pGroupResInfo->pRows == NULL) {
qError("QInfo:%"PRIu64" failed alloc memory", GET_QID(pRuntimeEnv));
code = TSDB_CODE_QRY_OUT_OF_MEMORY;
goto _end;
} }
int32_t numOfTables = 0; taosArraySort(pRuntimeEnv->pResultRowArrayList, fn);
for (int32_t i = 0; i < size; ++i) { }
STableQueryInfo *item = taosArrayGetP(pTableList, i);
if (item->resInfo.size > 0) {
pTableQueryInfoList[numOfTables++] = item;
}
}
// there is no data in current group static int32_t mergeIntoGroupResultImplRv(SQueryRuntimeEnv *pRuntimeEnv, SGroupResInfo* pGroupResInfo, uint64_t groupId, int32_t* rowCellInfoOffset) {
// no need to merge results since only one table in each group if (!pGroupResInfo->ordered) {
if (numOfTables == 0) { orderTheResultRows(pRuntimeEnv);
goto _end; pGroupResInfo->ordered = true;
} }
SCompSupporter cs = {pTableQueryInfoList, posList, pRuntimeEnv->pQueryAttr->order.order}; if (pGroupResInfo->pRows == NULL) {
pGroupResInfo->pRows = taosArrayInit(100, POINTER_BYTES);
int32_t ret = tLoserTreeCreate(&pTree, numOfTables, &cs, tableResultComparFn);
if (ret != TSDB_CODE_SUCCESS) {
code = TSDB_CODE_QRY_OUT_OF_MEMORY;
goto _end;
} }
int64_t lastTimestamp = ascQuery? INT64_MIN:INT64_MAX; size_t len = taosArrayGetSize(pRuntimeEnv->pResultRowArrayList);
int64_t startt = taosGetTimestampMs(); for(; pGroupResInfo->position < len; ++pGroupResInfo->position) {
while (1) {
int32_t tableIndex = pTree->pNode[0].index;
SResultRowInfo *pWindowResInfo = &pTableQueryInfoList[tableIndex]->resInfo;
SResultRow *pWindowRes = getResultRow(pWindowResInfo, cs.rowIndex[tableIndex]);
int64_t num = getNumOfResultWindowRes(pRuntimeEnv, pWindowRes, rowCellInfoOffset);
if (num <= 0) {
cs.rowIndex[tableIndex] += 1;
if (cs.rowIndex[tableIndex] >= pWindowResInfo->size) { SResultRowCell* pResultRowCell = taosArrayGet(pRuntimeEnv->pResultRowArrayList, pGroupResInfo->position);
cs.rowIndex[tableIndex] = -1; if (pResultRowCell->groupId != groupId) {
if (--numOfTables == 0) { // all input sources are exhausted
break; break;
} }
}
} else {
assert((pWindowRes->win.skey >= lastTimestamp && ascQuery) || (pWindowRes->win.skey <= lastTimestamp && !ascQuery));
if (pWindowRes->win.skey != lastTimestamp) { int64_t num = getNumOfResultWindowRes(pRuntimeEnv, pResultRowCell->pRow, rowCellInfoOffset);
taosArrayPush(pGroupResInfo->pRows, &pWindowRes); if (num <= 0) {
pWindowRes->numOfRows = (uint32_t) num; continue;
} }
lastTimestamp = pWindowRes->win.skey; taosArrayPush(pGroupResInfo->pRows, &pResultRowCell->pRow);
pResultRowCell->pRow->numOfRows = (uint32_t) num;
// move to the next row of current entry
if ((++cs.rowIndex[tableIndex]) >= pWindowResInfo->size) {
cs.rowIndex[tableIndex] = -1;
// all input sources are exhausted
if ((--numOfTables) == 0) {
break;
}
}
} }
tLoserTreeAdjust(pTree, tableIndex + pTree->numOfEntries); return TSDB_CODE_SUCCESS;
}
int64_t endt = taosGetTimestampMs();
qDebug("QInfo:%"PRIx64" result merge completed for group:%d, elapsed time:%" PRId64 " ms", GET_QID(pRuntimeEnv),
pGroupResInfo->currentGroup, endt - startt);
_end:
tfree(pTableQueryInfoList);
tfree(posList);
tfree(pTree);
return code;
} }
int32_t mergeIntoGroupResult(SGroupResInfo* pGroupResInfo, SQueryRuntimeEnv* pRuntimeEnv, int32_t* offset) { int32_t mergeIntoGroupResult(SGroupResInfo* pGroupResInfo, SQueryRuntimeEnv* pRuntimeEnv, int32_t* offset) {
int64_t st = taosGetTimestampUs(); int64_t st = taosGetTimestampUs();
while (pGroupResInfo->currentGroup < pGroupResInfo->totalGroup) { while (pGroupResInfo->currentGroup < pGroupResInfo->totalGroup) {
SArray *group = GET_TABLEGROUP(pRuntimeEnv, pGroupResInfo->currentGroup); mergeIntoGroupResultImplRv(pRuntimeEnv, pGroupResInfo, pGroupResInfo->currentGroup, offset);
int32_t ret = mergeIntoGroupResultImpl(pRuntimeEnv, pGroupResInfo, group, offset);
if (ret != TSDB_CODE_SUCCESS) {
return ret;
}
// this group generates at least one result, return results // this group generates at least one result, return results
if (taosArrayGetSize(pGroupResInfo->pRows) > 0) { if (taosArrayGetSize(pGroupResInfo->pRows) > 0) {
...@@ -583,7 +508,6 @@ int32_t mergeIntoGroupResult(SGroupResInfo* pGroupResInfo, SQueryRuntimeEnv* pRu ...@@ -583,7 +508,6 @@ int32_t mergeIntoGroupResult(SGroupResInfo* pGroupResInfo, SQueryRuntimeEnv* pRu
qDebug("QInfo:%"PRIu64" merge res data into group, index:%d, total group:%d, elapsed time:%" PRId64 "us", GET_QID(pRuntimeEnv), qDebug("QInfo:%"PRIu64" merge res data into group, index:%d, total group:%d, elapsed time:%" PRId64 "us", GET_QID(pRuntimeEnv),
pGroupResInfo->currentGroup, pGroupResInfo->totalGroup, elapsedTime); pGroupResInfo->currentGroup, pGroupResInfo->totalGroup, elapsedTime);
// pQInfo->summary.firstStageMergeTime += elapsedTime;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
......
...@@ -100,7 +100,7 @@ static FORCE_INLINE int tsdbCompareSchemaVersion(const void *key1, const void *k ...@@ -100,7 +100,7 @@ static FORCE_INLINE int tsdbCompareSchemaVersion(const void *key1, const void *k
} }
static FORCE_INLINE STSchema* tsdbGetTableSchemaImpl(STable* pTable, bool lock, bool copy, int16_t _version) { static FORCE_INLINE STSchema* tsdbGetTableSchemaImpl(STable* pTable, bool lock, bool copy, int16_t _version) {
STable* pDTable = (TABLE_TYPE(pTable) == TSDB_CHILD_TABLE) ? pTable->pSuper : pTable; STable* pDTable = (pTable->pSuper != NULL) ? pTable->pSuper : pTable; // for performance purpose
STSchema* pSchema = NULL; STSchema* pSchema = NULL;
STSchema* pTSchema = NULL; STSchema* pTSchema = NULL;
......
...@@ -288,8 +288,6 @@ static SArray* createCheckInfoFromTableGroup(STsdbQueryHandle* pQueryHandle, STa ...@@ -288,8 +288,6 @@ static SArray* createCheckInfoFromTableGroup(STsdbQueryHandle* pQueryHandle, STa
STableKeyInfo* pKeyInfo = (STableKeyInfo*) taosArrayGet(group, j); STableKeyInfo* pKeyInfo = (STableKeyInfo*) taosArrayGet(group, j);
STableCheckInfo info = { .lastKey = pKeyInfo->lastKey, .pTableObj = pKeyInfo->pTable }; STableCheckInfo info = { .lastKey = pKeyInfo->lastKey, .pTableObj = pKeyInfo->pTable };
info.tableId = ((STable*)(pKeyInfo->pTable))->tableId;
assert(info.pTableObj != NULL && (info.pTableObj->type == TSDB_NORMAL_TABLE || assert(info.pTableObj != NULL && (info.pTableObj->type == TSDB_NORMAL_TABLE ||
info.pTableObj->type == TSDB_CHILD_TABLE || info.pTableObj->type == TSDB_STREAM_TABLE)); info.pTableObj->type == TSDB_CHILD_TABLE || info.pTableObj->type == TSDB_STREAM_TABLE));
...@@ -2230,7 +2228,7 @@ static int32_t createDataBlocksInfo(STsdbQueryHandle* pQueryHandle, int32_t numO ...@@ -2230,7 +2228,7 @@ static int32_t createDataBlocksInfo(STsdbQueryHandle* pQueryHandle, int32_t numO
SBlock* pBlock = pTableCheck->pCompInfo->blocks; SBlock* pBlock = pTableCheck->pCompInfo->blocks;
sup.numOfBlocksPerTable[numOfQualTables] = pTableCheck->numOfBlocks; sup.numOfBlocksPerTable[numOfQualTables] = pTableCheck->numOfBlocks;
char* buf = calloc(1, sizeof(STableBlockInfo) * pTableCheck->numOfBlocks); char* buf = malloc(sizeof(STableBlockInfo) * pTableCheck->numOfBlocks);
if (buf == NULL) { if (buf == NULL) {
cleanBlockOrderSupporter(&sup, numOfQualTables); cleanBlockOrderSupporter(&sup, numOfQualTables);
return TSDB_CODE_TDB_OUT_OF_MEMORY; return TSDB_CODE_TDB_OUT_OF_MEMORY;
...@@ -3630,8 +3628,6 @@ SArray* createTableGroup(SArray* pTableList, STSchema* pTagSchema, SColIndex* pC ...@@ -3630,8 +3628,6 @@ SArray* createTableGroup(SArray* pTableList, STSchema* pTagSchema, SColIndex* pC
for(int32_t i = 0; i < size; ++i) { for(int32_t i = 0; i < size; ++i) {
STableKeyInfo *pKeyInfo = taosArrayGet(pTableList, i); STableKeyInfo *pKeyInfo = taosArrayGet(pTableList, i);
assert(((STable*)pKeyInfo->pTable)->type == TSDB_CHILD_TABLE);
tsdbRefTable(pKeyInfo->pTable); tsdbRefTable(pKeyInfo->pTable);
STableKeyInfo info = {.pTable = pKeyInfo->pTable, .lastKey = skey}; STableKeyInfo info = {.pTable = pKeyInfo->pTable, .lastKey = skey};
......
...@@ -741,7 +741,7 @@ void taosHashTableResize(SHashObj *pHashObj) { ...@@ -741,7 +741,7 @@ void taosHashTableResize(SHashObj *pHashObj) {
} }
SHashNode *doCreateHashNode(const void *key, size_t keyLen, const void *pData, size_t dsize, uint32_t hashVal) { SHashNode *doCreateHashNode(const void *key, size_t keyLen, const void *pData, size_t dsize, uint32_t hashVal) {
SHashNode *pNewNode = calloc(1, sizeof(SHashNode) + keyLen + dsize); SHashNode *pNewNode = malloc(sizeof(SHashNode) + keyLen + dsize);
if (pNewNode == NULL) { if (pNewNode == NULL) {
uError("failed to allocate memory, reason:%s", strerror(errno)); uError("failed to allocate memory, reason:%s", strerror(errno));
...@@ -752,6 +752,8 @@ SHashNode *doCreateHashNode(const void *key, size_t keyLen, const void *pData, s ...@@ -752,6 +752,8 @@ SHashNode *doCreateHashNode(const void *key, size_t keyLen, const void *pData, s
pNewNode->hashVal = hashVal; pNewNode->hashVal = hashVal;
pNewNode->dataLen = (uint32_t) dsize; pNewNode->dataLen = (uint32_t) dsize;
pNewNode->count = 1; pNewNode->count = 1;
pNewNode->removed = 0;
pNewNode->next = NULL;
memcpy(GET_HASH_NODE_DATA(pNewNode), pData, dsize); memcpy(GET_HASH_NODE_DATA(pNewNode), pData, dsize);
memcpy(GET_HASH_NODE_KEY(pNewNode), key, keyLen); memcpy(GET_HASH_NODE_KEY(pNewNode), key, keyLen);
......
...@@ -90,12 +90,13 @@ void tLoserTreeAdjust(SLoserTreeInfo* pTree, int32_t idx) { ...@@ -90,12 +90,13 @@ void tLoserTreeAdjust(SLoserTreeInfo* pTree, int32_t idx) {
SLoserTreeNode kLeaf = pTree->pNode[idx]; SLoserTreeNode kLeaf = pTree->pNode[idx];
while (parentId > 0) { while (parentId > 0) {
if (pTree->pNode[parentId].index == -1) { SLoserTreeNode* pCur = &pTree->pNode[parentId];
if (pCur->index == -1) {
pTree->pNode[parentId] = kLeaf; pTree->pNode[parentId] = kLeaf;
return; return;
} }
int32_t ret = pTree->comparFn(&pTree->pNode[parentId], &kLeaf, pTree->param); int32_t ret = pTree->comparFn(pCur, &kLeaf, pTree->param);
if (ret < 0) { if (ret < 0) {
SLoserTreeNode t = pTree->pNode[parentId]; SLoserTreeNode t = pTree->pNode[parentId];
pTree->pNode[parentId] = kLeaf; pTree->pNode[parentId] = kLeaf;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册