提交 c1bea666 编写于 作者: H Haojun Liao

[td-1373]

上级 c6cc433c
...@@ -979,27 +979,12 @@ static void joinRetrieveFinalResCallback(void* param, TAOS_RES* tres, int numOfR ...@@ -979,27 +979,12 @@ static void joinRetrieveFinalResCallback(void* param, TAOS_RES* tres, int numOfR
tscBuildResFromSubqueries(pParentSql); tscBuildResFromSubqueries(pParentSql);
} }
static SJoinSupporter* tscUpdateSubqueryStatus(SSqlObj* pSql, int32_t numOfFetch) {
int32_t notInvolved = 0;
SSubqueryState* pState = &pSql->subState;
for(int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
if (pSql->pSubs[i] == NULL) {
notInvolved++;
// } else {
// (SJoinSupporter*)pSql->pSubs[i]->param;
}
}
pState->numOfRemain = numOfFetch;
return NULL;
}
void tscFetchDatablockFromSubquery(SSqlObj* pSql) { void tscFetchDatablockFromSubquery(SSqlObj* pSql) {
assert(pSql->subState.numOfSub >= 1); assert(pSql->subState.numOfSub >= 1);
int32_t numOfFetch = 0; int32_t numOfFetch = 0;
bool hasData = true; bool hasData = true;
bool reachLimit = false;
// if the subquery is NULL, it does not involved in the final result generation // if the subquery is NULL, it does not involved in the final result generation
for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) { for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
...@@ -1025,7 +1010,8 @@ void tscFetchDatablockFromSubquery(SSqlObj* pSql) { ...@@ -1025,7 +1010,8 @@ void tscFetchDatablockFromSubquery(SSqlObj* pSql) {
} }
} else { // has reach the limitation, no data anymore } else { // has reach the limitation, no data anymore
if (pRes->row >= pRes->numOfRows) { if (pRes->row >= pRes->numOfRows) {
hasData = false; reachLimit = true;
hasData = false;
break; break;
} }
} }
...@@ -1039,7 +1025,7 @@ void tscFetchDatablockFromSubquery(SSqlObj* pSql) { ...@@ -1039,7 +1025,7 @@ void tscFetchDatablockFromSubquery(SSqlObj* pSql) {
// If at least one subquery is completed in current vnode, try the next vnode in case of multi-vnode // If at least one subquery is completed in current vnode, try the next vnode in case of multi-vnode
// super table projection query. // super table projection query.
if (numOfFetch <= 0) { if (numOfFetch <= 0 && !reachLimit) {
bool tryNextVnode = false; bool tryNextVnode = false;
SSqlObj* pp = pSql->pSubs[0]; SSqlObj* pp = pSql->pSubs[0];
...@@ -1109,8 +1095,8 @@ void tscFetchDatablockFromSubquery(SSqlObj* pSql) { ...@@ -1109,8 +1095,8 @@ void tscFetchDatablockFromSubquery(SSqlObj* pSql) {
// retrieve data from current vnode. // retrieve data from current vnode.
tscDebug("%p retrieve data from %d subqueries", pSql, numOfFetch); tscDebug("%p retrieve data from %d subqueries", pSql, numOfFetch);
SJoinSupporter* pSupporter = NULL; SJoinSupporter* pSupporter = NULL;
tscUpdateSubqueryStatus(pSql, numOfFetch); pSql->subState.numOfRemain = numOfFetch;
for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) { for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
SSqlObj* pSql1 = pSql->pSubs[i]; SSqlObj* pSql1 = pSql->pSubs[i];
if (pSql1 == NULL) { if (pSql1 == NULL) {
...@@ -1522,7 +1508,13 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) { ...@@ -1522,7 +1508,13 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
SSubqueryState *pState = &pSql->subState; SSubqueryState *pState = &pSql->subState;
pState->numOfSub = pTableMetaInfo->vgroupList->numOfVgroups; pState->numOfSub = 0;
if (pTableMetaInfo->pVgroupTables == NULL) {
pState->numOfSub = pTableMetaInfo->vgroupList->numOfVgroups;
} else {
pState->numOfSub = taosArrayGetSize(pTableMetaInfo->pVgroupTables);
}
assert(pState->numOfSub > 0); assert(pState->numOfSub > 0);
int32_t ret = tscLocalReducerEnvCreate(pSql, &pMemoryBuf, &pDesc, &pModel, nBufferSize); int32_t ret = tscLocalReducerEnvCreate(pSql, &pMemoryBuf, &pDesc, &pModel, nBufferSize);
......
...@@ -145,8 +145,9 @@ void tVariantAssign(tVariant *pDst, const tVariant *pSrc) { ...@@ -145,8 +145,9 @@ void tVariantAssign(tVariant *pDst, const tVariant *pSrc) {
if (pSrc == NULL || pDst == NULL) return; if (pSrc == NULL || pDst == NULL) return;
pDst->nType = pSrc->nType; pDst->nType = pSrc->nType;
if (pSrc->nType >= TSDB_DATA_TYPE_BOOL && pSrc->nType <= TSDB_DATA_TYPE_DOUBLE) {
if (pSrc->nType == TSDB_DATA_TYPE_BINARY || pSrc->nType == TSDB_DATA_TYPE_NCHAR) { pDst->i64Key = pSrc->i64Key;
} else if (pSrc->nType == TSDB_DATA_TYPE_BINARY || pSrc->nType == TSDB_DATA_TYPE_NCHAR) {
int32_t len = pSrc->nLen + TSDB_NCHAR_SIZE; int32_t len = pSrc->nLen + TSDB_NCHAR_SIZE;
char* p = realloc(pDst->pz, len); char* p = realloc(pDst->pz, len);
assert(p); assert(p);
...@@ -156,11 +157,7 @@ void tVariantAssign(tVariant *pDst, const tVariant *pSrc) { ...@@ -156,11 +157,7 @@ void tVariantAssign(tVariant *pDst, const tVariant *pSrc) {
memcpy(pDst->pz, pSrc->pz, pSrc->nLen); memcpy(pDst->pz, pSrc->pz, pSrc->nLen);
pDst->nLen = pSrc->nLen; pDst->nLen = pSrc->nLen;
return; } else if (pSrc->nType == TSDB_DATA_TYPE_ARRAY) { // this is only for string array
}
// this is only for string array
if (pSrc->nType == TSDB_DATA_TYPE_ARRAY) {
size_t num = taosArrayGetSize(pSrc->arr); size_t num = taosArrayGetSize(pSrc->arr);
pDst->arr = taosArrayInit(num, sizeof(char*)); pDst->arr = taosArrayInit(num, sizeof(char*));
for(size_t i = 0; i < num; i++) { for(size_t i = 0; i < num; i++) {
...@@ -168,8 +165,6 @@ void tVariantAssign(tVariant *pDst, const tVariant *pSrc) { ...@@ -168,8 +165,6 @@ void tVariantAssign(tVariant *pDst, const tVariant *pSrc) {
char* n = strdup(p); char* n = strdup(p);
taosArrayPush(pDst->arr, &n); taosArrayPush(pDst->arr, &n);
} }
return;
} }
pDst->nLen = tDataTypeDesc[pDst->nType].nSize; pDst->nLen = tDataTypeDesc[pDst->nType].nSize;
......
...@@ -4767,36 +4767,61 @@ static bool multiTableMultioutputHelper(SQInfo *pQInfo, int32_t index) { ...@@ -4767,36 +4767,61 @@ static bool multiTableMultioutputHelper(SQInfo *pQInfo, int32_t index) {
} }
if (pRuntimeEnv->pTSBuf != NULL) { if (pRuntimeEnv->pTSBuf != NULL) {
tVariant* pTag = &pRuntimeEnv->pCtx[0].tag;
if (pRuntimeEnv->cur.vgroupIndex == -1) { if (pRuntimeEnv->cur.vgroupIndex == -1) {
STSElem elem = tsBufGetElemStartPos(pRuntimeEnv->pTSBuf, pQInfo->vgId, &pRuntimeEnv->pCtx[0].tag); STSElem elem = tsBufGetElemStartPos(pRuntimeEnv->pTSBuf, pQInfo->vgId, pTag);
// failed to find data with the specified tag value and vnodeId // failed to find data with the specified tag value and vnodeId
if (elem.vnode < 0) { if (elem.vnode < 0) {
qError("QInfo:%p failed to find tag:%s in ts_comp", pQInfo, pRuntimeEnv->pCtx[0].tag.pz); if (pTag->nType == TSDB_DATA_TYPE_BINARY || pTag->nType == TSDB_DATA_TYPE_NCHAR) {
qError("QInfo:%p failed to find tag:%s in ts_comp", pQInfo, pTag->pz);
} else {
qError("QInfo:%p failed to find tag:%"PRId64" in ts_comp", pQInfo, pTag->i64Key);
}
return false; return false;
} else { } else {
STSCursor cur = tsBufGetCursor(pRuntimeEnv->pTSBuf); STSCursor cur = tsBufGetCursor(pRuntimeEnv->pTSBuf);
qDebug("QInfo:%p find tag:%s start pos in ts_comp, blockIndex:%d, tsIndex:%d", pQInfo, pRuntimeEnv->pCtx[0].tag.pz,
cur.blockIndex, cur.tsIndex); if (pTag->nType == TSDB_DATA_TYPE_BINARY || pTag->nType == TSDB_DATA_TYPE_NCHAR) {
qDebug("QInfo:%p find tag:%s start pos in ts_comp, blockIndex:%d, tsIndex:%d", pQInfo, pTag->pz,
cur.blockIndex, cur.tsIndex);
} else {
qDebug("QInfo:%p find tag:%"PRId64" start pos in ts_comp, blockIndex:%d, tsIndex:%d", pQInfo, pTag->i64Key,
cur.blockIndex, cur.tsIndex);
}
} }
} else { } else {
STSElem elem = tsBufGetElem(pRuntimeEnv->pTSBuf); STSElem elem = tsBufGetElem(pRuntimeEnv->pTSBuf);
if (tVariantCompare(&elem.tag, &pRuntimeEnv->pCtx[0].tag) != 0) { if (tVariantCompare(&elem.tag, &pRuntimeEnv->pCtx[0].tag) != 0) {
STSElem elem1 = tsBufGetElemStartPos(pRuntimeEnv->pTSBuf, pQInfo->vgId, &pRuntimeEnv->pCtx[0].tag); STSElem elem1 = tsBufGetElemStartPos(pRuntimeEnv->pTSBuf, pQInfo->vgId, pTag);
// failed to find data with the specified tag value and vnodeId // failed to find data with the specified tag value and vnodeId
if (elem1.vnode < 0) { if (elem1.vnode < 0) {
qError("QInfo:%p failed to find tag:%s in ts_comp", pQInfo, pRuntimeEnv->pCtx[0].tag.pz); if (pTag->nType == TSDB_DATA_TYPE_BINARY || pTag->nType == TSDB_DATA_TYPE_NCHAR) {
qError("QInfo:%p failed to find tag:%s in ts_comp", pQInfo, pTag->pz);
} else {
qError("QInfo:%p failed to find tag:%"PRId64" in ts_comp", pQInfo, pTag->i64Key);
}
return false; return false;
} else { } else {
STSCursor cur = tsBufGetCursor(pRuntimeEnv->pTSBuf); STSCursor cur = tsBufGetCursor(pRuntimeEnv->pTSBuf);
qDebug("QInfo:%p find tag:%s start pos in ts_comp, blockIndex:%d, tsIndex:%d", pQInfo, pRuntimeEnv->pCtx[0].tag.pz, if (pTag->nType == TSDB_DATA_TYPE_BINARY || pTag->nType == TSDB_DATA_TYPE_NCHAR) {
cur.blockIndex, cur.tsIndex); qDebug("QInfo:%p find tag:%s start pos in ts_comp, blockIndex:%d, tsIndex:%d", pQInfo, pTag->pz, cur.blockIndex, cur.tsIndex);
} else {
qDebug("QInfo:%p find tag:%"PRId64" start pos in ts_comp, blockIndex:%d, tsIndex:%d", pQInfo, pTag->i64Key, cur.blockIndex, cur.tsIndex);
}
} }
} else { } else {
tsBufSetCursor(pRuntimeEnv->pTSBuf, &pRuntimeEnv->cur); tsBufSetCursor(pRuntimeEnv->pTSBuf, &pRuntimeEnv->cur);
STSCursor cur = tsBufGetCursor(pRuntimeEnv->pTSBuf); STSCursor cur = tsBufGetCursor(pRuntimeEnv->pTSBuf);
qDebug("QInfo:%p continue scan ts_comp file, tag:%s blockIndex:%d, tsIndex:%d", pQInfo, pRuntimeEnv->pCtx[0].tag.pz, if (pTag->nType == TSDB_DATA_TYPE_BINARY || pTag->nType == TSDB_DATA_TYPE_NCHAR) {
cur.blockIndex, cur.tsIndex); qDebug("QInfo:%p continue scan ts_comp file, tag:%s blockIndex:%d, tsIndex:%d", pQInfo, pTag->pz, cur.blockIndex, cur.tsIndex);
} else {
qDebug("QInfo:%p continue scan ts_comp file, tag:%"PRId64" blockIndex:%d, tsIndex:%d", pQInfo, pTag->i64Key, cur.blockIndex, cur.tsIndex);
}
} }
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册