提交 688f6c3a 编写于 作者: H Haojun Liao

[td-225] fix error in join

上级 995eb63d
......@@ -46,7 +46,8 @@ int doAsyncParseSql(SSqlObj* pSql) {
int32_t code = tscAllocPayload(pCmd, TSDB_DEFAULT_PAYLOAD_SIZE);
if (code != TSDB_CODE_SUCCESS) {
tscError("failed to malloc payload");
tscQueueAsyncError(pSql->fp, pSql->param, TSDB_CODE_TSC_OUT_OF_MEMORY);
tscQueueAsyncRes(pSql);
// tscQueueAsyncRes(pSql->fp, pSql->param, TSDB_CODE_TSC_OUT_OF_MEMORY);
return code;
}
......@@ -211,7 +212,9 @@ void taos_fetch_rows_a(TAOS_RES *taosa, void (*fp)(void *, TAOS_RES *, int), voi
if (pRes->qhandle == 0) {
tscError("qhandle is NULL");
tscQueueAsyncError(fp, param, TSDB_CODE_TSC_INVALID_QHANDLE);
pRes->code = TSDB_CODE_TSC_INVALID_QHANDLE;
tscQueueAsyncRes(pSql);
// tscQueueAsyncError(fp, param, TSDB_CODE_TSC_INVALID_QHANDLE);
return;
}
......
......@@ -4949,25 +4949,25 @@ int32_t parseCreateDBOptions(SSqlCmd* pCmd, SCreateDBInfo* pCreateDbSql) {
return TSDB_CODE_SUCCESS;
}
void tscAddTimestampColumn(SQueryInfo* pQueryInfo, int16_t functionId, int16_t tableIndex) {
// the first column not timestamp column, add it
SSqlExpr* pExpr = NULL;
if (tscSqlExprNumOfExprs(pQueryInfo) > 0) {
pExpr = tscSqlExprGet(pQueryInfo, 0);
}
if (pExpr == NULL || pExpr->colInfo.colId != PRIMARYKEY_TIMESTAMP_COL_INDEX || pExpr->functionId != functionId) {
SColumnIndex index = {tableIndex, PRIMARYKEY_TIMESTAMP_COL_INDEX};
pExpr = tscSqlExprInsert(pQueryInfo, 0, functionId, &index, TSDB_DATA_TYPE_TIMESTAMP, TSDB_KEYSIZE, TSDB_KEYSIZE, false);
pExpr->colInfo.flag = TSDB_COL_NORMAL;
// NOTE: tag column does not add to source column list
SColumnList ids = getColumnList(1, tableIndex, PRIMARYKEY_TIMESTAMP_COL_INDEX);
insertResultField(pQueryInfo, 0, &ids, TSDB_KEYSIZE, TSDB_DATA_TYPE_TIMESTAMP, "ts", pExpr);
}
}
//void tscAddTimestampColumn(SQueryInfo* pQueryInfo, int16_t functionId, int16_t tableIndex) {
// // the first column not timestamp column, add it
// SSqlExpr* pExpr = NULL;
// if (tscSqlExprNumOfExprs(pQueryInfo) > 0) {
// pExpr = tscSqlExprGet(pQueryInfo, 0);
// }
//
// if (pExpr == NULL || pExpr->colInfo.colId != PRIMARYKEY_TIMESTAMP_COL_INDEX || pExpr->functionId != functionId) {
// SColumnIndex index = {tableIndex, PRIMARYKEY_TIMESTAMP_COL_INDEX};
//
// pExpr = tscSqlExprInsert(pQueryInfo, 0, functionId, &index, TSDB_DATA_TYPE_TIMESTAMP, TSDB_KEYSIZE, TSDB_KEYSIZE, false);
// pExpr->colInfo.flag = TSDB_COL_NORMAL;
//
// // NOTE: tag column does not add to source column list
// SColumnList ids = getColumnList(1, tableIndex, PRIMARYKEY_TIMESTAMP_COL_INDEX);
//
// insertResultField(pQueryInfo, 0, &ids, TSDB_KEYSIZE, TSDB_DATA_TYPE_TIMESTAMP, "ts", pExpr);
// }
//}
void addGroupInfoForSubquery(SSqlObj* pParentObj, SSqlObj* pSql, int32_t subClauseIndex, int32_t tableIndex) {
SQueryInfo* pParentQueryInfo = tscGetQueryInfoDetail(&pParentObj->cmd, subClauseIndex);
......
......@@ -800,6 +800,27 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
}
}
// serialize tag column query condition
if (pQueryInfo->tagCond.pCond != NULL && taosArrayGetSize(pQueryInfo->tagCond.pCond) > 0) {
STagCond* pTagCond = &pQueryInfo->tagCond;
SCond *pCond = tsGetSTableQueryCond(pTagCond, pTableMeta->uid);
if (pCond != NULL && pCond->cond != NULL) {
pQueryMsg->tagCondLen = htons(pCond->len);
memcpy(pMsg, pCond->cond, pCond->len);
pMsg += pCond->len;
}
}
if (pQueryInfo->tagCond.tbnameCond.cond == NULL) {
*pMsg = 0;
pMsg++;
} else {
strcpy(pMsg, pQueryInfo->tagCond.tbnameCond.cond);
pMsg += strlen(pQueryInfo->tagCond.tbnameCond.cond) + 1;
}
// compressed ts block
pQueryMsg->tsOffset = htonl(pMsg - pStart);
int32_t tsLen = 0;
......@@ -824,27 +845,6 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pQueryMsg->tsOrder = htonl(pQueryInfo->tsBuf->tsOrder);
}
// serialize tag column query condition
if (pQueryInfo->tagCond.pCond != NULL && taosArrayGetSize(pQueryInfo->tagCond.pCond) > 0) {
STagCond* pTagCond = &pQueryInfo->tagCond;
SCond *pCond = tsGetSTableQueryCond(pTagCond, pTableMeta->uid);
if (pCond != NULL && pCond->cond != NULL) {
pQueryMsg->tagCondLen = htons(pCond->len);
memcpy(pMsg, pCond->cond, pCond->len);
pMsg += pCond->len;
}
}
if (pQueryInfo->tagCond.tbnameCond.cond == NULL) {
*pMsg = 0;
pMsg++;
} else {
strcpy(pMsg, pQueryInfo->tagCond.tbnameCond.cond);
pMsg += strlen(pQueryInfo->tagCond.tbnameCond.cond) + 1;
}
int32_t msgLen = pMsg - pStart;
tscTrace("%p msg built success,len:%d bytes", pSql, msgLen);
......
......@@ -339,6 +339,17 @@ int32_t tscLaunchSecondPhaseSubqueries(SSqlObj* pSql) {
pExpr->numOfParams = 1;
}
SColumnIndex index = {.tableIndex = 0, .columnIndex = PRIMARYKEY_TIMESTAMP_COL_INDEX};
SSchema* s = tscGetTableColumnSchema(pTableMetaInfo->pTableMeta, 0);
SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, 0);
if ((pExpr->colInfo.colId != PRIMARYKEY_TIMESTAMP_COL_INDEX) ||
(pExpr->functionId != TSDB_FUNC_TS || pExpr->functionId != TSDB_FUNC_TS_DUMMY)) {
tscAddSpecialColumnForSelect(pQueryInfo, 0, TSDB_FUNC_PRJ, &index, s, 0);
tscPrintSelectClause(pNew, 0);
tscFieldInfoUpdateOffset(pNewQueryInfo);
}
size_t numOfCols = taosArrayGetSize(pNewQueryInfo->colList);
tscTrace("%p subquery:%p tableIndex:%d, vgroupIndex:%d, type:%d, exprInfo:%d, colList:%d, fieldsInfo:%d, name:%s",
pSql, pNew, 0, pTableMetaInfo->vgroupIndex, pNewQueryInfo->type,
......@@ -464,8 +475,8 @@ static void tSIntersectionAndLaunchSecQuery(SJoinSupporter* pSupporter, SSqlObj*
}
int32_t tscCompareTidTags(const void* p1, const void* p2) {
const STidTags* t1 = (const STidTags*) p1;
const STidTags* t2 = (const STidTags*) p2;
const STidTags* t1 = (const STidTags*) varDataVal(p1);
const STidTags* t2 = (const STidTags*) varDataVal(p2);
if (t1->vgId != t2->vgId) {
return (t1->vgId > t2->vgId) ? 1 : -1;
......@@ -477,33 +488,33 @@ int32_t tscCompareTidTags(const void* p1, const void* p2) {
}
void tscBuildVgroupTableInfo(STableMetaInfo* pTableMetaInfo, SArray* tables) {
SArray* result = taosArrayInit( 4, sizeof(SVgroupTableInfo) );
SArray* result = taosArrayInit(4, sizeof(SVgroupTableInfo));
SArray* vgTables = NULL;
STidTags* prev = NULL;
size_t numOfTables = taosArrayGetSize( tables );
for( size_t i = 0; i < numOfTables; i++ ) {
STidTags* tt = taosArrayGet( tables, i );
size_t numOfTables = taosArrayGetSize(tables);
for (size_t i = 0; i < numOfTables; i++) {
STidTags* tt = taosArrayGet(tables, i);
if( prev == NULL || tt->vgId != prev->vgId ) {
if (prev == NULL || tt->vgId != prev->vgId) {
SVgroupsInfo* pvg = pTableMetaInfo->vgroupList;
SVgroupTableInfo info = {{ 0 }};
for( int32_t m = 0; m < pvg->numOfVgroups; ++m ) {
if( tt->vgId == pvg->vgroups[m].vgId ) {
SVgroupTableInfo info = {{0}};
for (int32_t m = 0; m < pvg->numOfVgroups; ++m) {
if (tt->vgId == pvg->vgroups[m].vgId) {
info.vgInfo = pvg->vgroups[m];
break;
}
}
assert( info.vgInfo.numOfIps != 0 );
assert(info.vgInfo.numOfIps != 0);
vgTables = taosArrayInit( 4, sizeof(STableIdInfo) );
vgTables = taosArrayInit(4, sizeof(STableIdInfo));
info.itemList = vgTables;
taosArrayPush( result, &info );
taosArrayPush(result, &info);
}
STableIdInfo item = { .uid = tt->uid, .tid = tt->tid, .key = INT64_MIN };
taosArrayPush( vgTables, &item );
STableIdInfo item = {.uid = tt->uid, .tid = tt->tid, .key = INT64_MIN};
taosArrayPush(vgTables, &item);
prev = tt;
}
......@@ -574,6 +585,8 @@ static void joinRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) {
// response of tag retrieve
if (TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_TAG_FILTER_QUERY)) {
//todo handle error
if (numOfRows == 0 || pSql->res.completed) {
if (numOfRows > 0) {
......@@ -616,8 +629,8 @@ static void joinRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) {
int32_t i = 0, j = 0;
while(i < p1->num && j < p2->num) {
STidTags* pp1 = (STidTags*) p1->pIdTagList + i * p1->tagSize;
STidTags* pp2 = (STidTags*) p2->pIdTagList + j * p2->tagSize;
STidTags* pp1 = (STidTags*) varDataVal(p1->pIdTagList + i * p1->tagSize);
STidTags* pp2 = (STidTags*) varDataVal(p2->pIdTagList + j * p2->tagSize);
int32_t ret = doCompare(pp1->tag, pp2->tag, pColSchema->type, pColSchema->bytes);
if (ret == 0) {
......@@ -660,6 +673,8 @@ static void joinRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) {
} else {
size_t length = pSupporter->totalLen + pSql->res.rspLen;
assert(length > 0);
char* tmp = realloc(pSupporter->pIdTagList, length);
assert(tmp != NULL);
......@@ -833,7 +848,10 @@ void tscFetchDatablockFromSubquery(SSqlObj* pSql) {
}
}
} else { // has reach the limitation, no data anymore
if (pRes->row >= pRes->numOfRows) {
hasData = false;
break;
}
}
}
......@@ -1071,16 +1089,15 @@ int32_t tscLaunchJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSupporter
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pNewQueryInfo, 0);
if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) { // return the tableId & tag
SSchema s = {0};
SColumnIndex index = {0};
size_t numOfTags = taosArrayGetSize(pTableMetaInfo->tagColList);
for(int32_t i = 0; i < numOfTags; ++i) {
SColumn* c = taosArrayGetP(pTableMetaInfo->tagColList, i);
index = (SColumnIndex) {.tableIndex = 0, .columnIndex = c->colIndex.columnIndex};
STagCond* pTagCond = &pSupporter->tagCond;
assert(pTagCond->joinInfo.hasJoin);
int32_t tagIndex = tscGetJoinTagColIndexByUid(pTagCond, pTableMetaInfo->pTableMeta->uid);
SSchema* pTagSchema = tscGetTableTagSchema(pTableMetaInfo->pTableMeta);
s = pTagSchema[c->colIndex.columnIndex];
SSchema s = pTagSchema[tagIndex];
int16_t bytes = 0;
int16_t type = 0;
......@@ -1091,7 +1108,6 @@ int32_t tscLaunchJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSupporter
s.type = type;
s.bytes = bytes;
pSupporter->tagSize = s.bytes;
}
// set get tags query type
TSDB_QUERY_SET_TYPE(pNewQueryInfo->type, TSDB_QUERY_TYPE_TAG_FILTER_QUERY);
......
......@@ -1922,8 +1922,10 @@ void tscDoQuery(SSqlObj* pSql) {
int16_t tscGetJoinTagColIndexByUid(STagCond* pTagCond, uint64_t uid) {
if (pTagCond->joinInfo.left.uid == uid) {
return pTagCond->joinInfo.left.tagCol;
} else {
} else if (pTagCond->joinInfo.right.uid == uid){
return pTagCond->joinInfo.right.tagCol;
} else {
return -2;
}
}
......
......@@ -1072,6 +1072,10 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS
groupbyColumnData = getGroupbyColumnData(pQuery, &type, &bytes, pDataBlock);
}
if (pRuntimeEnv->pTSBuf != NULL && pQuery->numOfOutput > 1) {
printf("xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx\n");
}
for (int32_t k = 0; k < pQuery->numOfOutput; ++k) {
char *dataBlock = getDataBlock(pRuntimeEnv, &sasArray[k], k, pDataBlockInfo->rows, pDataBlock);
setExecParams(pQuery, &pCtx[k], dataBlock, tsCols, pDataBlockInfo, pStatis, &sasArray[k], k);
......@@ -1089,7 +1093,7 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS
// from top to bottom in desc
// from bottom to top in asc order
if (pRuntimeEnv->pTSBuf != NULL) {
SQInfo *pQInfo = (SQInfo *)GET_QINFO_ADDR(pQuery);
SQInfo *pQInfo = (SQInfo *)GET_QINFO_ADDR(pRuntimeEnv);
qTrace("QInfo:%p process data rows, numOfRows:%d, query order:%d, ts comp order:%d", pQInfo, pDataBlockInfo->rows,
pQuery->order.order, pRuntimeEnv->pTSBuf->cur.order);
}
......@@ -1495,6 +1499,7 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) {
}
static bool isQueryKilled(SQInfo *pQInfo) {
return false;
return (pQInfo->code == TSDB_CODE_TSC_QUERY_CANCELLED);
}
......@@ -3617,6 +3622,10 @@ bool queryHasRemainResults(SQueryRuntimeEnv* pRuntimeEnv) {
static void doCopyQueryResultToMsg(SQInfo *pQInfo, int32_t numOfRows, char *data) {
SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
if (pQInfo->runtimeEnv.pTSBuf != NULL && pQuery->numOfOutput > 1) {
printf("ffffffffffffffffffffffffff\n");
}
for (int32_t col = 0; col < pQuery->numOfOutput; ++col) {
int32_t bytes = pQuery->pSelectExpr[col].bytes;
......@@ -5038,7 +5047,9 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList,
}
char *pMsg = (char *)(pQueryMsg->colList) + sizeof(SColumnInfo) * pQueryMsg->numOfCols;
if (pQueryMsg->numOfCols > 1 && pQueryMsg->tsLen > 0) {
printf("ffffffffffffffff\n");
}
for (int32_t col = 0; col < pQueryMsg->numOfCols; ++col) {
SColumnInfo *pColInfo = &pQueryMsg->colList[col];
......@@ -5230,6 +5241,9 @@ static int32_t createQFunctionExprFromMsg(SQueryTableMsg *pQueryMsg, SExprInfo *
bool isSuperTable = QUERY_IS_STABLE_QUERY(pQueryMsg->queryType);
int16_t tagLen = 0;
if (pQueryMsg->numOfOutput > 1 && pQueryMsg->tsLen > 0) {
printf("ffffffffffffffffffff\n");
}
for (int32_t i = 0; i < pQueryMsg->numOfOutput; ++i) {
pExprs[i].base = *pExprMsg[i];
pExprs[i].bytes = 0;
......@@ -5638,7 +5652,7 @@ static int32_t initQInfo(SQueryTableMsg *pQueryMsg, void *tsdb, int32_t vgId, SQ
STSBuf *pTSBuf = NULL;
if (pQueryMsg->tsLen > 0) { // open new file to save the result
char *tsBlock = (char *)pQueryMsg + pQueryMsg->tsOffset;
char *tsBlock = (char *) pQueryMsg + pQueryMsg->tsOffset;
pTSBuf = tsBufCreateFromCompBlocks(tsBlock, pQueryMsg->tsNumOfBlocks, pQueryMsg->tsLen, pQueryMsg->tsOrder);
tsBufResetPos(pTSBuf);
......@@ -6119,6 +6133,17 @@ static void buildTagQueryResult(SQInfo* pQInfo) {
int32_t rsize = pExprInfo->bytes;
count = 0;
int16_t bytes = pExprInfo->bytes;
int16_t type = pExprInfo->type;
for(int32_t i = 0; i < pQuery->numOfTags; ++i) {
if (pQuery->tagColList[i].colId == pExprInfo->base.colInfo.colId) {
bytes = pQuery->tagColList[i].bytes;
type = pQuery->tagColList[i].type;
break;
}
}
while(pQInfo->tableIndex < num && count < pQuery->rec.capacity) {
int32_t i = pQInfo->tableIndex++;
SGroupItem *item = taosArrayGet(pa, i);
......@@ -6136,9 +6161,6 @@ static void buildTagQueryResult(SQInfo* pQInfo) {
*(int32_t *)output = pQInfo->vgId;
output += sizeof(pQInfo->vgId);
int16_t bytes = pExprInfo->bytes;
int16_t type = pExprInfo->type;
if (pExprInfo->base.colInfo.colId == TSDB_TBNAME_COLUMN_INDEX) {
char *data = tsdbGetTableName(pQInfo->tsdb, &item->id);
memcpy(output, data, varDataTLen(data));
......
......@@ -258,6 +258,7 @@ sql select count(join_tb1.c3), count(join_tb0.ts) from $tb1 , $tb2 where $ts1 =
$val = 2
if $data00 != $val then
print expect 2, actaul: $data00
return -1
endi
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册