未验证 提交 e5a50fe9 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #2202 from taosdata/feature/tliu

Feature/tliu
......@@ -27,13 +27,15 @@ void tscFetchDatablockFromSubquery(SSqlObj* pSql);
void tscGetQualifiedTSList(SSqlObj* pSql, SJoinSubquerySupporter* p1, SJoinSubquerySupporter* p2, int32_t* num);
void tscSetupOutputColumnIndex(SSqlObj* pSql);
int32_t tscLaunchSecondPhaseDirectly(SSqlObj* pSql, SSubqueryState* pState);
int32_t tscLaunchSecondPhaseSubqueries(SSqlObj* pSql);
void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code);
SJoinSubquerySupporter* tscCreateJoinSupporter(SSqlObj* pSql, SSubqueryState* pState, int32_t index);
void tscDestroyJoinSupporter(SJoinSubquerySupporter* pSupporter);
#define MEM_BUF_SIZE (1<<20)
#define MEM_BUF_SIZE (1u<<20)
#define TS_COMP_BLOCK_PADDING 0xFFFFFFFF
#define TS_COMP_FILE_MAGIC 0x87F5EC4C
#define TS_COMP_FILE_VNODE_MAX 512
......@@ -47,14 +49,14 @@ typedef struct STSList {
typedef struct STSRawBlock {
int32_t vnode;
int64_t tag;
tVariant tag;
TSKEY* ts;
int32_t len;
} STSRawBlock;
typedef struct STSElem {
TSKEY ts;
int64_t tag;
tVariant tag;
int32_t vnode;
} STSElem;
......@@ -66,7 +68,7 @@ typedef struct STSCursor {
} STSCursor;
typedef struct STSBlock {
int64_t tag; // tag value
tVariant tag; // tag value
int32_t numOfElem; // number of elements
int32_t compLen; // size after compressed
int32_t padding; // 0xFFFFFFFF by default, after the payload
......@@ -123,7 +125,7 @@ STSBuf* tsBufCreateFromCompBlocks(const char* pData, int32_t numOfBlocks, int32_
void* tsBufDestory(STSBuf* pTSBuf);
void tsBufAppend(STSBuf* pTSBuf, int32_t vnodeId, int64_t tag, const char* pData, int32_t len);
void tsBufAppend(STSBuf* pTSBuf, int32_t vnodeId, tVariant* tag, const char* pData, int32_t len);
int32_t tsBufMerge(STSBuf* pDestBuf, const STSBuf* pSrcBuf, int32_t vnodeIdx);
STSVnodeBlockInfo* tsBufGetVnodeBlockInfo(STSBuf* pTSBuf, int32_t vnodeId);
......@@ -134,7 +136,7 @@ void tsBufResetPos(STSBuf* pTSBuf);
STSElem tsBufGetElem(STSBuf* pTSBuf);
bool tsBufNextPos(STSBuf* pTSBuf);
STSElem tsBufGetElemStartPos(STSBuf* pTSBuf, int32_t vnodeId, int64_t tag);
STSElem tsBufGetElemStartPos(STSBuf* pTSBuf, int32_t vnodeId, tVariant* tag);
STSCursor tsBufGetCursor(STSBuf* pTSBuf);
void tsBufSetTraverseOrder(STSBuf* pTSBuf, int32_t order);
......
......@@ -134,6 +134,7 @@ void tscFieldInfoSetBinExpr(SFieldInfo* pFieldInfo, int32_t index, SSqlFunctionE
void tscFieldInfoCalOffset(SQueryInfo* pQueryInfo);
void tscFieldInfoCopy(SFieldInfo* src, SFieldInfo* dst, const int32_t* indexList, int32_t size);
void tscFieldInfoCopyAll(SFieldInfo* dst, SFieldInfo* src);
void tscFieldInfoUpdateBySqlFunc(SQueryInfo* pQueryInfo);
TAOS_FIELD* tscFieldInfoGetField(SQueryInfo* pQueryInfo, int32_t index);
int16_t tscFieldInfoGetOffset(SQueryInfo* pQueryInfo, int32_t index);
......
......@@ -53,9 +53,10 @@ typedef struct SMeterMetaInfo {
* 2. keep the vnode index for multi-vnode insertion
*/
int32_t vnodeIndex;
char name[TSDB_METER_ID_LEN + 1]; // table(super table) name
int16_t numOfTags; // total required tags in query, including groupby tags
int16_t tagColumnIndex[TSDB_MAX_TAGS]; // clause + tag projection
char name[TSDB_METER_ID_LEN + 1]; // table(super table) name
char aliasName[TSDB_METER_ID_LEN + 1]; // alias name
int16_t numOfTags; // total required tags in query, including groupby tags
int16_t tagColumnIndex[TSDB_MAX_TAGS]; // clause + tag projection
} SMeterMetaInfo;
/* the structure for sql function in select clause */
......@@ -196,7 +197,7 @@ typedef struct SDataBlockList {
typedef struct SQueryInfo {
int16_t command; // the command may be different for each subclause, so keep it seperately.
uint16_t type; // query/insert/import type
uint32_t type; // query/insert/import type
char slidingTimeUnit;
int64_t etime, stime;
......
此差异已折叠。
......@@ -4335,11 +4335,11 @@ static void ts_comp_function(SQLFunctionCtx *pCtx) {
// primary ts must be existed, so no need to check its existance
if (pCtx->order == TSQL_SO_ASC) {
tsBufAppend(pTSbuf, 0, pCtx->tag.i64Key, input, pCtx->size * TSDB_KEYSIZE);
tsBufAppend(pTSbuf, 0, &pCtx->tag, input, pCtx->size * TSDB_KEYSIZE);
} else {
for (int32_t i = pCtx->size - 1; i >= 0; --i) {
char *d = GET_INPUT_CHAR_INDEX(pCtx, i);
tsBufAppend(pTSbuf, 0, pCtx->tag.i64Key, d, TSDB_KEYSIZE);
tsBufAppend(pTSbuf, 0, &pCtx->tag, d, TSDB_KEYSIZE);
}
}
......@@ -4359,7 +4359,7 @@ static void ts_comp_function_f(SQLFunctionCtx *pCtx, int32_t index) {
STSBuf *pTSbuf = pInfo->pTSBuf;
tsBufAppend(pTSbuf, 0, pCtx->tag.i64Key, pData, TSDB_KEYSIZE);
tsBufAppend(pTSbuf, 0, &pCtx->tag, pData, TSDB_KEYSIZE);
SET_VAL(pCtx, pCtx->size, 1);
pResInfo->hasResult = DATA_SET_FLAG;
......
......@@ -82,14 +82,14 @@ static int64_t doTSBlockIntersect(SSqlObj* pSql, SJoinSubquerySupporter* pSuppor
// for debug purpose
tscPrint("%" PRId64 ", tags:%d \t %" PRId64 ", tags:%d", elem1.ts, elem1.tag, elem2.ts, elem2.tag);
#endif
if (elem1.tag < elem2.tag || (elem1.tag == elem2.tag && doCompare(order, elem1.ts, elem2.ts))) {
int32_t ret = tVariantCompare(&elem1.tag,&elem2.tag );
if (ret < 0 || (ret == 0 && doCompare(order, elem1.ts, elem2.ts))) {
if (!tsBufNextPos(pSupporter1->pTSBuf)) {
break;
}
numOfInput1++;
} else if (elem1.tag > elem2.tag || (elem1.tag == elem2.tag && doCompare(order, elem2.ts, elem1.ts))) {
} else if (ret > 0 || (ret == 0 && doCompare(order, elem2.ts, elem1.ts))) {
if (!tsBufNextPos(pSupporter2->pTSBuf)) {
break;
}
......@@ -109,8 +109,8 @@ static int64_t doTSBlockIntersect(SSqlObj* pSql, SJoinSubquerySupporter* pSuppor
*et = elem1.ts;
}
tsBufAppend(output1, elem1.vnode, elem1.tag, (const char*)&elem1.ts, sizeof(elem1.ts));
tsBufAppend(output2, elem2.vnode, elem2.tag, (const char*)&elem2.ts, sizeof(elem2.ts));
tsBufAppend(output1, elem1.vnode, &elem1.tag, (const char*)&elem1.ts, sizeof(elem1.ts));
tsBufAppend(output2, elem2.vnode, &elem2.tag, (const char*)&elem2.ts, sizeof(elem2.ts));
} else {
pLimit->offset -= 1;
}
......@@ -219,6 +219,105 @@ bool needSecondaryQuery(SQueryInfo* pQueryInfo) {
return false;
}
int32_t tscLaunchSecondPhaseDirectly(SSqlObj* pSql, SSubqueryState* pState) {
/*
* If the columns are not involved in the final select clause, the secondary query will not be launched
* for the subquery.
*/
pSql->res.qhandle = 0x1;
pSql->res.numOfRows = 0;
tscTrace("%p start to launch secondary subqueries", pSql);
bool success = true;
for (int32_t i = 0; i < pSql->numOfSubs; ++i) {
SJoinSubquerySupporter *pSupporter = tscCreateJoinSupporter(pSql, pState, i);
assert(pSupporter != NULL);
SSqlObj *pNew = createSubqueryObj(pSql, (int16_t) i, tscJoinQueryCallback, pSupporter, NULL);
if (pNew == NULL) {
tscDestroyJoinSupporter(pSupporter);
success = false;
break;
}
pSql->pSubs[i] = pNew;
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0);
pQueryInfo->tsBuf = NULL; // transfer the ownership of timestamp comp-z data to the new created object
// set the second stage sub query for join process
pQueryInfo->type |= TSDB_QUERY_TYPE_JOIN_SEC_STAGE;
/*
* if the first column of the secondary query is not ts function, add this function.
* Because this column is required to filter with timestamp after intersecting.
*/
assert(pNew->numOfSubs == 0 && pNew->cmd.numOfClause == 1 && pQueryInfo->numOfTables == 1);
/*
* if the first column of the secondary query is not ts function, add this function.
* Because this column is required to filter with timestamp after intersecting.
*/
if (tscSqlExprNumOfExprs(pQueryInfo) == 0) {
SColumnIndex index = {0};
SSqlExpr* pExpr = tscSqlExprInsert(pQueryInfo, 0, TSDB_FUNC_COUNT, &index, TSDB_DATA_TYPE_BIGINT, sizeof(int64_t), sizeof(int64_t));
SColumnList columnList = {0};
columnList.num = 1;
columnList.ids[0] = index;
insertResultField(pQueryInfo, 0, &columnList, TSDB_KEYSIZE, TSDB_DATA_TYPE_TIMESTAMP, "ts", pExpr);
} else if (tscSqlExprGet(pQueryInfo, 0)->functionId != TSDB_FUNC_TS) {
tscAddTimestampColumn(pQueryInfo, TSDB_FUNC_TS, 0);
}
tscFieldInfoCalOffset(pQueryInfo);
SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0);
/*
* When handling the projection query, the offset value will be modified for table-table join, which is changed
* during the timestamp intersection.
*/
// fetch the join tag column
if (UTIL_METER_IS_SUPERTABLE(pMeterMetaInfo)) {
SSqlExpr *pExpr = tscSqlExprGet(pQueryInfo, 0);
assert(pQueryInfo->tagCond.joinInfo.hasJoin);
int16_t tagColIndex = tscGetJoinTagColIndexByUid(&pQueryInfo->tagCond, pMeterMetaInfo->pMeterMeta->uid);
pExpr->param[0].i64Key = tagColIndex;
pExpr->numOfParams = 1;
}
tscPrintSelectClause(pNew, 0);
tscTrace("%p subquery:%p tableIndex:%d, vnodeIdx:%d, type:%d, exprInfo:%d, colList:%d, fieldsInfo:%d, name:%s",
pSql, pNew, 0, pMeterMetaInfo->vnodeIndex, pQueryInfo->type,
pQueryInfo->exprsInfo.numOfExprs, pQueryInfo->colList.numOfCols,
pQueryInfo->fieldsInfo.numOfOutputCols, pQueryInfo->pMeterInfo[0]->name);
}
//prepare the subqueries object failed, abort
if (!success) {
pSql->res.code = TSDB_CODE_CLI_OUT_OF_MEMORY;
tscError("%p failed to prepare subqueries objs for secondary phase query, numOfSub:%d, code:%d", pSql,
pSql->numOfSubs, pSql->res.code);
freeSubqueryObj(pSql);
return pSql->res.code;
}
for(int32_t i = 0; i < pSql->numOfSubs; ++i) {
SSqlObj* pSub = pSql->pSubs[i];
if (pSub == NULL) {
continue;
}
tscProcessSql(pSub);
}
return TSDB_CODE_SUCCESS;
}
/*
* launch secondary stage query to fetch the result that contains timestamp in set
*/
......@@ -869,7 +968,7 @@ static STSBuf* allocResForTSBuf(STSBuf* pTSBuf) {
tsBufDestory(pTSBuf);
return NULL;
}
pTSBuf->fileSize += getDataStartOffset();
return pTSBuf;
}
......@@ -964,8 +1063,6 @@ STSBuf* tsBufCreateFromFile(const char* path, bool autoDelete) {
size_t infoSize = sizeof(STSVnodeBlockInfo) * pTSBuf->numOfVnodes;
STSVnodeBlockInfo* buf = (STSVnodeBlockInfo*)calloc(1, infoSize);
//int64_t pos = ftell(pTSBuf->f); //pos not used
fread(buf, infoSize, 1, pTSBuf->f);
// the length value for each vnode is not kept in file, so does not set the length value
......@@ -1005,6 +1102,8 @@ void* tsBufDestory(STSBuf* pTSBuf) {
tfree(pTSBuf->pData);
tfree(pTSBuf->block.payload);
tVariantDestroy(&pTSBuf->block.tag);
fclose(pTSBuf->f);
......@@ -1088,8 +1187,7 @@ static void writeDataToDisk(STSBuf* pTSBuf) {
tsCompressTimestamp(pTSBuf->tsData.rawBuf, pTSBuf->tsData.len, pTSBuf->tsData.len / TSDB_KEYSIZE, pBlock->payload,
pTSBuf->tsData.allocSize, TWO_STAGE_COMP, pTSBuf->assistBuf, pTSBuf->bufSize);
int64_t r = fseek(pTSBuf->f, pTSBuf->fileSize, SEEK_SET);
UNUSED(r);
/*int64_t r =*/ fseek(pTSBuf->f, pTSBuf->fileSize, SEEK_SET);
/*
* format for output data:
......@@ -1098,16 +1196,21 @@ static void writeDataToDisk(STSBuf* pTSBuf) {
*
* both side has the compressed length is used to support load data forwards/backwords.
*/
fwrite(&pBlock->tag, sizeof(pBlock->tag), 1, pTSBuf->f);
fwrite(&pBlock->tag.nType, sizeof(pBlock->tag.nType), 1, pTSBuf->f);
fwrite(&pBlock->tag.nLen, sizeof(pBlock->tag.nLen), 1, pTSBuf->f);
if (pBlock->tag.nType == TSDB_DATA_TYPE_BINARY || pBlock->tag.nType == TSDB_DATA_TYPE_NCHAR) {
fwrite(pBlock->tag.pz, (size_t)pBlock->tag.nLen, 1, pTSBuf->f);
} else if (pBlock->tag.nType != TSDB_DATA_TYPE_NULL) {
fwrite(&pBlock->tag.i64Key, sizeof(int64_t), 1, pTSBuf->f);
}
fwrite(&pBlock->numOfElem, sizeof(pBlock->numOfElem), 1, pTSBuf->f);
fwrite(&pBlock->compLen, sizeof(pBlock->compLen), 1, pTSBuf->f);
fwrite(pBlock->payload, (size_t)pBlock->compLen, 1, pTSBuf->f);
fwrite(pBlock->payload, (size_t)pBlock->compLen, 1, pTSBuf->f);
fwrite(&pBlock->compLen, sizeof(pBlock->compLen), 1, pTSBuf->f);
int32_t blockSize = sizeof(pBlock->tag) + sizeof(pBlock->numOfElem) + sizeof(pBlock->compLen) * 2 + pBlock->compLen;
int32_t blockSize = sizeof(pBlock->tag.nType) + sizeof(pBlock->tag.nLen) + pBlock->tag.nLen + sizeof(pBlock->numOfElem) + sizeof(pBlock->compLen) * 2 + pBlock->compLen;
pTSBuf->fileSize += blockSize;
pTSBuf->tsData.len = 0;
......@@ -1137,10 +1240,13 @@ STSBlock* readDataFromDisk(STSBuf* pTSBuf, int32_t order, bool decomp) {
STSBlock* pBlock = &pTSBuf->block;
// clear the memory buffer
tVariant t = pBlock->tag;
void* tmp = pBlock->payload;
memset(pBlock, 0, sizeof(STSBlock));
pBlock->payload = tmp;
pBlock->tag = t;
if (order == TSQL_SO_DESC) {
/*
* set the right position for the reversed traverse, the reversed traverse is started from
......@@ -1150,11 +1256,26 @@ STSBlock* readDataFromDisk(STSBuf* pTSBuf, int32_t order, bool decomp) {
fread(&pBlock->padding, sizeof(pBlock->padding), 1, pTSBuf->f);
pBlock->compLen = pBlock->padding;
int32_t offset = pBlock->compLen + sizeof(pBlock->compLen) * 2 + sizeof(pBlock->numOfElem) + sizeof(pBlock->tag);
int32_t offset = pBlock->compLen + sizeof(pBlock->compLen) * 2 + sizeof(pBlock->numOfElem) + sizeof(pBlock->tag.nType) + sizeof(pBlock->tag.nLen) + pBlock->tag.nLen ;
fseek(pTSBuf->f, -offset, SEEK_CUR);
}
fread(&pBlock->tag, sizeof(pBlock->tag), 1, pTSBuf->f);
fread(&pBlock->tag.nType, sizeof(pBlock->tag.nType), 1, pTSBuf->f);
fread(&pBlock->tag.nLen, sizeof(pBlock->tag.nLen), 1, pTSBuf->f);
// NOTE: mix types tags are not supported
if (pBlock->tag.nType == TSDB_DATA_TYPE_BINARY || pBlock->tag.nType == TSDB_DATA_TYPE_NCHAR) {
char* tp = realloc(pBlock->tag.pz, pBlock->tag.nLen + 1);
assert(tp != NULL);
memset(tp, 0, pBlock->tag.nLen + 1);
pBlock->tag.pz = tp;
fread(pBlock->tag.pz, (size_t)pBlock->tag.nLen, 1, pTSBuf->f);
} else if (pBlock->tag.nType != TSDB_DATA_TYPE_NULL) {
fread(&pBlock->tag.i64Key, sizeof(int64_t), 1, pTSBuf->f);
}
fread(&pBlock->numOfElem, sizeof(pBlock->numOfElem), 1, pTSBuf->f);
fread(&pBlock->compLen, sizeof(pBlock->compLen), 1, pTSBuf->f);
......@@ -1212,9 +1333,11 @@ static int32_t setCheckTSOrder(STSBuf* pTSBuf, const char* pData, int32_t len) {
return TSDB_CODE_SUCCESS;
}
void tsBufAppend(STSBuf* pTSBuf, int32_t vnodeId, int64_t tag, const char* pData, int32_t len) {
void tsBufAppend(STSBuf* pTSBuf, int32_t vnodeId, tVariant* tag, const char* pData, int32_t len) {
STSVnodeBlockInfoEx* pBlockInfo = NULL;
STSList* ptsData = &pTSBuf->tsData;
STSList* ptsData = &pTSBuf->tsData;
int32_t tagEqual = 0;
if (pTSBuf->numOfVnodes == 0 || tsBufGetLastVnodeInfo(pTSBuf)->info.vnode != vnodeId) {
writeDataToDisk(pTSBuf);
......@@ -1226,15 +1349,18 @@ void tsBufAppend(STSBuf* pTSBuf, int32_t vnodeId, int64_t tag, const char* pData
}
assert(pBlockInfo->info.vnode == vnodeId);
tagEqual = tVariantCompare(&pTSBuf->block.tag, tag);
if (pTSBuf->block.tag != tag && ptsData->len > 0) {
if (tagEqual != 0 && ptsData->len > 0) {
// new arrived data with different tags value, save current value into disk first
writeDataToDisk(pTSBuf);
} else {
expandBuffer(ptsData, len);
}
pTSBuf->block.tag = tag;
tVariantDestroy(&pTSBuf->block.tag);
tVariantAssign(&pTSBuf->block.tag, tag);
memcpy(ptsData->rawBuf + ptsData->len, pData, (size_t)len);
// todo check return value
......@@ -1315,7 +1441,7 @@ static int32_t tsBufFindBlock(STSBuf* pTSBuf, STSVnodeBlockInfo* pBlockInfo, int
return 0;
}
static int32_t tsBufFindBlockByTag(STSBuf* pTSBuf, STSVnodeBlockInfo* pBlockInfo, int64_t tag) {
static int32_t tsBufFindBlockByTag(STSBuf* pTSBuf, STSVnodeBlockInfo* pBlockInfo, tVariant* tag) {
bool decomp = false;
int64_t offset = 0;
......@@ -1334,7 +1460,7 @@ static int32_t tsBufFindBlockByTag(STSBuf* pTSBuf, STSVnodeBlockInfo* pBlockInfo
return -1;
}
if (pTSBuf->block.tag == tag) {
if (0 == tVariantCompare(&pTSBuf->block.tag, tag)) {
return i;
}
}
......@@ -1513,7 +1639,9 @@ STSElem tsBufGetElem(STSBuf* pTSBuf) {
elem1.vnode = pTSBuf->pData[pCur->vnodeIndex].info.vnode;
elem1.ts = *(TSKEY*)(pTSBuf->tsData.rawBuf + pCur->tsIndex * TSDB_KEYSIZE);
elem1.tag = pBlock->tag;
elem1.tag.nType = pBlock->tag.nType;
elem1.tag.nLen = pBlock->tag.nLen;
elem1.tag.pz = pBlock->tag.pz;
return elem1;
}
......@@ -1644,7 +1772,7 @@ STSBuf* tsBufCreateFromCompBlocks(const char* pData, int32_t numOfBlocks, int32_
return pTSBuf;
}
STSElem tsBufGetElemStartPos(STSBuf* pTSBuf, int32_t vnodeId, int64_t tag) {
STSElem tsBufGetElemStartPos(STSBuf* pTSBuf, int32_t vnodeId, tVariant* tag) {
STSElem elem = {.vnode = -1};
if (pTSBuf == NULL) {
......@@ -1723,7 +1851,7 @@ void tsBufDisplay(STSBuf* pTSBuf) {
while (tsBufNextPos(pTSBuf)) {
STSElem elem = tsBufGetElem(pTSBuf);
printf("%d-%" PRId64 "-%" PRId64 "\n", elem.vnode, *(int64_t*) elem.tag, elem.ts);
printf("%d-%" PRId64 "\n", elem.vnode, elem.ts);
}
pTSBuf->cur.order = old;
......
......@@ -949,7 +949,11 @@ static int32_t tscCheckIfCreateTable(char **sqlstr, SSqlObj *pSql) {
} else {
*sqlstr = sql;
}
if (*sqlstr == NULL) {
code = TSDB_CODE_INVALID_SQL;
}
return code;
}
......@@ -1290,7 +1294,7 @@ int tsParseInsertSql(SSqlObj *pSql) {
SQueryInfo *pQueryInfo = NULL;
tscGetQueryInfoDetailSafely(pCmd, pCmd->clauseIndex, &pQueryInfo);
uint16_t type = (sToken.type == TK_INSERT)? TSDB_QUERY_TYPE_INSERT:TSDB_QUERY_TYPE_IMPORT;
uint32_t type = (sToken.type == TK_INSERT)? TSDB_QUERY_TYPE_INSERT:TSDB_QUERY_TYPE_IMPORT;
TSDB_QUERY_SET_TYPE(pQueryInfo->type, type);
sToken = tStrGetToken(pSql->sqlstr, &index, false, 0, NULL);
......
......@@ -1136,6 +1136,10 @@ int32_t parseSelectClause(SSqlCmd* pCmd, int32_t clauseIndex, tSQLExprList* pSel
const char* msg5 = "invalid function name";
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, clauseIndex);
if (isSTable) {
TSDB_QUERY_SET_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_STABLE_QUERY);
}
for (int32_t i = 0; i < pSelection->nExpr; ++i) {
int32_t outputIndex = pQueryInfo->exprsInfo.numOfExprs;
......@@ -1996,22 +2000,20 @@ int32_t doGetColumnIndexByName(SSQLToken* pToken, SQueryInfo* pQueryInfo, SColum
}
int32_t getMeterIndex(SSQLToken* pTableToken, SQueryInfo* pQueryInfo, SColumnIndex* pIndex) {
pIndex->tableIndex = COLUMN_INDEX_INITIAL_VAL;
if (pTableToken->n == 0) { // only one table and no table name prefix in column name
if (pQueryInfo->numOfTables == 1) {
pIndex->tableIndex = 0;
}
return TSDB_CODE_SUCCESS;
}
pIndex->tableIndex = COLUMN_INDEX_INITIAL_VAL;
char tableName[TSDB_METER_ID_LEN + 1] = {0};
for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, i);
extractTableName(pMeterMetaInfo->name, tableName);
if (strncasecmp(tableName, pTableToken->z, pTableToken->n) == 0 && strlen(tableName) == pTableToken->n) {
if (strncasecmp(pMeterMetaInfo->aliasName, pTableToken->z, pTableToken->n) == 0 &&
strlen(pMeterMetaInfo->aliasName) == pTableToken->n) {
pIndex->tableIndex = i;
break;
}
......@@ -3201,10 +3203,10 @@ static bool validateJoinExprNode(SQueryInfo* pQueryInfo, tSQLExpr* pExpr, SColum
} else if (pLeftIndex->tableIndex == rightIndex.tableIndex) {
invalidSqlErrMsg(pQueryInfo->msg, msg4);
return false;
} else if (leftType == TSDB_DATA_TYPE_BINARY || leftType == TSDB_DATA_TYPE_NCHAR) {
} /*else if (leftType == TSDB_DATA_TYPE_BINARY || leftType == TSDB_DATA_TYPE_NCHAR) {
invalidSqlErrMsg(pQueryInfo->msg, msg6);
return false;
}
}*/
// table to table/ super table to super table are allowed
if (UTIL_METER_IS_SUPERTABLE(pLeftMeterMeta) != UTIL_METER_IS_SUPERTABLE(pRightMeterMeta)) {
......@@ -3655,7 +3657,9 @@ static int32_t validateJoinExpr(SQueryInfo* pQueryInfo, SCondExpr* pCondExpr) {
}
if (!pCondExpr->tsJoin) {
return invalidSqlErrMsg(pQueryInfo->msg, msg2);
TSDB_QUERY_SET_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_TS_NO_MATCH_JOIN_QUERY);
} else {
TSDB_QUERY_UNSET_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_TS_NO_MATCH_JOIN_QUERY);
}
return TSDB_CODE_SUCCESS;
......@@ -5600,7 +5604,8 @@ int32_t doCheckForQuery(SSqlObj* pSql, SQuerySQL* pQuerySql, int32_t index) {
const char* msg7 = "illegal number of tables in from clause";
const char* msg8 = "too many columns in selection clause";
const char* msg9 = "TWA query requires both the start and end time";
const char* msg10 = "alias name too long";
int32_t code = TSDB_CODE_SUCCESS;
SSqlCmd* pCmd = &pSql->cmd;
......@@ -5628,15 +5633,20 @@ int32_t doCheckForQuery(SSqlObj* pSql, SQuerySQL* pQuerySql, int32_t index) {
pQuerySql->pSortOrder == NULL);
return doLocalQueryProcess(pQueryInfo, pQuerySql);
}
if (pQuerySql->from->nExpr > 2) { // not allowed more than 2 table join
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg7);
if (pQuerySql->from->nExpr > 2) {
if (pQuerySql->from->nExpr > 4) { // not support more than 2 tables join query
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg7);
}
// set the timestamp not matched join query
TSDB_QUERY_SET_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_TS_NO_MATCH_JOIN_QUERY);
}
pQueryInfo->command = TSDB_SQL_SELECT;
// set all query tables, which are maybe more than one.
for (int32_t i = 0; i < pQuerySql->from->nExpr; ++i) {
for (int32_t i = 0; i < pQuerySql->from->nExpr; i += 2) {
tVariant* pTableItem = &pQuerySql->from->a[i].pVar;
if (pTableItem->nType != TSDB_DATA_TYPE_BINARY) {
......@@ -5654,7 +5664,7 @@ int32_t doCheckForQuery(SSqlObj* pSql, SQuerySQL* pQuerySql, int32_t index) {
tscAddEmptyMeterMetaInfo(pQueryInfo);
}
SMeterMetaInfo* pMeterInfo1 = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, i);
SMeterMetaInfo* pMeterInfo1 = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, i/2);
SSQLToken t = {.type = TSDB_DATA_TYPE_BINARY, .n = pTableItem->nLen, .z = pTableItem->pz};
if (setMeterID(pMeterInfo1, &t, pSql) != TSDB_CODE_SUCCESS) {
......@@ -5665,9 +5675,17 @@ int32_t doCheckForQuery(SSqlObj* pSql, SQuerySQL* pQuerySql, int32_t index) {
if (code != TSDB_CODE_SUCCESS) {
return code;
}
//set the alias name
tVariant* aliasName = &pQuerySql->from->a[i + 1].pVar;
if (aliasName->nLen > TSDB_METER_NAME_LEN) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg10);
}
tVariantDump(aliasName, pMeterInfo1->aliasName, TSDB_DATA_TYPE_BINARY);
}
assert(pQueryInfo->numOfTables == pQuerySql->from->nExpr);
assert(pQueryInfo->numOfTables == pQuerySql->from->nExpr / 2);
// parse the group by clause in the first place
if (parseGroupbyClause(pQueryInfo, pQuerySql->pGroupby, pCmd) != TSDB_CODE_SUCCESS) {
......
......@@ -367,7 +367,7 @@ void tVariantListDestroy(tVariantList *pList) {
free(pList);
}
tVariantList *tVariantListAppendToken(tVariantList *pList, SSQLToken *pAliasToken, uint8_t sortOrder) {
tVariantList *tVariantListAppendToken(tVariantList *pList, SSQLToken *pToken, uint8_t sortOrder) {
if (pList == NULL) {
pList = calloc(1, sizeof(tVariantList));
}
......@@ -376,9 +376,9 @@ tVariantList *tVariantListAppendToken(tVariantList *pList, SSQLToken *pAliasToke
return pList;
}
if (pAliasToken) {
if (pToken) {
tVariant t = {0};
tVariantCreate(&t, pAliasToken);
tVariantCreate(&t, pToken);
tVariantListItem *pItem = &pList->a[pList->nExpr++];
memcpy(pItem, &t, sizeof(tVariant));
......
......@@ -156,7 +156,8 @@ static FORCE_INLINE size_t copy(char* dst, const char* src, char delimiter) {
*/
void extractTableName(char* meterId, char* name) {
char* r = skipSegments(meterId, TS_PATH_DELIMITER[0], 2);
copy(name, r, TS_PATH_DELIMITER[0]);
size_t len = copy(name, r, TS_PATH_DELIMITER[0]);
name[len] = 0;
}
SSQLToken extractDBName(char* meterId, char* name) {
......
......@@ -259,8 +259,9 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd
pReducer->rowSize = pMemBuffer[0]->nElemSize;
tscRestoreSQLFunctionForMetricQuery(pQueryInfo);
tscFieldInfoUpdateBySqlFunc(pQueryInfo);
tscFieldInfoCalOffset(pQueryInfo);
if (pReducer->rowSize > pMemBuffer[0]->pageSize) {
assert(false); // todo fixed row size is larger than the minimum page size;
}
......
......@@ -643,7 +643,7 @@ int32_t tscLaunchJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSubquerySu
pSql->pSubs[pSql->numOfSubs++] = pNew;
assert(pSql->numOfSubs <= pSupporter->pState->numOfTotal);
if (QUERY_IS_JOIN_QUERY(pQueryInfo->type)) {
addGroupInfoForSubquery(pSql, pNew, 0, tableIndex);
......@@ -763,7 +763,7 @@ int tscProcessSql(SSqlObj *pSql) {
SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
SMeterMetaInfo *pMeterMetaInfo = NULL;
int16_t type = 0;
uint32_t type = 0;
if (pQueryInfo != NULL) {
pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0);
......@@ -808,26 +808,37 @@ int tscProcessSql(SSqlObj *pSql) {
if (QUERY_IS_JOIN_QUERY(type)) {
if ((pQueryInfo->type & TSDB_QUERY_TYPE_SUBQUERY) == 0) {
SSubqueryState *pState = calloc(1, sizeof(SSubqueryState));
pState->numOfTotal = pQueryInfo->numOfTables;
for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
SJoinSubquerySupporter *pSupporter = tscCreateJoinSupporter(pSql, pState, i);
if (pSupporter == NULL) { // failed to create support struct, abort current query
tscError("%p tableIndex:%d, failed to allocate join support object, abort further query", pSql, i);
pState->numOfCompleted = pQueryInfo->numOfTables - i - 1;
pSql->res.code = TSDB_CODE_CLI_OUT_OF_MEMORY;
return pSql->res.code;
if ((pQueryInfo->type & TSDB_QUERY_TYPE_TS_NO_MATCH_JOIN_QUERY) != 0) {
pSql->numOfSubs = pQueryInfo->numOfTables;
if (pSql->pSubs == NULL) {
pSql->pSubs = calloc(pSql->numOfSubs, POINTER_BYTES);
if (pSql->pSubs == NULL) {
return TSDB_CODE_CLI_OUT_OF_MEMORY;
}
}
int32_t code = tscLaunchJoinSubquery(pSql, i, pSupporter);
if (code != TSDB_CODE_SUCCESS) { // failed to create subquery object, quit query
tscDestroyJoinSupporter(pSupporter);
pSql->res.code = TSDB_CODE_CLI_OUT_OF_MEMORY;
break;
tscLaunchSecondPhaseDirectly(pSql, pState);
} else {
for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
SJoinSubquerySupporter *pSupporter = tscCreateJoinSupporter(pSql, pState, i);
if (pSupporter == NULL) { // failed to create support struct, abort current query
tscError("%p tableIndex:%d, failed to allocate join support object, abort further query", pSql, i);
pState->numOfCompleted = pQueryInfo->numOfTables - i - 1;
pSql->res.code = TSDB_CODE_CLI_OUT_OF_MEMORY;
return pSql->res.code;
}
int32_t code = tscLaunchJoinSubquery(pSql, i, pSupporter);
if (code != TSDB_CODE_SUCCESS) { // failed to create subquery object, quit query
tscDestroyJoinSupporter(pSupporter);
pSql->res.code = TSDB_CODE_CLI_OUT_OF_MEMORY;
break;
}
}
}
......
......@@ -551,9 +551,36 @@ static void **tscBuildResFromSubqueries(SSqlObj *pSql) {
}
if (numOfTableHasRes >= 2) { // do merge result
success = (doSetResultRowData(pSql->pSubs[0], false) != NULL) &&
(doSetResultRowData(pSql->pSubs[1], false) != NULL);
// printf("first:%" PRId64 ", second:%" PRId64 "\n", key1, key2);
bool s1 = doSetResultRowData(pSql->pSubs[0], false);
bool s2 = doSetResultRowData(pSql->pSubs[1], false);
success = s1 && s2;
if (success) {
TSKEY key1 = *(TSKEY*) pSql->pSubs[0]->res.tsrow[0];
TSKEY key2 = *(TSKEY*) pSql->pSubs[1]->res.tsrow[0];
while(1) {
if (key1 < key2) {
s1 = doSetResultRowData(pSql->pSubs[0], false);
if (!s1) { // retrieve next block
break;
}
} else if (key1 > key2) {
s2 = doSetResultRowData(pSql->pSubs[1], false);
if (!s2) {
break;
}
} else {
break;
}
key1 = *(TSKEY *)pSql->pSubs[0]->res.tsrow[0];
key2 = *(TSKEY *)pSql->pSubs[1]->res.tsrow[0];
}
success = s1 && s2;
// printf("first:%" PRId64 ", second:%" PRId64 "\n", key1, key2);
}
} else { // only one subquery
SSqlObj *pSub = pSql->pSubs[0];
if (pSub == NULL) {
......
......@@ -928,6 +928,20 @@ void tscFieldInfoSetBinExpr(SFieldInfo* pFieldInfo, int32_t index, SSqlFunctionE
pFieldInfo->pExpr[index] = pExpr;
}
void tscFieldInfoUpdateBySqlFunc(SQueryInfo* pQueryInfo) {
for(int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutputCols; ++i) {
TAOS_FIELD* field = tscFieldInfoGetField(pQueryInfo, i);
SSqlExpr* pExpr = pQueryInfo->fieldsInfo.pSqlExpr[i];
if (pExpr == NULL) {
continue;
}
field->type = pExpr->resType;
field->bytes = pExpr->resBytes;
}
}
void tscFieldInfoCalOffset(SQueryInfo* pQueryInfo) {
SSqlExprInfo* pExprInfo = &pQueryInfo->exprsInfo;
pExprInfo->pExprs[0]->offset = 0;
......@@ -2014,6 +2028,7 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void
// create the fields info from the sql functions
SColumnList columnList = {.num = 0};
// for avg/last/first/histo.. query, the output type is binary not numeric data type
for(int32_t k = 0; k < numOfOutputCols; ++k) {
SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, indexList[k]);
columnList.ids[0] = (SColumnIndex){.tableIndex = tableIndex, .columnIndex = pExpr->colInfo.colIdx};
......
......@@ -406,8 +406,35 @@ as(X) ::= . { X.n = 0; }
from(A) ::= FROM tablelist(X). {A = X;}
%type tablelist {tVariantList*}
tablelist(A) ::= ids(X) cpxName(Y). { toTSDBType(X.type); X.n += Y.n; A = tVariantListAppendToken(NULL, &X, -1);}
tablelist(A) ::= tablelist(Y) COMMA ids(X) cpxName(Z). { toTSDBType(X.type); X.n += Z.n; A = tVariantListAppendToken(Y, &X, -1); }
tablelist(A) ::= ids(X) cpxName(Y). {
toTSDBType(X.type);
X.n += Y.n;
A = tVariantListAppendToken(NULL, &X, -1);
A = tVariantListAppendToken(A, &X, -1);
}
tablelist(A) ::= ids(X) cpxName(Y) ids(Z). {
toTSDBType(X.type);
toTSDBType(Z.type);
X.n += Y.n;
A = tVariantListAppendToken(NULL, &X, -1);
A = tVariantListAppendToken(A, &Z, -1);
}
tablelist(A) ::= tablelist(Y) COMMA ids(X) cpxName(Z). {
toTSDBType(X.type);
X.n += Z.n;
A = tVariantListAppendToken(Y, &X, -1);
A = tVariantListAppendToken(A, &X, -1);
}
tablelist(A) ::= tablelist(Y) COMMA ids(X) cpxName(Z) ids(F). {
toTSDBType(X.type);
toTSDBType(F.type);
X.n += Z.n;
A = tVariantListAppendToken(Y, &X, -1);
A = tVariantListAppendToken(A, &F, -1);
}
// The value of interval should be the form of "number+[a,s,m,h,d,n,y]" or "now"
%type tmvar {SSQLToken}
......
......@@ -230,10 +230,12 @@ extern "C" {
#define TSDB_QUERY_TYPE_INSERT 0x100U // insert type
#define TSDB_QUERY_TYPE_IMPORT 0x200U // import data
#define TSDB_QUERY_TYPE_TS_NO_MATCH_JOIN_QUERY 0x400u // join query without ts match
#define TSDB_QUERY_HAS_TYPE(x, _type) (((x) & (_type)) != 0)
#define TSDB_QUERY_SET_TYPE(x, _type) ((x) |= (_type))
#define TSDB_QUERY_RESET_TYPE(x) ((x) = TSDB_QUERY_TYPE_NON_TYPE)
#define TSDB_QUERY_UNSET_TYPE(x, _type) ((x) &= ~(_type))
#define TSQL_SO_ASC 1
#define TSQL_SO_DESC 0
......
......@@ -115,7 +115,7 @@ enum {
};
#define QUERY_IS_STABLE_QUERY(type) (((type)&TSDB_QUERY_TYPE_STABLE_QUERY) != 0)
#define QUERY_IS_JOIN_QUERY(type) (((type)&TSDB_QUERY_TYPE_JOIN_QUERY) != 0)
#define QUERY_IS_JOIN_QUERY(type) (((type)&(TSDB_QUERY_TYPE_JOIN_QUERY|TSDB_QUERY_TYPE_TS_NO_MATCH_JOIN_QUERY)) != 0)
#define QUERY_IS_PROJECTION_QUERY(type) (((type)&TSDB_QUERY_TYPE_PROJECTION_QUERY) != 0)
#define QUERY_IS_FREE_RESOURCE(type) (((type)&TSDB_QUERY_TYPE_FREE_RESOURCE) != 0)
......
......@@ -75,6 +75,8 @@ void tVariantDestroy(tVariant *pV);
void tVariantAssign(tVariant *pDst, const tVariant *pSrc);
int32_t tVariantCompare(const tVariant *pDst, const tVariant *pSrc);
int32_t tVariantToString(tVariant *pVar, char *dst);
int32_t tVariantDump(tVariant *pVariant, char *payload, char type);
......
......@@ -198,7 +198,7 @@ typedef struct SMeterQueryInfo {
int64_t ekey;
int32_t numOfRes;
int16_t queryRangeSet; // denote if the query range is set, only available for interval query
int64_t tag;
tVariant tag;
STSCursor cur;
int32_t sid; // for retrieve the page id list
......
......@@ -2559,7 +2559,7 @@ static int32_t doTSJoinFilter(SQueryRuntimeEnv *pRuntimeEnv, int32_t offset) {
SQLFunctionCtx *pCtx = pRuntimeEnv->pCtx;
// compare tag first
if (pCtx[0].tag.i64Key != elem.tag) {
if (0 != tVariantCompare(&pCtx[0].tag,&elem.tag)) {
return TS_JOIN_TAG_NOT_EQUALS;
}
......@@ -2667,7 +2667,6 @@ static int32_t rowwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t *
int32_t j = 0;
TSKEY lastKey = -1;
int32_t lastIndex = -1;
//bool firstAccessedPoint = true;
for (j = 0; j < (*forwardStep); ++j) {
int32_t offset = GET_COL_DATA_POS(pQuery, j, step);
......@@ -7455,9 +7454,10 @@ int32_t setAdditionalInfo(STableQuerySupportObj *pSupporter, int32_t meterIdx, S
// both the master and supplement scan needs to set the correct ts comp start position
if (pRuntimeEnv->pTSBuf != NULL) {
if (pMeterQueryInfo->cur.vnodeIndex == -1) {
pMeterQueryInfo->tag = pRuntimeEnv->pCtx[0].tag.i64Key;
//pMeterQueryInfo->tag = pRuntimeEnv->pCtx[0].tag.i64Key;
tVariantAssign(&pMeterQueryInfo->tag,&pRuntimeEnv->pCtx[0].tag);
tsBufGetElemStartPos(pRuntimeEnv->pTSBuf, 0, pMeterQueryInfo->tag);
tsBufGetElemStartPos(pRuntimeEnv->pTSBuf, 0, &pMeterQueryInfo->tag);
// keep the cursor info of current meter
pMeterQueryInfo->cur = pRuntimeEnv->pTSBuf->cur;
......
......@@ -500,8 +500,9 @@ static bool multimeterMultioutputHelper(SQInfo *pQInfo, bool *dataInDisk, bool *
if (pRuntimeEnv->pTSBuf != NULL) {
if (pRuntimeEnv->cur.vnodeIndex == -1) {
int64_t tag = pRuntimeEnv->pCtx[0].tag.i64Key;
STSElem elem = tsBufGetElemStartPos(pRuntimeEnv->pTSBuf, 0, tag);
tVariant tag = {0};
tVariantAssign(&tag, &pRuntimeEnv->pCtx[0].tag);
STSElem elem = tsBufGetElemStartPos(pRuntimeEnv->pTSBuf, 0, &tag);
// failed to find data with the specified tag value
if (elem.vnode < 0) {
......
......@@ -114,27 +114,33 @@ void tVariantCreateFromBinary(tVariant *pVar, char *pz, uint32_t len, uint32_t t
case TSDB_DATA_TYPE_BOOL:
case TSDB_DATA_TYPE_TINYINT: {
pVar->i64Key = GET_INT8_VAL(pz);
pVar->nLen = sizeof(int64_t);
break;
}
case TSDB_DATA_TYPE_SMALLINT: {
pVar->i64Key = GET_INT16_VAL(pz);
pVar->nLen = sizeof(int64_t);
break;
}
case TSDB_DATA_TYPE_INT: {
pVar->i64Key = GET_INT32_VAL(pz);
pVar->nLen = sizeof(int64_t);
break;
}
case TSDB_DATA_TYPE_BIGINT:
case TSDB_DATA_TYPE_TIMESTAMP: {
pVar->i64Key = GET_INT64_VAL(pz);
pVar->nLen = sizeof(int64_t);
break;
}
case TSDB_DATA_TYPE_DOUBLE: {
pVar->dKey = GET_DOUBLE_VAL(pz);
pVar->nLen = sizeof(int64_t);
break;
}
case TSDB_DATA_TYPE_FLOAT: {
pVar->dKey = GET_FLOAT_VAL(pz);
pVar->nLen = sizeof(int64_t);
break;
}
case TSDB_DATA_TYPE_NCHAR: { // here we get the nchar length from raw binary bits length
......@@ -181,9 +187,39 @@ void tVariantAssign(tVariant *pDst, const tVariant *pSrc) {
}
pDst->pz = calloc(1, len);
printf("==============alloc assign:%p", pDst->pz);
memcpy(pDst->pz, pSrc->pz, len);
}
}
/* compare two tVariant, if same, return 0; else return nonezero */
int32_t tVariantCompare(const tVariant *pDst, const tVariant *pSrc) {
if (pSrc == NULL || pDst == NULL) return 1;
if (pSrc->nType != pDst->nType) return 1;
switch (pSrc->nType) {
case TSDB_DATA_TYPE_BIGINT:
case TSDB_DATA_TYPE_BOOL:
case TSDB_DATA_TYPE_DOUBLE:
case TSDB_DATA_TYPE_FLOAT:
case TSDB_DATA_TYPE_TINYINT:
case TSDB_DATA_TYPE_INT:
case TSDB_DATA_TYPE_SMALLINT:
case TSDB_DATA_TYPE_TIMESTAMP:
if (pSrc->i64Key > pDst->i64Key){
return 1;
}else if (pSrc->i64Key < pDst->i64Key) {
return -1;
}else {
return 0;
}
case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_NCHAR:
return strncasecmp(pSrc->pz,pDst->pz,pSrc->nLen);
default:
return 0;
}
}
int32_t tVariantToString(tVariant *pVar, char *dst) {
if (pVar == NULL || dst == NULL) return 0;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册