提交 0f22b3d4 编写于 作者: H Haojun Liao

[td-3186]

上级 0aadc6c9
...@@ -20,6 +20,7 @@ ...@@ -20,6 +20,7 @@
extern "C" { extern "C" {
#endif #endif
#include <tsched.h>
#include "exception.h" #include "exception.h"
#include "os.h" #include "os.h"
#include "qExtbuffer.h" #include "qExtbuffer.h"
...@@ -93,22 +94,12 @@ typedef struct SVgroupTableInfo { ...@@ -93,22 +94,12 @@ typedef struct SVgroupTableInfo {
SArray *itemList; // SArray<STableIdInfo> SArray *itemList; // SArray<STableIdInfo>
} SVgroupTableInfo; } SVgroupTableInfo;
static FORCE_INLINE SQueryInfo* tscGetQueryInfo(SSqlCmd* pCmd) {
assert(pCmd != NULL);
if (pCmd->pQueryInfo == NULL) {
return NULL;
}
return pCmd->active;
}
SQueryInfo* tscGetActiveQueryInfo(SSqlCmd* pCmd);
int32_t tscCreateDataBlock(size_t initialSize, int32_t rowSize, int32_t startOffset, SName* name, STableMeta* pTableMeta, STableDataBlocks** dataBlocks); int32_t tscCreateDataBlock(size_t initialSize, int32_t rowSize, int32_t startOffset, SName* name, STableMeta* pTableMeta, STableDataBlocks** dataBlocks);
void tscDestroyDataBlock(STableDataBlocks* pDataBlock, bool removeMeta); void tscDestroyDataBlock(STableDataBlocks* pDataBlock, bool removeMeta);
void tscSortRemoveDataBlockDupRows(STableDataBlocks* dataBuf); void tscSortRemoveDataBlockDupRows(STableDataBlocks* dataBuf);
void tscDestroyBoundColumnInfo(SParsedDataColInfo* pColInfo); void tscDestroyBoundColumnInfo(SParsedDataColInfo* pColInfo);
void doRetrieveSubqueryData(SSchedMsg *pMsg);
SParamInfo* tscAddParamToDataBlock(STableDataBlocks* pDataBlock, char type, uint8_t timePrec, int16_t bytes, SParamInfo* tscAddParamToDataBlock(STableDataBlocks* pDataBlock, char type, uint8_t timePrec, int16_t bytes,
uint32_t offset); uint32_t offset);
......
...@@ -454,7 +454,7 @@ int32_t tscCreateResPointerInfo(SSqlRes *pRes, SQueryInfo *pQueryInfo); ...@@ -454,7 +454,7 @@ int32_t tscCreateResPointerInfo(SSqlRes *pRes, SQueryInfo *pQueryInfo);
void tscSetResRawPtr(SSqlRes* pRes, SQueryInfo* pQueryInfo); void tscSetResRawPtr(SSqlRes* pRes, SQueryInfo* pQueryInfo);
void tscSetResRawPtrRv(SSqlRes* pRes, SQueryInfo* pQueryInfo, SSDataBlock* pBlock); void tscSetResRawPtrRv(SSqlRes* pRes, SQueryInfo* pQueryInfo, SSDataBlock* pBlock);
void handleDownstreamOperator(SSqlRes** pRes, int32_t numOfUpstream, SQueryInfo* px, SSqlRes* pOutput); void handleDownstreamOperator(SSqlObj** pSqlList, int32_t numOfUpstream, SQueryInfo* px, SSqlRes* pOutput);
void destroyTableNameList(SSqlCmd* pCmd); void destroyTableNameList(SSqlCmd* pCmd);
void tscResetSqlCmd(SSqlCmd *pCmd, bool removeMeta); void tscResetSqlCmd(SSqlCmd *pCmd, bool removeMeta);
......
...@@ -222,6 +222,17 @@ void taos_fetch_rows_a(TAOS_RES *tres, __async_cb_func_t fp, void *param) { ...@@ -222,6 +222,17 @@ void taos_fetch_rows_a(TAOS_RES *tres, __async_cb_func_t fp, void *param) {
tscResetForNextRetrieve(pRes); tscResetForNextRetrieve(pRes);
// handle the sub queries of join query // handle the sub queries of join query
SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd);
if (pQueryInfo->pUpstream != NULL && taosArrayGetSize(pQueryInfo->pUpstream) > 0) {
SSchedMsg schedMsg = {0};
schedMsg.fp = doRetrieveSubqueryData;
schedMsg.ahandle = (void *)pSql->self;
schedMsg.thandle = (void *)1;
schedMsg.msg = 0;
taosScheduleTask(tscQhandle, &schedMsg);
return;
}
if (pCmd->command == TSDB_SQL_TABLE_JOIN_RETRIEVE) { if (pCmd->command == TSDB_SQL_TABLE_JOIN_RETRIEVE) {
tscFetchDatablockForSubquery(pSql); tscFetchDatablockForSubquery(pSql);
} else if (pRes->completed) { } else if (pRes->completed) {
...@@ -258,7 +269,7 @@ void taos_fetch_rows_a(TAOS_RES *tres, __async_cb_func_t fp, void *param) { ...@@ -258,7 +269,7 @@ void taos_fetch_rows_a(TAOS_RES *tres, __async_cb_func_t fp, void *param) {
pCmd->command = (pCmd->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH; pCmd->command = (pCmd->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH;
} }
SQueryInfo* pQueryInfo1 = tscGetActiveQueryInfo(&pSql->cmd); SQueryInfo* pQueryInfo1 = tscGetQueryInfo(&pSql->cmd);
tscBuildAndSendRequest(pSql, pQueryInfo1); tscBuildAndSendRequest(pSql, pQueryInfo1);
} }
} }
...@@ -477,7 +488,7 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) { ...@@ -477,7 +488,7 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
} }
} else { // stream computing } else { // stream computing
SQueryInfo* pQueryInfo = tscGetActiveQueryInfo(pCmd); SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd);
STableMetaInfo *pTableMetaInfo = pQueryInfo->pTableMetaInfo[0]; STableMetaInfo *pTableMetaInfo = pQueryInfo->pTableMetaInfo[0];
code = tscGetTableMeta(pSql, pTableMetaInfo); code = tscGetTableMeta(pSql, pTableMetaInfo);
......
...@@ -7201,6 +7201,8 @@ int32_t loadAllTableMeta(SSqlObj* pSql, struct SSqlInfo* pInfo) { ...@@ -7201,6 +7201,8 @@ int32_t loadAllTableMeta(SSqlObj* pSql, struct SSqlInfo* pInfo) {
} }
} }
tfree(pTableMeta);
// load the table meta for a given table name list // load the table meta for a given table name list
if (taosArrayGetSize(plist) > 0) { if (taosArrayGetSize(plist) > 0) {
int32_t code = getMultiTableMetaFromMnode(pSql, plist, pVgroupList); int32_t code = getMultiTableMetaFromMnode(pSql, plist, pVgroupList);
...@@ -7334,11 +7336,13 @@ static int32_t doValidateSubquery(SSqlNode* pSqlNode, int32_t index, SSqlObj* pS ...@@ -7334,11 +7336,13 @@ static int32_t doValidateSubquery(SSqlNode* pSqlNode, int32_t index, SSqlObj* pS
// NOTE: order mix up in subquery not support yet. // NOTE: order mix up in subquery not support yet.
pQueryInfo->order = pSub->order; pQueryInfo->order = pSub->order;
char* tmp = realloc(pQueryInfo->pTableMetaInfo, (pQueryInfo->numOfTables + 1) * POINTER_BYTES); STableMetaInfo** tmp = realloc(pQueryInfo->pTableMetaInfo, (pQueryInfo->numOfTables + 1) * POINTER_BYTES);
if (tmp == NULL) { if (tmp == NULL) {
return TSDB_CODE_TSC_OUT_OF_MEMORY; return TSDB_CODE_TSC_OUT_OF_MEMORY;
} }
pQueryInfo->pTableMetaInfo = tmp;
pQueryInfo->pTableMetaInfo[pQueryInfo->numOfTables] = pTableMetaInfo1; pQueryInfo->pTableMetaInfo[pQueryInfo->numOfTables] = pTableMetaInfo1;
pQueryInfo->numOfTables += 1; pQueryInfo->numOfTables += 1;
......
...@@ -539,7 +539,7 @@ int tscBuildAndSendRequest(SSqlObj *pSql, SQueryInfo* pQueryInfo) { ...@@ -539,7 +539,7 @@ int tscBuildAndSendRequest(SSqlObj *pSql, SQueryInfo* pQueryInfo) {
int tscBuildFetchMsg(SSqlObj *pSql, SSqlInfo *pInfo) { int tscBuildFetchMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
SRetrieveTableMsg *pRetrieveMsg = (SRetrieveTableMsg *) pSql->cmd.payload; SRetrieveTableMsg *pRetrieveMsg = (SRetrieveTableMsg *) pSql->cmd.payload;
SQueryInfo *pQueryInfo = tscGetActiveQueryInfo(&pSql->cmd); SQueryInfo *pQueryInfo = tscGetQueryInfo(&pSql->cmd);
pRetrieveMsg->free = htons(pQueryInfo->type); pRetrieveMsg->free = htons(pQueryInfo->type);
pRetrieveMsg->qId = htobe64(pSql->res.qId); pRetrieveMsg->qId = htobe64(pSql->res.qId);
...@@ -822,7 +822,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -822,7 +822,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
return TSDB_CODE_TSC_INVALID_SQL; // todo add test for this return TSDB_CODE_TSC_INVALID_SQL; // todo add test for this
} }
SQueryInfo *pQueryInfo = tscGetActiveQueryInfo(pCmd); SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd);
SQueryAttr query = {{0}}; SQueryAttr query = {{0}};
tscCreateQueryFromQueryInfo(pQueryInfo, &query, pSql); tscCreateQueryFromQueryInfo(pQueryInfo, &query, pSql);
...@@ -1620,7 +1620,7 @@ int tscProcessRetrieveLocalMergeRsp(SSqlObj *pSql) { ...@@ -1620,7 +1620,7 @@ int tscProcessRetrieveLocalMergeRsp(SSqlObj *pSql) {
} }
// global aggregation may be the upstream for parent query // global aggregation may be the upstream for parent query
SQueryInfo *pQueryInfo = tscGetActiveQueryInfo(pCmd); SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd);
if (pQueryInfo->pQInfo == NULL) { if (pQueryInfo->pQInfo == NULL) {
STableGroupInfo tableGroupInfo = {.numOfTables = 1, .pGroupList = taosArrayInit(1, POINTER_BYTES),}; STableGroupInfo tableGroupInfo = {.numOfTables = 1, .pGroupList = taosArrayInit(1, POINTER_BYTES),};
tableGroupInfo.map = taosHashInit(1, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK); tableGroupInfo.map = taosHashInit(1, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
...@@ -2061,6 +2061,7 @@ int tscProcessMultiTableMetaRsp(SSqlObj *pSql) { ...@@ -2061,6 +2061,7 @@ int tscProcessMultiTableMetaRsp(SSqlObj *pSql) {
pSql->res.numOfTotal = pMultiMeta->numOfTables; pSql->res.numOfTotal = pMultiMeta->numOfTables;
tscDebug("0x%"PRIx64" load multi-tableMeta resp from complete numOfTables:%d", pSql->self, pMultiMeta->numOfTables); tscDebug("0x%"PRIx64" load multi-tableMeta resp from complete numOfTables:%d", pSql->self, pMultiMeta->numOfTables);
taosHashCleanup(pSet);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -2363,7 +2364,7 @@ int tscProcessRetrieveRspFromNode(SSqlObj *pSql) { ...@@ -2363,7 +2364,7 @@ int tscProcessRetrieveRspFromNode(SSqlObj *pSql) {
pRes->completed = (pRetrieve->completed == 1); pRes->completed = (pRetrieve->completed == 1);
pRes->data = pRetrieve->data; pRes->data = pRetrieve->data;
SQueryInfo* pQueryInfo = tscGetActiveQueryInfo(pCmd); SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd);
if (tscCreateResPointerInfo(pRes, pQueryInfo) != TSDB_CODE_SUCCESS) { if (tscCreateResPointerInfo(pRes, pQueryInfo) != TSDB_CODE_SUCCESS) {
return pRes->code; return pRes->code;
} }
......
...@@ -82,8 +82,7 @@ static bool allSubqueryDone(SSqlObj *pParentSql) { ...@@ -82,8 +82,7 @@ static bool allSubqueryDone(SSqlObj *pParentSql) {
for (int i = 0; i < subState->numOfSub; i++) { for (int i = 0; i < subState->numOfSub; i++) {
SSqlObj* pSub = pParentSql->pSubs[i]; SSqlObj* pSub = pParentSql->pSubs[i];
if (0 == subState->states[i]) { if (0 == subState->states[i]) {
tscDebug("0x%"PRIx64" subquery:0x%"PRIx64", index: %d NOT finished, abort query completion check", pParentSql->self, tscDebug("0x%"PRIx64" subquery:0x%"PRIx64", index: %d NOT finished yet", pParentSql->self, pSub->self, i);
pSub->self, i);
done = false; done = false;
break; break;
} else { } else {
...@@ -100,23 +99,21 @@ static bool allSubqueryDone(SSqlObj *pParentSql) { ...@@ -100,23 +99,21 @@ static bool allSubqueryDone(SSqlObj *pParentSql) {
bool subAndCheckDone(SSqlObj *pSql, SSqlObj *pParentSql, int idx) { bool subAndCheckDone(SSqlObj *pSql, SSqlObj *pParentSql, int idx) {
SSubqueryState *subState = &pParentSql->subState; SSubqueryState *subState = &pParentSql->subState;
assert(idx < subState->numOfSub); assert(idx < subState->numOfSub);
pthread_mutex_lock(&subState->mutex); pthread_mutex_lock(&subState->mutex);
bool done = allSubqueryDone(pParentSql); // bool done = allSubqueryDone(pParentSql);
if (done) { // if (done) {
tscDebug("0x%"PRIx64" subquery:0x%"PRIx64",%d all subs already done", pParentSql->self, pSql->self, idx); // tscDebug("0x%"PRIx64" subquery:0x%"PRIx64",%d all subs already done", pParentSql->self, pSql->self, idx);
pthread_mutex_unlock(&subState->mutex); // pthread_mutex_unlock(&subState->mutex);
return false; // return false;
} // }
tscDebug("0x%"PRIx64" subquery:0x%"PRIx64",%d state set to 1", pParentSql->self, pSql->self, idx);
tscDebug("0x%"PRIx64" subquery:0x%"PRIx64", index:%d state set to 1", pParentSql->self, pSql->self, idx);
subState->states[idx] = 1; subState->states[idx] = 1;
done = allSubqueryDone(pParentSql); bool done = allSubqueryDone(pParentSql);
pthread_mutex_unlock(&subState->mutex); pthread_mutex_unlock(&subState->mutex);
return done; return done;
} }
...@@ -2415,7 +2412,7 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) { ...@@ -2415,7 +2412,7 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
const uint32_t nBufferSize = (1u << 16u); // 64KB const uint32_t nBufferSize = (1u << 16u); // 64KB
SQueryInfo *pQueryInfo = tscGetActiveQueryInfo(pCmd); SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd);
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
SSubqueryState *pState = &pSql->subState; SSubqueryState *pState = &pSql->subState;
...@@ -3487,8 +3484,6 @@ static UNUSED_FUNC bool tscHasRemainDataInSubqueryResultSet(SSqlObj *pSql) { ...@@ -3487,8 +3484,6 @@ static UNUSED_FUNC bool tscHasRemainDataInSubqueryResultSet(SSqlObj *pSql) {
void* createQInfoFromQueryNode(SQueryInfo* pQueryInfo, SExprInfo* pExprs, STableGroupInfo* pTableGroupInfo, void* createQInfoFromQueryNode(SQueryInfo* pQueryInfo, SExprInfo* pExprs, STableGroupInfo* pTableGroupInfo,
SOperatorInfo* pSourceOperator, char* sql, void* merger, int32_t stage) { SOperatorInfo* pSourceOperator, char* sql, void* merger, int32_t stage) {
assert(pQueryInfo != NULL); assert(pQueryInfo != NULL);
// int16_t numOfOutput = pQueryInfo->fieldsInfo.numOfOutput;
SQInfo *pQInfo = (SQInfo *)calloc(1, sizeof(SQInfo)); SQInfo *pQInfo = (SQInfo *)calloc(1, sizeof(SQInfo));
if (pQInfo == NULL) { if (pQInfo == NULL) {
goto _cleanup; goto _cleanup;
...@@ -3506,25 +3501,25 @@ void* createQInfoFromQueryNode(SQueryInfo* pQueryInfo, SExprInfo* pExprs, STable ...@@ -3506,25 +3501,25 @@ void* createQInfoFromQueryNode(SQueryInfo* pQueryInfo, SExprInfo* pExprs, STable
pQueryAttr->tableGroupInfo = *pTableGroupInfo; pQueryAttr->tableGroupInfo = *pTableGroupInfo;
// calculate the result row size // calculate the result row size
SExprInfo* pei = NULL; SExprInfo* pEx = NULL;
int32_t num = 0; int32_t num = 0;
if (pQueryAttr->pExpr3 != NULL) { if (pQueryAttr->pExpr3 != NULL) {
pei = pQueryAttr->pExpr3; pEx = pQueryAttr->pExpr3;
num = pQueryAttr->numOfExpr3; num = pQueryAttr->numOfExpr3;
} else if (pQueryAttr->pExpr2 != NULL) { } else if (pQueryAttr->pExpr2 != NULL) {
pei = pQueryAttr->pExpr2; pEx = pQueryAttr->pExpr2;
num = pQueryAttr->numOfExpr2; num = pQueryAttr->numOfExpr2;
} else { } else {
pei = pQueryAttr->pExpr1; pEx = pQueryAttr->pExpr1;
num = pQueryAttr->numOfOutput; num = pQueryAttr->numOfOutput;
} }
for (int16_t col = 0; col < num; ++col) { for (int16_t col = 0; col < num; ++col) {
pQueryAttr->resultRowSize += pei[col].base.resBytes; pQueryAttr->resultRowSize += pEx[col].base.resBytes;
// keep the tag length // keep the tag length
if (TSDB_COL_IS_TAG(pei[col].base.colInfo.flag)) { if (TSDB_COL_IS_TAG(pEx[col].base.colInfo.flag)) {
pQueryAttr->tagLen += pei[col].base.resBytes; pQueryAttr->tagLen += pEx[col].base.resBytes;
} }
} }
......
...@@ -648,7 +648,7 @@ static SColumnInfo* extractColumnInfoFromResult(SArray* pTableCols) { ...@@ -648,7 +648,7 @@ static SColumnInfo* extractColumnInfoFromResult(SArray* pTableCols) {
typedef struct SDummyInputInfo { typedef struct SDummyInputInfo {
SSDataBlock *block; SSDataBlock *block;
SSqlRes *pRes; // refactor: remove it SSqlObj *pSql; // refactor: remove it
} SDummyInputInfo; } SDummyInputInfo;
typedef struct SJoinStatus { typedef struct SJoinStatus {
...@@ -664,92 +664,198 @@ typedef struct SJoinOperatorInfo { ...@@ -664,92 +664,198 @@ typedef struct SJoinOperatorInfo {
SRspResultInfo resultInfo; // todo refactor, add this info for each operator SRspResultInfo resultInfo; // todo refactor, add this info for each operator
} SJoinOperatorInfo; } SJoinOperatorInfo;
SSDataBlock* doGetDataBlock(void* param, bool* newgroup) { static void doSetupSDataBlock(SSqlRes* pRes, SSDataBlock* pBlock) {
SOperatorInfo *pOperator = (SOperatorInfo*) param;
SDummyInputInfo *pInput = pOperator->info;
char* pData = pInput->pRes->data;
SSDataBlock* pBlock = pInput->block;
pBlock->info.rows = pInput->pRes->numOfRows;
if (pBlock->info.rows == 0) {
return NULL;
}
//TODO refactor
int32_t offset = 0; int32_t offset = 0;
char* pData = pRes->data;
for(int32_t i = 0; i < pBlock->info.numOfCols; ++i) { for(int32_t i = 0; i < pBlock->info.numOfCols; ++i) {
SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, i); SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, i);
if (pData != NULL) { if (pData != NULL) {
pColData->pData = pData + offset * pBlock->info.rows; pColData->pData = pData + offset * pBlock->info.rows;
} else { } else {
pColData->pData = pInput->pRes->urow[i]; pColData->pData = pRes->urow[i];
} }
offset += pColData->info.bytes; offset += pColData->info.bytes;
} }
pInput->pRes->numOfRows = 0; pRes->numOfRows = 0;
*newgroup = false;
return pBlock;
} }
SSDataBlock* doBlockJoin(void* param, bool* newgroup) { // NOTE: there is already exists data blocks before this function calls.
SSDataBlock* doGetDataBlock(void* param, bool* newgroup) {
SOperatorInfo *pOperator = (SOperatorInfo*) param; SOperatorInfo *pOperator = (SOperatorInfo*) param;
assert(pOperator->numOfUpstream > 1); if (pOperator->status == OP_EXEC_DONE) {
return NULL;
}
SSDataBlock* block0 = pOperator->upstream[0]->exec(pOperator->upstream[0], newgroup); SDummyInputInfo *pInput = pOperator->info;
SSDataBlock* block1 = pOperator->upstream[1]->exec(pOperator->upstream[1], newgroup); SSqlObj* pSql = pInput->pSql;
SSqlRes* pRes = &pSql->res;
if (block1 == NULL || block0 == NULL) { SSDataBlock* pBlock = pInput->block;
pBlock->info.rows = pRes->numOfRows;
if (pRes->numOfRows != 0) {
doSetupSDataBlock(pRes, pBlock);
*newgroup = false;
return pBlock;
}
// No data block exists. So retrieve and transfer it into to SSDataBlock
TAOS_ROW pRow = NULL;
taos_fetch_block(pSql, &pRow);
if (pRes->numOfRows == 0) {
pOperator->status = OP_EXEC_DONE;
return NULL;
}
pBlock->info.rows = pRes->numOfRows;
doSetupSDataBlock(pRes, pBlock);
*newgroup = false;
return pBlock;
}
static int32_t v = 0;
SSDataBlock* doDataBlockJoin(void* param, bool* newgroup) {
SOperatorInfo *pOperator = (SOperatorInfo*) param;
if (pOperator->status == OP_EXEC_DONE) {
return NULL; return NULL;
} }
assert(block0 != block1); assert(pOperator->numOfUpstream > 1);
SJoinOperatorInfo* pJoinInfo = pOperator->info; SJoinOperatorInfo* pJoinInfo = pOperator->info;
pJoinInfo->status[0].pBlock = block0; pJoinInfo->pRes->info.rows = 0;
pJoinInfo->status[1].pBlock = block1;
while(1) {
for (int32_t i = 0; i < pOperator->numOfUpstream; ++i) {
SJoinStatus* pStatus = &pJoinInfo->status[i];
if (pStatus->pBlock == NULL || pStatus->index >= pStatus->pBlock->info.rows) {
pStatus->pBlock = pOperator->upstream[i]->exec(pOperator->upstream[i], newgroup);
pStatus->index = 0;
if (i == 0 && pStatus->pBlock != NULL) {
v += pStatus->pBlock->info.rows;
printf("---------------%d\n", v);
}
SJoinStatus* st0 = &pJoinInfo->status[0]; if (pStatus->pBlock == NULL) {
SJoinStatus* st1 = &pJoinInfo->status[1]; pOperator->status = OP_EXEC_DONE;
while (st0->index < st0->pBlock->info.rows && st1->index < st1->pBlock->info.rows) { pJoinInfo->resultInfo.total += pJoinInfo->pRes->info.rows;
SColumnInfoData* p0 = taosArrayGet(st0->pBlock->pDataBlock, 0); return pJoinInfo->pRes;
SColumnInfoData* p1 = taosArrayGet(st1->pBlock->pDataBlock, 0); }
}
}
SJoinStatus* st0 = &pJoinInfo->status[0];
SColumnInfoData* p0 = taosArrayGet(st0->pBlock->pDataBlock, 0);
int64_t* ts0 = (int64_t*) p0->pData; int64_t* ts0 = (int64_t*) p0->pData;
int64_t* ts1 = (int64_t*) p1->pData;
if (ts0[st0->index] == ts1[st1->index]) { // add to the final result buffer
// check if current output buffer is over the threshold to pause current loop
int32_t rows = pJoinInfo->pRes->info.rows;
for(int32_t j = 0; j < st0->pBlock->info.numOfCols; ++j) {
SColumnInfoData* pCol1 = taosArrayGet(pJoinInfo->pRes->pDataBlock, j);
SColumnInfoData* pSrc = taosArrayGet(st0->pBlock->pDataBlock, j);
int32_t bytes = pSrc->info.bytes;
memcpy(pCol1->pData + rows * bytes, pSrc->pData + st0->index * bytes, bytes);
}
for(int32_t j = 0; j < st1->pBlock->info.numOfCols; ++j) { bool prefixEqual = true;
SColumnInfoData* pCol1 = taosArrayGet(pJoinInfo->pRes->pDataBlock, j + st0->pBlock->info.numOfCols);
SColumnInfoData* pSrc = taosArrayGet(st1->pBlock->pDataBlock, j); while(1) {
prefixEqual = true;
for (int32_t i = 1; i < pJoinInfo->numOfUpstream; ++i) {
SJoinStatus* st = &pJoinInfo->status[i];
SColumnInfoData* p = taosArrayGet(st->pBlock->pDataBlock, 0);
int64_t* ts = (int64_t*)p->pData;
int32_t bytes = pSrc->info.bytes; if (ts[st->index] < ts0[st0->index]) { // less than the first
memcpy(pCol1->pData + rows * bytes, pSrc->pData + st1->index * bytes, bytes); prefixEqual = false;
if ((++(st->index)) >= st->pBlock->info.rows) {
break;
}
} else if (ts[st->index] > ts0[st0->index]) { // greater than the first;
if (prefixEqual == true) {
prefixEqual = false;
for (int32_t j = 0; j < i; ++j) {
SJoinStatus* stx = &pJoinInfo->status[j];
if ((++(stx->index)) >= stx->pBlock->info.rows) {
break;
}
}
} else {
if ((++(st0->index)) >= st0->pBlock->info.rows) {
break;
}
}
}
} }
st0->index++; if (prefixEqual) {
st1->index++; int32_t offset = 0;
pJoinInfo->pRes->info.rows++; bool completed = false;
} else if (ts0[st0->index] < ts1[st1->index]) { for (int32_t i = 0; i < pOperator->numOfUpstream; ++i) {
st0->index++; SJoinStatus* st1 = &pJoinInfo->status[i];
} else { int32_t rows = pJoinInfo->pRes->info.rows;
st1->index++;
for (int32_t j = 0; j < st1->pBlock->info.numOfCols; ++j) {
SColumnInfoData* pCol1 = taosArrayGet(pJoinInfo->pRes->pDataBlock, j + offset);
SColumnInfoData* pSrc = taosArrayGet(st1->pBlock->pDataBlock, j);
int32_t bytes = pSrc->info.bytes;
memcpy(pCol1->pData + rows * bytes, pSrc->pData + st1->index * bytes, bytes);
}
offset += st1->pBlock->info.numOfCols;
if ((++(st1->index)) == st1->pBlock->info.rows) {
completed = true;
}
}
if ((++pJoinInfo->pRes->info.rows) >= pJoinInfo->resultInfo.capacity) {
pJoinInfo->resultInfo.total += pJoinInfo->pRes->info.rows;
return pJoinInfo->pRes;
}
if (completed == true) {
break;
}
}
} }
} /*
while (st0->index < st0->pBlock->info.rows && st1->index < st1->pBlock->info.rows) {
SColumnInfoData* p0 = taosArrayGet(st0->pBlock->pDataBlock, 0);
SColumnInfoData* p1 = taosArrayGet(st1->pBlock->pDataBlock, 0);
int64_t* ts0 = (int64_t*)p0->pData;
int64_t* ts1 = (int64_t*)p1->pData;
if (ts0[st0->index] == ts1[st1->index]) { // add to the final result buffer
// check if current output buffer is over the threshold to pause current loop
int32_t rows = pJoinInfo->pRes->info.rows;
for (int32_t j = 0; j < st0->pBlock->info.numOfCols; ++j) {
SColumnInfoData* pCol1 = taosArrayGet(pJoinInfo->pRes->pDataBlock, j);
SColumnInfoData* pSrc = taosArrayGet(st0->pBlock->pDataBlock, j);
int32_t bytes = pSrc->info.bytes;
memcpy(pCol1->pData + rows * bytes, pSrc->pData + st0->index * bytes, bytes);
}
return pJoinInfo->pRes; for (int32_t j = 0; j < st1->pBlock->info.numOfCols; ++j) {
SColumnInfoData* pCol1 = taosArrayGet(pJoinInfo->pRes->pDataBlock, j + st0->pBlock->info.numOfCols);
SColumnInfoData* pSrc = taosArrayGet(st1->pBlock->pDataBlock, j);
int32_t bytes = pSrc->info.bytes;
memcpy(pCol1->pData + rows * bytes, pSrc->pData + st1->index * bytes, bytes);
}
st0->index++;
st1->index++;
if ((++pJoinInfo->pRes->info.rows) >= pJoinInfo->resultInfo.capacity) {
pJoinInfo->resultInfo.total += pJoinInfo->pRes->info.rows;
return pJoinInfo->pRes;
}
} else if (ts0[st0->index] < ts1[st1->index]) {
st0->index++;
} else {
st1->index++;
}
}*/
}
} }
static void destroyDummyInputOperator(void* param, int32_t numOfOutput) { static void destroyDummyInputOperator(void* param, int32_t numOfOutput) {
...@@ -762,15 +868,15 @@ static void destroyDummyInputOperator(void* param, int32_t numOfOutput) { ...@@ -762,15 +868,15 @@ static void destroyDummyInputOperator(void* param, int32_t numOfOutput) {
} }
pInfo->block = destroyOutputBuf(pInfo->block); pInfo->block = destroyOutputBuf(pInfo->block);
pInfo->pRes = NULL; pInfo->pSql = NULL;
} }
// todo this operator servers as the adapter for Operator tree and SqlRes result, remove it later // todo this operator servers as the adapter for Operator tree and SqlRes result, remove it later
SOperatorInfo* createDummyInputOperator(char* pResult, SSchema* pSchema, int32_t numOfCols) { SOperatorInfo* createDummyInputOperator(SSqlObj* pSql, SSchema* pSchema, int32_t numOfCols) {
assert(numOfCols > 0); assert(numOfCols > 0);
SDummyInputInfo* pInfo = calloc(1, sizeof(SDummyInputInfo)); SDummyInputInfo* pInfo = calloc(1, sizeof(SDummyInputInfo));
pInfo->pRes = (SSqlRes*) pResult; pInfo->pSql = pSql;
pInfo->block = calloc(numOfCols, sizeof(SSDataBlock)); pInfo->block = calloc(numOfCols, sizeof(SSDataBlock));
pInfo->block->info.numOfCols = numOfCols; pInfo->block->info.numOfCols = numOfCols;
...@@ -824,7 +930,7 @@ SOperatorInfo* createJoinOperator(SOperatorInfo** pUpstream, int32_t numOfUpstre ...@@ -824,7 +930,7 @@ SOperatorInfo* createJoinOperator(SOperatorInfo** pUpstream, int32_t numOfUpstre
pOperator->numOfOutput = numOfOutput; pOperator->numOfOutput = numOfOutput;
pOperator->blockingOptr = false; pOperator->blockingOptr = false;
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->exec = doBlockJoin; pOperator->exec = doDataBlockJoin;
pOperator->cleanup = destroyDummyInputOperator; pOperator->cleanup = destroyDummyInputOperator;
for(int32_t i = 0; i < numOfUpstream; ++i) { for(int32_t i = 0; i < numOfUpstream; ++i) {
...@@ -848,7 +954,7 @@ void convertQueryResult(SSqlRes* pRes, SQueryInfo* pQueryInfo) { ...@@ -848,7 +954,7 @@ void convertQueryResult(SSqlRes* pRes, SQueryInfo* pQueryInfo) {
pRes->completed = (pRes->numOfRows == 0); pRes->completed = (pRes->numOfRows == 0);
} }
void handleDownstreamOperator(SSqlRes** pRes, int32_t numOfUpstream, SQueryInfo* px, SSqlRes* pOutput) { void handleDownstreamOperator(SSqlObj** pSqlObjList, int32_t numOfUpstream, SQueryInfo* px, SSqlRes* pOutput) {
// handle the following query process // handle the following query process
if (px->pQInfo == NULL) { if (px->pQInfo == NULL) {
SColumnInfo* pColumnInfo = extractColumnInfoFromResult(px->colList); SColumnInfo* pColumnInfo = extractColumnInfoFromResult(px->colList);
...@@ -879,26 +985,29 @@ void handleDownstreamOperator(SSqlRes** pRes, int32_t numOfUpstream, SQueryInfo* ...@@ -879,26 +985,29 @@ void handleDownstreamOperator(SSqlRes** pRes, int32_t numOfUpstream, SQueryInfo*
// if it is a join query, create join operator here // if it is a join query, create join operator here
int32_t numOfCol1 = px->pTableMetaInfo[0]->pTableMeta->tableInfo.numOfColumns; int32_t numOfCol1 = px->pTableMetaInfo[0]->pTableMeta->tableInfo.numOfColumns;
SOperatorInfo* pSourceOperator = createDummyInputOperator(pSqlObjList[0], pSchema, numOfCol1);
SOperatorInfo* pSourceOperator = createDummyInputOperator((char*)pRes[0], pSchema, numOfCol1);
SSchema* schema = NULL; SSchema* schema = NULL;
if (px->numOfTables > 1) { if (px->numOfTables > 1) {
SOperatorInfo* p[2] = {0}; SOperatorInfo** p = calloc(px->numOfTables, POINTER_BYTES);
p[0] = pSourceOperator; p[0] = pSourceOperator;
SSchema* pSchema1 = tscGetTableSchema(px->pTableMetaInfo[1]->pTableMeta); int32_t num = taosArrayGetSize(px->colList);
numOfCol1 = px->pTableMetaInfo[1]->pTableMeta->tableInfo.numOfColumns; schema = calloc(num, sizeof(SSchema));
memcpy(schema, pSchema, numOfCol1*sizeof(SSchema));
int32_t offset = pSourceOperator->numOfOutput;
SOperatorInfo* pSourceOperator1 = createDummyInputOperator((char*)pRes[1], pSchema1, numOfCol1); for(int32_t i = 1; i < px->numOfTables; ++i) {
p[1] = pSourceOperator1; SSchema* pSchema1 = tscGetTableSchema(px->pTableMetaInfo[i]->pTableMeta);
int32_t n = px->pTableMetaInfo[i]->pTableMeta->tableInfo.numOfColumns;
int32_t num = pSourceOperator->numOfOutput + pSourceOperator1->numOfOutput; p[i] = createDummyInputOperator(pSqlObjList[i], pSchema1, n);
schema = calloc(num, sizeof(SSchema));
memcpy(&schema[0], pSchema, pSourceOperator->numOfOutput * sizeof(SSchema)); memcpy(&schema[offset], pSchema1, n * sizeof(SSchema));
offset += n;
}
memcpy(&schema[pSourceOperator->numOfOutput], pSchema1, pSourceOperator1->numOfOutput * sizeof(SSchema));
pSourceOperator = createJoinOperator(p, px->numOfTables, schema, num); pSourceOperator = createJoinOperator(p, px->numOfTables, schema, num);
} }
...@@ -917,6 +1026,7 @@ void handleDownstreamOperator(SSqlRes** pRes, int32_t numOfUpstream, SQueryInfo* ...@@ -917,6 +1026,7 @@ void handleDownstreamOperator(SSqlRes** pRes, int32_t numOfUpstream, SQueryInfo*
px->pQInfo = createQInfoFromQueryNode(px, exprInfo, &tableGroupInfo, pSourceOperator, NULL, NULL, MASTER_SCAN); px->pQInfo = createQInfoFromQueryNode(px, exprInfo, &tableGroupInfo, pSourceOperator, NULL, NULL, MASTER_SCAN);
tfree(pColumnInfo); tfree(pColumnInfo);
tfree(schema);
} }
uint64_t qId = 0; uint64_t qId = 0;
...@@ -1259,7 +1369,7 @@ int32_t tscCopyDataBlockToPayload(SSqlObj* pSql, STableDataBlocks* pDataBlock) { ...@@ -1259,7 +1369,7 @@ int32_t tscCopyDataBlockToPayload(SSqlObj* pSql, STableDataBlocks* pDataBlock) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
SQueryInfo* tscGetActiveQueryInfo(SSqlCmd* pCmd) { SQueryInfo* tscGetQueryInfo(SSqlCmd* pCmd) {
return pCmd->active; return pCmd->active;
} }
...@@ -2897,7 +3007,7 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, __async_cb_func_t ...@@ -2897,7 +3007,7 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, __async_cb_func_t
return NULL; return NULL;
} }
SQueryInfo* pQueryInfo = tscGetActiveQueryInfo(pCmd); SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd);
STableMetaInfo* pTableMetaInfo = pQueryInfo->pTableMetaInfo[tableIndex]; STableMetaInfo* pTableMetaInfo = pQueryInfo->pTableMetaInfo[tableIndex];
pNew->pTscObj = pSql->pTscObj; pNew->pTscObj = pSql->pTscObj;
...@@ -3093,29 +3203,15 @@ void doExecuteQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo) { ...@@ -3093,29 +3203,15 @@ void doExecuteQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo) {
} }
} }
static void doRetrieveSubqueryData(SSchedMsg *pMsg) { void doRetrieveSubqueryData(SSchedMsg *pMsg) {
SSqlObj* pSql = (SSqlObj*)taosAcquireRef(tscObjRef, (int64_t)pMsg->ahandle); SSqlObj* pSql = (SSqlObj*)taosAcquireRef(tscObjRef, (int64_t)pMsg->ahandle);
if (pSql == NULL || pSql->signature != pSql) { if (pSql == NULL || pSql->signature != pSql) {
tscDebug("%p SqlObj is freed, not add into queue async res", pMsg->ahandle); tscDebug("%p SqlObj is freed, not add into queue async res", pMsg->ahandle);
return; return;
} }
int32_t numOfRows = 0; SQueryInfo *pQueryInfo = tscGetQueryInfo(&pSql->cmd);
for(int32_t i = 0; i < pSql->subState.numOfSub; ++i) { handleDownstreamOperator(pSql->pSubs, pSql->subState.numOfSub, pQueryInfo, &pSql->res);
SSqlObj* pSub = pSql->pSubs[i];
/*TAOS_ROW row = */taos_fetch_row(pSub);
// SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSub->cmd);
// int32_t rows = taos_fetch_block(pSub, &row);
if (numOfRows == 0) {
numOfRows = pSub->res.numOfRows;
}
}
if (numOfRows > 0) {
SQueryInfo *pQueryInfo = tscGetQueryInfo(&pSql->cmd);
SSqlRes* list[2] = {&pSql->pSubs[0]->res, &pSql->pSubs[1]->res};
handleDownstreamOperator(list, 2, pQueryInfo, &pSql->res);
}
pSql->res.qId = -1; pSql->res.qId = -1;
if (pSql->res.code == TSDB_CODE_SUCCESS) { if (pSql->res.code == TSDB_CODE_SUCCESS) {
...@@ -3133,7 +3229,7 @@ static void tscSubqueryRetrieveCallback(void* param, TAOS_RES* tres, int code) { ...@@ -3133,7 +3229,7 @@ static void tscSubqueryRetrieveCallback(void* param, TAOS_RES* tres, int code) {
SSqlObj* pSql = tres; SSqlObj* pSql = tres;
if (!subAndCheckDone(pSql, pParentSql, ps->subqueryIndex)) { if (!subAndCheckDone(pSql, pParentSql, ps->subqueryIndex)) {
tscDebug("0x%"PRIx64" sub:0x%"PRIx64" orderOfSub:%d freed, not all subquery finished", pParentSql->self, pSql->self, ps->subqueryIndex); tscDebug("0x%"PRIx64" sub:0x%"PRIx64" orderOfSub:%d completed, not all subquery finished", pParentSql->self, pSql->self, ps->subqueryIndex);
return; return;
} }
...@@ -4047,10 +4143,6 @@ int32_t tscCreateQueryFromQueryInfo(SQueryInfo* pQueryInfo, SQueryAttr* pQueryAt ...@@ -4047,10 +4143,6 @@ int32_t tscCreateQueryFromQueryInfo(SQueryInfo* pQueryInfo, SQueryAttr* pQueryAt
tscExprAssign(&pQueryAttr->pExpr2[i], p); tscExprAssign(&pQueryAttr->pExpr2[i], p);
} }
} }
// int32_t code = createProjectionExpr(pQueryInfo, pTableMetaInfo, &pQueryAttr->pExpr2, &pQueryAttr->numOfExpr2);
// if (code != TSDB_CODE_SUCCESS) {
// return code;
// }
// tag column info // tag column info
int32_t code = createTagColumnInfo(pQueryAttr, pQueryInfo, pTableMetaInfo); int32_t code = createTagColumnInfo(pQueryAttr, pQueryInfo, pTableMetaInfo);
......
...@@ -4858,7 +4858,7 @@ static SSDataBlock* doArithmeticOperation(void* param, bool* newgroup) { ...@@ -4858,7 +4858,7 @@ static SSDataBlock* doArithmeticOperation(void* param, bool* newgroup) {
} }
pRes->info.rows = getNumOfResult(pRuntimeEnv, pInfo->pCtx, pOperator->numOfOutput); pRes->info.rows = getNumOfResult(pRuntimeEnv, pInfo->pCtx, pOperator->numOfOutput);
if (pRes->info.rows >= pRuntimeEnv->resultInfo.threshold) { if (pRes->info.rows >= 1000/*pRuntimeEnv->resultInfo.threshold*/) {
break; break;
} }
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册