提交 64ca1f72 编写于 作者: W wpan

fix join issues

上级 22796177
...@@ -730,8 +730,16 @@ static int32_t quitAllSubquery(SSqlObj* pSqlSub, SSqlObj* pSqlObj, SJoinSupporte ...@@ -730,8 +730,16 @@ static int32_t quitAllSubquery(SSqlObj* pSqlSub, SSqlObj* pSqlObj, SJoinSupporte
static void updateQueryTimeRange(SQueryInfo* pQueryInfo, STimeWindow* win) { static void updateQueryTimeRange(SQueryInfo* pQueryInfo, STimeWindow* win) {
assert(pQueryInfo->window.skey <= win->skey && pQueryInfo->window.ekey >= win->ekey); assert(pQueryInfo->window.skey <= win->skey && pQueryInfo->window.ekey >= win->ekey);
pQueryInfo->window = *win; pQueryInfo->window = *win;
}
int32_t tagValCompar(const void* p1, const void* p2) {
const STidTags* t1 = (const STidTags*) varDataVal(p1);
const STidTags* t2 = (const STidTags*) varDataVal(p2);
__compar_fn_t func = getComparFunc(t1->padding, 0);
return func(t1->tag, t2->tag);
} }
int32_t tidTagsCompar(const void* p1, const void* p2) { int32_t tidTagsCompar(const void* p1, const void* p2) {
...@@ -742,28 +750,7 @@ int32_t tidTagsCompar(const void* p1, const void* p2) { ...@@ -742,28 +750,7 @@ int32_t tidTagsCompar(const void* p1, const void* p2) {
return (t1->vgId > t2->vgId) ? 1 : -1; return (t1->vgId > t2->vgId) ? 1 : -1;
} }
tstr* tag1 = (tstr*) t1->tag; return tagValCompar(p1, p2);
tstr* tag2 = (tstr*) t2->tag;
if (tag1->len != tag2->len) {
return (tag1->len > tag2->len)? 1: -1;
}
return strncmp(tag1->data, tag2->data, tag1->len);
}
int32_t tagValCompar(const void* p1, const void* p2) {
const STidTags* t1 = (const STidTags*) varDataVal(p1);
const STidTags* t2 = (const STidTags*) varDataVal(p2);
tstr* tag1 = (tstr*) t1->tag;
tstr* tag2 = (tstr*) t2->tag;
if (tag1->len != tag2->len) {
return (tag1->len > tag2->len)? 1: -1;
}
return memcmp(tag1->data, tag2->data, tag1->len);
} }
void tscBuildVgroupTableInfo(SSqlObj* pSql, STableMetaInfo* pTableMetaInfo, SArray* tables) { void tscBuildVgroupTableInfo(SSqlObj* pSql, STableMetaInfo* pTableMetaInfo, SArray* tables) {
...@@ -889,6 +876,12 @@ static bool checkForDuplicateTagVal(SSchema* pColSchema, SJoinSupporter* p1, SSq ...@@ -889,6 +876,12 @@ static bool checkForDuplicateTagVal(SSchema* pColSchema, SJoinSupporter* p1, SSq
return true; return true;
} }
static void setTidTagType(SJoinSupporter* p, uint8_t type) {
for (int32_t i = 0; i < p->num; ++i) {
STidTags * tag = (STidTags*) varDataVal(p->pIdTagList + i * p->tagSize);
tag->padding = type;
}
}
static int32_t getIntersectionOfTableTuple(SQueryInfo* pQueryInfo, SSqlObj* pParentSql, SArray* resList) { static int32_t getIntersectionOfTableTuple(SQueryInfo* pQueryInfo, SSqlObj* pParentSql, SArray* resList) {
int16_t joinNum = pParentSql->subState.numOfSub; int16_t joinNum = pParentSql->subState.numOfSub;
...@@ -908,6 +901,8 @@ static int32_t getIntersectionOfTableTuple(SQueryInfo* pQueryInfo, SSqlObj* pPar ...@@ -908,6 +901,8 @@ static int32_t getIntersectionOfTableTuple(SQueryInfo* pQueryInfo, SSqlObj* pPar
for (int32_t i = 0; i < joinNum; i++) { for (int32_t i = 0; i < joinNum; i++) {
SJoinSupporter* p = pParentSql->pSubs[i]->param; SJoinSupporter* p = pParentSql->pSubs[i]->param;
setTidTagType(p, pColSchema->type);
ctxlist[i].p = p; ctxlist[i].p = p;
ctxlist[i].res = taosArrayInit(p->num, size); ctxlist[i].res = taosArrayInit(p->num, size);
......
...@@ -267,6 +267,10 @@ static void writeDataToDisk(STSBuf* pTSBuf) { ...@@ -267,6 +267,10 @@ static void writeDataToDisk(STSBuf* pTSBuf) {
if (pBlock->tag.nType == TSDB_DATA_TYPE_BINARY || pBlock->tag.nType == TSDB_DATA_TYPE_NCHAR) { if (pBlock->tag.nType == TSDB_DATA_TYPE_BINARY || pBlock->tag.nType == TSDB_DATA_TYPE_NCHAR) {
metaLen += (int32_t)fwrite(&pBlock->tag.nLen, 1, sizeof(pBlock->tag.nLen), pTSBuf->f); metaLen += (int32_t)fwrite(&pBlock->tag.nLen, 1, sizeof(pBlock->tag.nLen), pTSBuf->f);
metaLen += (int32_t)fwrite(pBlock->tag.pz, 1, (size_t)pBlock->tag.nLen, pTSBuf->f); metaLen += (int32_t)fwrite(pBlock->tag.pz, 1, (size_t)pBlock->tag.nLen, pTSBuf->f);
} else if (pBlock->tag.nType == TSDB_DATA_TYPE_FLOAT) {
metaLen += (int32_t)fwrite(&pBlock->tag.nLen, 1, sizeof(pBlock->tag.nLen), pTSBuf->f);
float tfloat = pBlock->tag.dKey;
metaLen += (int32_t)fwrite(&tfloat, 1, (size_t) pBlock->tag.nLen, pTSBuf->f);
} else if (pBlock->tag.nType != TSDB_DATA_TYPE_NULL) { } else if (pBlock->tag.nType != TSDB_DATA_TYPE_NULL) {
metaLen += (int32_t)fwrite(&pBlock->tag.nLen, 1, sizeof(pBlock->tag.nLen), pTSBuf->f); metaLen += (int32_t)fwrite(&pBlock->tag.nLen, 1, sizeof(pBlock->tag.nLen), pTSBuf->f);
metaLen += (int32_t)fwrite(&pBlock->tag.i64, 1, (size_t) pBlock->tag.nLen, pTSBuf->f); metaLen += (int32_t)fwrite(&pBlock->tag.i64, 1, (size_t) pBlock->tag.nLen, pTSBuf->f);
...@@ -351,6 +355,11 @@ STSBlock* readDataFromDisk(STSBuf* pTSBuf, int32_t order, bool decomp) { ...@@ -351,6 +355,11 @@ STSBlock* readDataFromDisk(STSBuf* pTSBuf, int32_t order, bool decomp) {
sz = fread(pBlock->tag.pz, (size_t)pBlock->tag.nLen, 1, pTSBuf->f); sz = fread(pBlock->tag.pz, (size_t)pBlock->tag.nLen, 1, pTSBuf->f);
UNUSED(sz); UNUSED(sz);
} else if (pBlock->tag.nType == TSDB_DATA_TYPE_FLOAT) {
float tfloat = 0;
sz = fread(&tfloat, (size_t) pBlock->tag.nLen, 1, pTSBuf->f);
pBlock->tag.dKey = tfloat;
UNUSED(sz);
} else if (pBlock->tag.nType != TSDB_DATA_TYPE_NULL) { //TODO check the return value } else if (pBlock->tag.nType != TSDB_DATA_TYPE_NULL) { //TODO check the return value
sz = fread(&pBlock->tag.i64, (size_t) pBlock->tag.nLen, 1, pTSBuf->f); sz = fread(&pBlock->tag.i64, (size_t) pBlock->tag.nLen, 1, pTSBuf->f);
UNUSED(sz); UNUSED(sz);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册