未验证 提交 e8a1c539 编写于 作者: W wade zhang 提交者: GitHub

Merge pull request #20171 from taosdata/fix/join_bug

fix(query): fix join_bug.
......@@ -1253,8 +1253,15 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
if (pQueryInfo->tsBuf != NULL) {
// note: here used the idx instead of actual vnode id.
int32_t vnodeIndex = pTableMetaInfo->vgroupIndex;
code = dumpFileBlockByGroupId(pQueryInfo->tsBuf, vnodeIndex, pMsg, &pQueryMsg->tsBuf.tsLen, &pQueryMsg->tsBuf.tsNumOfBlocks);
int32_t vgId = 0;
if (pTableMetaInfo->vgroupList != NULL) {
int32_t vnodeIndex = pTableMetaInfo->vgroupIndex;
vgId = pTableMetaInfo->vgroupList->vgroups[vnodeIndex].vgId;
} else {
vgId = query.vgId;
}
code = dumpFileBlockByGroupId(pQueryInfo->tsBuf, vgId, pMsg, &pQueryMsg->tsBuf.tsLen, &pQueryMsg->tsBuf.tsNumOfBlocks);
if (code != TSDB_CODE_SUCCESS) {
goto _end;
}
......@@ -1317,8 +1324,6 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
memcpy(pMsg, pSql->sqlstr, sqlLen);
pMsg += sqlLen;
pQueryMsg->extend = 1;
STLV *tlv = (STLV *)pMsg;
......
......@@ -110,7 +110,7 @@ int32_t tsBufMerge(STSBuf* pDestBuf, const STSBuf* pSrcBuf);
STSBuf* tsBufClone(STSBuf* pTSBuf);
STSGroupBlockInfo* tsBufGetGroupBlockInfo(STSBuf* pTSBuf, int32_t id);
SArray* tsBufGetGroupBlockInfo(STSBuf* pTSBuf, int32_t id);
void tsBufFlush(STSBuf* pTSBuf);
void tsBufResetPos(STSBuf* pTSBuf);
......
......@@ -651,13 +651,16 @@ static int32_t doUpdateGroupInfo(STSBuf* pTSBuf, int64_t offset, STSGroupBlockIn
return 0;
}
STSGroupBlockInfo* tsBufGetGroupBlockInfo(STSBuf* pTSBuf, int32_t id) {
int32_t j = tsBufFindGroupById(pTSBuf->pData, pTSBuf->numOfGroups, id);
if (j == -1) {
return NULL;
SArray* tsBufGetGroupBlockInfo(STSBuf* pTSBuf, int32_t id) {
SArray* pList = taosArrayInit(4, sizeof(STSGroupBlockInfo));
for(int32_t i = 0; i < pTSBuf->numOfGroups; ++i) {
if (pTSBuf->pData[i].info.id == id) {
taosArrayPush(pList, &pTSBuf->pData[i].info);
}
}
return &pTSBuf->pData[j].info;
return pList;
}
int32_t STSBufUpdateHeader(STSBuf* pTSBuf, STSBufFileHeader* pHeader) {
......@@ -1099,28 +1102,33 @@ void tsBufGetGroupIdList(STSBuf* pTSBuf, int32_t* num, int32_t** id) {
}
}
int32_t dumpFileBlockByGroupId(STSBuf* pTSBuf, int32_t groupIndex, void* buf, int32_t* len, int32_t* numOfBlocks) {
assert(groupIndex >= 0 && groupIndex < pTSBuf->numOfGroups);
STSGroupBlockInfo *pBlockInfo = &pTSBuf->pData[groupIndex].info;
int32_t dumpFileBlockByGroupId(STSBuf* pTSBuf, int32_t groupId, void* buf, int32_t* len, int32_t* numOfBlocks) {
SArray* pList = tsBufGetGroupBlockInfo(pTSBuf, groupId);
*len = 0;
*numOfBlocks = 0;
if (fseek(pTSBuf->f, pBlockInfo->offset, SEEK_SET) != 0) {
int code = TAOS_SYSTEM_ERROR(ferror(pTSBuf->f));
// qError("%p: fseek failed: %s", pSql, tstrerror(code));
return code;
}
char* p = buf;
for(int32_t i = 0; i < taosArrayGetSize(pList); ++i) {
STSGroupBlockInfo* pBlockInfo = taosArrayGet(pList, i);
size_t s = fread(buf, 1, pBlockInfo->compLen, pTSBuf->f);
if (s != pBlockInfo->compLen) {
int code = TAOS_SYSTEM_ERROR(ferror(pTSBuf->f));
// tscError("%p: fread didn't return expected data: %s", pSql, tstrerror(code));
return code;
}
if (fseek(pTSBuf->f, pBlockInfo->offset, SEEK_SET) != 0) {
int code = TAOS_SYSTEM_ERROR(ferror(pTSBuf->f));
// qError("%p: fseek failed: %s", pSql, tstrerror(code));
return code;
}
*len = pBlockInfo->compLen;
*numOfBlocks = pBlockInfo->numOfBlocks;
size_t s = fread(p, 1, pBlockInfo->compLen, pTSBuf->f);
if (s != pBlockInfo->compLen) {
int code = TAOS_SYSTEM_ERROR(ferror(pTSBuf->f));
// tscError("%p: fread didn't return expected data: %s", pSql, tstrerror(code));
return code;
}
*len += pBlockInfo->compLen;
*numOfBlocks += pBlockInfo->numOfBlocks;
p += pBlockInfo->compLen;
}
return TSDB_CODE_SUCCESS;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册