提交 9277029d 编写于 作者: H Haojun Liao

[td-3186]

上级 e6e42400
......@@ -327,7 +327,7 @@ STableMeta* tscTableMetaDup(STableMeta* pTableMeta);
int32_t tscCreateQueryFromQueryInfo(SQueryInfo* pQueryInfo, SQueryAttr* pQueryAttr, void* addr);
void tsCreateSQLFunctionCtx(SQueryInfo* pQueryInfo, SQLFunctionCtx* pCtx, SSchema* pSchema);
void* createQueryInfoFromQueryNode(SQueryInfo* pQueryInfo, SExprInfo* pExprs, STableGroupInfo* pTableGroupInfo, SOperatorInfo* pOperator, char* sql, void* addr, int32_t stage);
void* createQInfoFromQueryNode(SQueryInfo* pQueryInfo, SExprInfo* pExprs, STableGroupInfo* pTableGroupInfo, SOperatorInfo* pOperator, char* sql, void* addr, int32_t stage);
void* malloc_throw(size_t size);
void* calloc_throw(size_t nmemb, size_t size);
......
......@@ -454,7 +454,7 @@ int32_t tscCreateResPointerInfo(SSqlRes *pRes, SQueryInfo *pQueryInfo);
void tscSetResRawPtr(SSqlRes* pRes, SQueryInfo* pQueryInfo);
void tscSetResRawPtrRv(SSqlRes* pRes, SQueryInfo* pQueryInfo, SSDataBlock* pBlock);
void handleDownstreamOperator(SSqlRes* pRes, SQueryInfo* pQueryInfo, SSqlRes* pOutput);
void handleDownstreamOperator(SSqlRes** pRes, int32_t numOfUpstream, SQueryInfo* px, SSqlRes* pOutput);
void destroyTableNameList(SSqlCmd* pCmd);
void tscResetSqlCmd(SSqlCmd *pCmd, bool removeMeta);
......
......@@ -7192,7 +7192,9 @@ int32_t loadAllTableMeta(SSqlObj* pSql, struct SSqlInfo* pInfo) {
STableMeta* pMeta = tscTableMetaDup(pTableMeta);
STableMetaVgroupInfo p = {.pTableMeta = pMeta,};
taosHashPut(pCmd->pTableMetaMap, name, strlen(name), &p, sizeof(STableMetaVgroupInfo));
const char* px = tNameGetTableName(pname);
taosHashPut(pCmd->pTableMetaMap, px, strlen(px), &p, sizeof(STableMetaVgroupInfo));
} else {// add to the retrieve table meta array list.
char* t = strdup(name);
taosArrayPush(plist, &t);
......
......@@ -1637,7 +1637,7 @@ int tscProcessRetrieveLocalMergeRsp(SSqlObj *pSql) {
list[i] = *pExprInfo;
}
pQueryInfo->pQInfo = createQueryInfoFromQueryNode(pQueryInfo, list, &tableGroupInfo, NULL, NULL, pRes->pLocalMerger, MERGE_STAGE);
pQueryInfo->pQInfo = createQInfoFromQueryNode(pQueryInfo, list, &tableGroupInfo, NULL, NULL, pRes->pLocalMerger, MERGE_STAGE);
}
uint64_t localQueryId = 0;
......
......@@ -3483,10 +3483,11 @@ static UNUSED_FUNC bool tscHasRemainDataInSubqueryResultSet(SSqlObj *pSql) {
return hasData;
}
void* createQueryInfoFromQueryNode(SQueryInfo* pQueryInfo, SExprInfo* pExprs, STableGroupInfo* pTableGroupInfo,
// todo remove pExprs
void* createQInfoFromQueryNode(SQueryInfo* pQueryInfo, SExprInfo* pExprs, STableGroupInfo* pTableGroupInfo,
SOperatorInfo* pSourceOperator, char* sql, void* merger, int32_t stage) {
assert(pQueryInfo != NULL);
int16_t numOfOutput = pQueryInfo->fieldsInfo.numOfOutput;
// int16_t numOfOutput = pQueryInfo->fieldsInfo.numOfOutput;
SQInfo *pQInfo = (SQInfo *)calloc(1, sizeof(SQInfo));
if (pQInfo == NULL) {
......@@ -3505,13 +3506,25 @@ void* createQueryInfoFromQueryNode(SQueryInfo* pQueryInfo, SExprInfo* pExprs, ST
pQueryAttr->tableGroupInfo = *pTableGroupInfo;
// calculate the result row size
for (int16_t col = 0; col < numOfOutput; ++col) {
assert(pExprs[col].base.resBytes > 0);
pQueryAttr->resultRowSize += pExprs[col].base.resBytes;
SExprInfo* pei = NULL;
int32_t num = 0;
if (pQueryAttr->pExpr3 != NULL) {
pei = pQueryAttr->pExpr3;
num = pQueryAttr->numOfExpr3;
} else if (pQueryAttr->pExpr2 != NULL) {
pei = pQueryAttr->pExpr2;
num = pQueryAttr->numOfExpr2;
} else {
pei = pQueryAttr->pExpr1;
num = pQueryAttr->numOfOutput;
}
for (int16_t col = 0; col < num; ++col) {
pQueryAttr->resultRowSize += pei[col].base.resBytes;
// keep the tag length
if (TSDB_COL_IS_TAG(pExprs[col].base.colInfo.flag)) {
pQueryAttr->tagLen += pExprs[col].base.resBytes;
if (TSDB_COL_IS_TAG(pei[col].base.colInfo.flag)) {
pQueryAttr->tagLen += pei[col].base.resBytes;
}
}
......@@ -3569,15 +3582,15 @@ void* createQueryInfoFromQueryNode(SQueryInfo* pQueryInfo, SExprInfo* pExprs, ST
}
}
for (int32_t i = 0; i < numOfOutput; ++i) {
SExprInfo* pExprInfo = &pExprs[i];
if (pExprInfo->pExpr != NULL) {
tExprTreeDestroy(pExprInfo->pExpr, NULL);
pExprInfo->pExpr = NULL;
}
}
tfree(pExprs);
// for (int32_t i = 0; i < numOfOutput; ++i) {
// SExprInfo* pExprInfo = &pExprs[i];
// if (pExprInfo->pExpr != NULL) {
// tExprTreeDestroy(pExprInfo->pExpr, NULL);
// pExprInfo->pExpr = NULL;
// }
// }
//
// tfree(pExprs);
createFilterInfo(pQueryAttr, 0);
......
......@@ -651,8 +651,16 @@ typedef struct SDummyInputInfo {
SSqlRes *pRes; // refactor: remove it
} SDummyInputInfo;
typedef struct SJoinStatus {
SSDataBlock* pBlock; // point to the upstream block
int32_t index;
bool completed;// current upstream is completed or not
} SJoinStatus;
typedef struct SJoinOperatorInfo {
int32_t a;
SSDataBlock *pRes;
SJoinStatus *status;
int32_t numOfUpstream;
} SJoinOperatorInfo;
SSDataBlock* doGetDataBlock(void* param, bool* newgroup) {
......@@ -685,6 +693,64 @@ SSDataBlock* doGetDataBlock(void* param, bool* newgroup) {
return pBlock;
}
SSDataBlock* doBlockJoin(void* param, bool* newgroup) {
SOperatorInfo *pOperator = (SOperatorInfo*) param;
assert(pOperator->numOfUpstream > 1);
SSDataBlock* block0 = pOperator->upstream[0]->exec(pOperator->upstream[0], newgroup);
SSDataBlock* block1 = pOperator->upstream[1]->exec(pOperator->upstream[1], newgroup);
if (block1 == NULL || block0 == NULL) {
return NULL;
}
assert(block0 != block1);
SJoinOperatorInfo* pJoinInfo = pOperator->info;
pJoinInfo->status[0].pBlock = block0;
pJoinInfo->status[1].pBlock = block1;
SJoinStatus* st0 = &pJoinInfo->status[0];
SJoinStatus* st1 = &pJoinInfo->status[1];
while (st0->index < st0->pBlock->info.rows && st1->index < st1->pBlock->info.rows) {
SColumnInfoData* p0 = taosArrayGet(st0->pBlock->pDataBlock, 0);
SColumnInfoData* p1 = taosArrayGet(st1->pBlock->pDataBlock, 0);
int64_t* ts0 = (int64_t*) p0->pData;
int64_t* ts1 = (int64_t*) p1->pData;
if (ts0[st0->index] == ts1[st1->index]) { // add to the final result buffer
// check if current output buffer is over the threshold to pause current loop
int32_t rows = pJoinInfo->pRes->info.rows;
for(int32_t j = 0; j < st0->pBlock->info.numOfCols; ++j) {
SColumnInfoData* pCol1 = taosArrayGet(pJoinInfo->pRes->pDataBlock, j);
SColumnInfoData* pSrc = taosArrayGet(st0->pBlock->pDataBlock, j);
int32_t bytes = pSrc->info.bytes;
memcpy(pCol1->pData + rows * bytes, pSrc->pData + st0->index * bytes, bytes);
}
for(int32_t j = 0; j < st1->pBlock->info.numOfCols; ++j) {
SColumnInfoData* pCol1 = taosArrayGet(pJoinInfo->pRes->pDataBlock, j + st0->pBlock->info.numOfCols);
SColumnInfoData* pSrc = taosArrayGet(st1->pBlock->pDataBlock, j);
int32_t bytes = pSrc->info.bytes;
memcpy(pCol1->pData + rows * bytes, pSrc->pData + st1->index * bytes, bytes);
}
st0->index++;
st1->index++;
pJoinInfo->pRes->info.rows++;
} else if (ts0[st0->index] < ts1[st1->index]) {
st0->index++;
} else {
st1->index++;
}
}
return pJoinInfo->pRes;
}
static void destroyDummyInputOperator(void* param, int32_t numOfOutput) {
SDummyInputInfo* pInfo = (SDummyInputInfo*) param;
......@@ -728,34 +794,36 @@ SOperatorInfo* createDummyInputOperator(char* pResult, SSchema* pSchema, int32_t
return pOptr;
}
SOperatorInfo* createJoinOperator(SOperatorInfo** pUpstream, int32_t numOfUpstream, SExprInfo* pExprInfo, int32_t numOfOutput) {
SJoinInfo* pInfo = calloc(1, sizeof(SJoinInfo));
/*
pInfo->pRes = (SSqlRes*) pResult;
pInfo->block = calloc(numOfCols, sizeof(SSDataBlock));
pInfo->block->info.numOfCols = numOfCols;
SOperatorInfo* createJoinOperator(SOperatorInfo** pUpstream, int32_t numOfUpstream, SSchema* pSchema, int32_t numOfOutput) {
SJoinOperatorInfo* pInfo = calloc(1, sizeof(SJoinOperatorInfo));
pInfo->numOfUpstream = numOfUpstream;
pInfo->status = calloc(numOfUpstream, sizeof(SJoinStatus));
pInfo->block->pDataBlock = taosArrayInit(numOfCols, sizeof(SColumnInfoData));
for(int32_t i = 0; i < numOfCols; ++i) {
pInfo->pRes = calloc(1, sizeof(SSDataBlock));
pInfo->pRes->info.numOfCols = numOfOutput;
pInfo->pRes->pDataBlock = taosArrayInit(numOfOutput, sizeof(SColumnInfoData));
for(int32_t i = 0; i < numOfOutput; ++i) {
SColumnInfoData colData = {{0}};
colData.info.bytes = pSchema[i].bytes;
colData.info.type = pSchema[i].type;
colData.info.colId = pSchema[i].colId;
colData.pData = calloc(1, colData.info.bytes * 4096);
taosArrayPush(pInfo->block->pDataBlock, &colData);
taosArrayPush(pInfo->pRes->pDataBlock, &colData);
}
*/
SOperatorInfo* pOptr = calloc(1, sizeof(SOperatorInfo));
pOptr->name = "JoinOperator";
pOptr->operatorType = OP_Join;
pOptr->numOfOutput = numOfOutput;
pOptr->blockingOptr = false;
pOptr->info = pInfo;
pOptr->exec = doGetDataBlock;
pOptr->exec = doBlockJoin;
pOptr->cleanup = destroyDummyInputOperator;
for(int32_t i = 0; i < numOfUpstream; ++i) {
appendUpstream(pOptr, pUpstream[0]);
appendUpstream(pOptr, pUpstream[i]);
}
return pOptr;
......@@ -775,7 +843,7 @@ void convertQueryResult(SSqlRes* pRes, SQueryInfo* pQueryInfo) {
pRes->completed = (pRes->numOfRows == 0);
}
void handleDownstreamOperator(SSqlRes* pRes, SQueryInfo* px, SSqlRes* pOutput) {
void handleDownstreamOperator(SSqlRes** pRes, int32_t numOfUpstream, SQueryInfo* px, SSqlRes* pOutput) {
// handle the following query process
if (px->pQInfo == NULL) {
SColumnInfo* pColumnInfo = extractColumnInfoFromResult(px->colList);
......@@ -805,15 +873,44 @@ void handleDownstreamOperator(SSqlRes* pRes, SQueryInfo* px, SSqlRes* pOutput) {
taosArrayPush(tableGroupInfo.pGroupList, &group);
// if it is a join query, create join operator here
SOperatorInfo* pSourceOperator = createDummyInputOperator((char*)pRes, pSchema, numOfCols);
int32_t numOfCol1 = px->pTableMetaInfo[0]->pTableMeta->tableInfo.numOfColumns;
SOperatorInfo* pSourceOperator = createDummyInputOperator((char*)pRes[0], pSchema, numOfCol1);
SSchema* schema = NULL;
if (px->numOfTables > 1) {
SOperatorInfo* p[2] = {0};
p[0] = pSourceOperator;
SSchema* pSchema1 = tscGetTableSchema(px->pTableMetaInfo[1]->pTableMeta);
numOfCol1 = px->pTableMetaInfo[1]->pTableMeta->tableInfo.numOfColumns;
SOperatorInfo* pSourceOperator1 = createDummyInputOperator((char*)pRes[1], pSchema1, numOfCol1);
p[1] = pSourceOperator1;
pSourceOperator = createJoinOperator(&pSourceOperator, 1, NULL, pSourceOperator->numOfOutput);
int32_t num = pSourceOperator->numOfOutput + pSourceOperator1->numOfOutput;
schema = calloc(num, sizeof(SSchema));
memcpy(&schema[0], pSchema, pSourceOperator->numOfOutput * sizeof(SSchema));
memcpy(&schema[pSourceOperator->numOfOutput], pSchema1, pSourceOperator1->numOfOutput * sizeof(SSchema));
pSourceOperator = createJoinOperator(p, px->numOfTables, schema, num);
}
SExprInfo* exprInfo = NULL;
/*int32_t code = */ createQueryFunc(&info, numOfOutput, &exprInfo, px->exprList->pData, NULL, px->type, NULL);
px->pQInfo = createQueryInfoFromQueryNode(px, exprInfo, &tableGroupInfo, pSourceOperator, NULL, NULL, MASTER_SCAN);
for(int32_t i = 0; i < numOfOutput; ++i) {
SExprInfo* pex = taosArrayGetP(px->exprList, i);
int32_t colId = pex->base.colInfo.colId;
for(int32_t j = 0; j < pSourceOperator->numOfOutput; ++j) {
if (colId == schema[j].colId) {
pex->base.colInfo.colIndex = j;
break;
}
}
}
px->pQInfo = createQInfoFromQueryNode(px, exprInfo, &tableGroupInfo, pSourceOperator, NULL, NULL, MASTER_SCAN);
tfree(pColumnInfo);
}
......@@ -3011,11 +3108,10 @@ static void doRetrieveSubqueryData(SSchedMsg *pMsg) {
if (numOfRows > 0) {
SQueryInfo *pQueryInfo = tscGetQueryInfo(&pSql->cmd);
SSqlObj *pSub = pSql->pSubs[0];
handleDownstreamOperator(&pSub->res, pQueryInfo, &pSql->res);
SSqlRes* list[2] = {&pSql->pSubs[0]->res, &pSql->pSubs[1]->res};
handleDownstreamOperator(list, 2, pQueryInfo, &pSql->res);
}
// int32_t code = pSql->res.code;
pSql->res.qId = -1;
if (pSql->res.code == TSDB_CODE_SUCCESS) {
(*pSql->fp)(pSql->param, pSql, pSql->res.numOfRows);
......
......@@ -521,7 +521,7 @@ SOperatorInfo* createSLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperator
SOperatorInfo* createFilterOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr,
int32_t numOfOutput, SColumnInfo* pCols, int32_t numOfFilter);
SOperatorInfo* createJoinOperator(SOperatorInfo** pUpstream, int32_t numOfUpstream, SExprInfo* pExprInfo, int32_t numOfOutput);
SOperatorInfo* createJoinOperator(SOperatorInfo** pUpstream, int32_t numOfUpstream, SSchema* pSchema, int32_t numOfOutput);
SSDataBlock* doGlobalAggregate(void* param, bool* newgroup);
SSDataBlock* doMultiwayMergeSort(void* param, bool* newgroup);
......
......@@ -1750,7 +1750,7 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
SOperatorInfo* prev = pRuntimeEnv->proot;
if (i == 0) {
pRuntimeEnv->proot = createArithOperatorInfo(pRuntimeEnv, prev, pQueryAttr->pExpr1, pQueryAttr->numOfOutput);
if (pRuntimeEnv->proot != NULL && prev->operatorType != OP_DummyInput) { // TODO refactor
if (pRuntimeEnv->proot != NULL && prev->operatorType != OP_DummyInput && prev->operatorType != OP_Join) { // TODO refactor
setTableScanFilterOperatorInfo(prev->info, pRuntimeEnv->proot);
}
} else {
......@@ -3929,11 +3929,11 @@ void queryCostStatis(SQInfo *pQInfo) {
void appendUpstream(SOperatorInfo* p, SOperatorInfo* pUpstream) {
if (p->upstream == NULL) {
assert(p->numOfOutput == 0);
assert(p->numOfUpstream == 0);
}
p->upstream = realloc(p->upstream, POINTER_BYTES * (p->numOfOutput + 1));
p->upstream[p->numOfOutput++] = pUpstream;
p->upstream = realloc(p->upstream, POINTER_BYTES * (p->numOfUpstream + 1));
p->upstream[p->numOfUpstream++] = pUpstream;
}
static void doDestroyTableQueryInfo(STableGroupInfo* pTableqinfoGroupInfo);
......@@ -4824,7 +4824,7 @@ static SSDataBlock* doArithmeticOperation(void* param, bool* newgroup) {
bool prevVal = *newgroup;
// The upstream exec may change the value of the newgroup, so use a local variable instead.
SSDataBlock* pBlock = pOperator->upstream[0]->exec(pOperator->upstream, newgroup);
SSDataBlock* pBlock = pOperator->upstream[0]->exec(pOperator->upstream[0], newgroup);
if (pBlock == NULL) {
assert(*newgroup == false);
......@@ -4878,7 +4878,7 @@ static SSDataBlock* doLimit(void* param, bool* newgroup) {
SSDataBlock* pBlock = NULL;
while (1) {
pBlock = pOperator->upstream[0]->exec(pOperator->upstream, newgroup);
pBlock = pOperator->upstream[0]->exec(pOperator->upstream[0], newgroup);
if (pBlock == NULL) {
setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED);
pOperator->status = OP_EXEC_DONE;
......@@ -4949,7 +4949,7 @@ static SSDataBlock* doFilter(void* param, bool* newgroup) {
SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv;
while (1) {
SSDataBlock *pBlock = pOperator->upstream[0]->exec(pOperator->upstream, newgroup);
SSDataBlock *pBlock = pOperator->upstream[0]->exec(pOperator->upstream[0], newgroup);
if (pBlock == NULL) {
break;
}
......@@ -5219,7 +5219,7 @@ static SSDataBlock* doFill(void* param, bool* newgroup) {
}
while(1) {
SSDataBlock* pBlock = pOperator->upstream[0]->exec(pOperator->upstream, newgroup);
SSDataBlock* pBlock = pOperator->upstream[0]->exec(pOperator->upstream[0], newgroup);
if (*newgroup) {
assert(pBlock != NULL);
}
......@@ -5295,7 +5295,15 @@ static void destroyOperatorInfo(SOperatorInfo* pOperator) {
pOperator->cleanup(pOperator->info, pOperator->numOfOutput);
}
destroyOperatorInfo(pOperator->upstream[0]);
if (pOperator->upstream != NULL) {
for(int32_t i = 0; i < pOperator->numOfUpstream; ++i) {
destroyOperatorInfo(pOperator->upstream[i]);
}
tfree(pOperator->upstream);
pOperator->numOfUpstream = 0;
}
tfree(pOperator->info);
tfree(pOperator);
}
......@@ -5844,7 +5852,7 @@ static SSDataBlock* hashDistinct(void* param, bool* newgroup) {
pRes->info.rows = 0;
SSDataBlock* pBlock = NULL;
while(1) {
pBlock = pOperator->upstream[0]->exec(pOperator->upstream, newgroup);
pBlock = pOperator->upstream[0]->exec(pOperator->upstream[0], newgroup);
if (pBlock == NULL) {
setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED);
pOperator->status = OP_EXEC_DONE;
......@@ -6458,7 +6466,7 @@ static int32_t updateOutputBufForTopBotQuery(SQueriedTableInfo* pTableInfo, SCol
return TSDB_CODE_SUCCESS;
}
// TODO tag length should be passed from client
// TODO tag length should be passed from client, refactor
int32_t createQueryFunc(SQueriedTableInfo* pTableInfo, int32_t numOfOutput, SExprInfo** pExprInfo,
SSqlExpr** pExprMsg, SColumnInfo* pTagCols, int32_t queryType, void* pMsg) {
*pExprInfo = NULL;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册