提交 c59d762a 编写于 作者: H hjxilinx

[td-186] fix bugs for stable join query.

上级 41d414ea
......@@ -29,8 +29,8 @@ void tscSetupOutputColumnIndex(SSqlObj* pSql);
int32_t tscLaunchSecondPhaseSubqueries(SSqlObj* pSql);
void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code);
SJoinSubquerySupporter* tscCreateJoinSupporter(SSqlObj* pSql, SSubqueryState* pState, int32_t index);
void tscDestroyJoinSupporter(SJoinSubquerySupporter* pSupporter);
SJoinSupporter* tscCreateJoinSupporter(SSqlObj* pSql, SSubqueryState* pState, int32_t index);
void tscDestroyJoinSupporter(SJoinSupporter* pSupporter);
int32_t tscHandleMasterJoinQuery(SSqlObj* pSql);
......
......@@ -51,7 +51,14 @@ typedef struct SParsedDataColInfo {
bool hasVal[TSDB_MAX_COLUMNS];
} SParsedDataColInfo;
typedef struct SJoinSubquerySupporter {
typedef struct STidTags {
int64_t uid;
int32_t tid;
int32_t vgId;
char tag[];
} STidTags;
typedef struct SJoinSupporter {
SSubqueryState* pState;
SSqlObj* pObj; // parent SqlObj
int32_t subqueryIndex; // index of sub query
......@@ -65,8 +72,17 @@ typedef struct SJoinSubquerySupporter {
SSqlGroupbyExpr groupbyExpr;
struct STSBuf* pTSBuf; // the TSBuf struct that holds the compressed timestamp array
FILE* f; // temporary file in order to create TSBuf
char path[PATH_MAX]; // temporary file path
} SJoinSubquerySupporter;
char path[PATH_MAX]; // temporary file path, todo dynamic allocate memory
int32_t tagSize; // the length of each in the first filter stage
char* pIdTagList; // result of first stage tags
int32_t totalLen;
int32_t num;
} SJoinSupporter;
typedef struct SVgroupTableInfo {
SCMVgroupInfo vgInfo;
SArray* itemList; //SArray<STableIdInfo>
} SVgroupTableInfo;
int32_t tscCreateDataBlock(size_t initialSize, int32_t rowSize, int32_t startOffset, const char* name,
STableMeta* pTableMeta, STableDataBlocks** dataBlocks);
......@@ -87,7 +103,7 @@ int32_t tscGetDataBlockFromList(void* pHashList, SDataBlockList* pDataBlockList,
int32_t startOffset, int32_t rowSize, const char* tableId, STableMeta* pTableMeta,
STableDataBlocks** dataBlocks);
UNUSED_FUNC STableIdInfo* tscGetMeterSidInfo(SVnodeSidList* pSidList, int32_t idx);
//UNUSED_FUNC STableIdInfo* tscGetMeterSidInfo(SVnodeSidList* pSidList, int32_t idx);
/**
*
......@@ -190,7 +206,6 @@ bool tscShouldFreeHeatBeat(SSqlObj* pHb);
void tscCleanSqlCmd(SSqlCmd* pCmd);
bool tscShouldBeFreed(SSqlObj* pSql);
void tscClearAllTableMetaInfo(SQueryInfo* pQueryInfo, const char* address, bool removeFromCache);
STableMetaInfo* tscGetTableMetaInfoFromCmd(SSqlCmd *pCmd, int32_t subClauseIndex, int32_t tableIndex);
STableMetaInfo* tscGetMetaInfo(SQueryInfo *pQueryInfo, int32_t tableIndex);
......
......@@ -67,9 +67,10 @@ typedef struct STableMeta {
} STableMeta;
typedef struct STableMetaInfo {
STableMeta * pTableMeta; // table meta, cached in client side and acquried by name
STableMeta * pTableMeta; // table meta, cached in client side and acquired by name
SVgroupsInfo *vgroupList;
SArray *pVgroupTables; // SArray<SVgroupTableInfo>
/*
* 1. keep the vgroup index during the multi-vnode super table projection query
* 2. keep the vgroup index for multi-vnode insertion
......@@ -382,7 +383,6 @@ int32_t tscCreateResPointerInfo(SSqlRes *pRes, SQueryInfo *pQueryInfo);
void tscDestroyResPointerInfo(SSqlRes *pRes);
void tscResetSqlCmdObj(SSqlCmd *pCmd);
void tscFreeResData(SSqlObj *pSql);
/**
* free query result of the sql object
......
......@@ -153,7 +153,7 @@ typedef struct SRateInfo {
int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionId, int32_t param, int16_t *type,
int16_t *bytes, int16_t *intermediateResBytes, int16_t extLength, bool isSuperTable) {
int16_t *bytes, int16_t *interResBytes, int16_t extLength, bool isSuperTable) {
if (!isValidDataType(dataType, dataBytes)) {
tscError("Illegal data type %d or data type length %d", dataType, dataBytes);
return TSDB_CODE_INVALID_SQL;
......@@ -164,28 +164,35 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI
functionId == TSDB_FUNC_TAG || functionId == TSDB_FUNC_INTERP) {
*type = (int16_t)dataType;
*bytes = (int16_t)dataBytes;
*intermediateResBytes = *bytes + sizeof(SResultInfo);
*interResBytes = *bytes + sizeof(SResultInfo);
return TSDB_CODE_SUCCESS;
}
if (functionId == TSDB_FUNC_TID_TAG) { // todo use struct
*type = TSDB_DATA_TYPE_BINARY;
*bytes = dataBytes + sizeof(int64_t) + sizeof(int32_t) + sizeof(int32_t); // (uid, tid) + VGID + TAGSIZE
*interResBytes = *bytes;
return TSDB_CODE_SUCCESS;
}
if (functionId == TSDB_FUNC_COUNT) {
*type = TSDB_DATA_TYPE_BIGINT;
*bytes = sizeof(int64_t);
*intermediateResBytes = *bytes;
*interResBytes = *bytes;
return TSDB_CODE_SUCCESS;
}
if (functionId == TSDB_FUNC_ARITHM) {
*type = TSDB_DATA_TYPE_DOUBLE;
*bytes = sizeof(double);
*intermediateResBytes = *bytes;
*interResBytes = *bytes;
return TSDB_CODE_SUCCESS;
}
if (functionId == TSDB_FUNC_TS_COMP) {
*type = TSDB_DATA_TYPE_BINARY;
*bytes = sizeof(int32_t); // this results is compressed ts data
*intermediateResBytes = POINTER_BYTES;
*interResBytes = POINTER_BYTES;
return TSDB_CODE_SUCCESS;
}
......@@ -193,54 +200,54 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI
if (functionId == TSDB_FUNC_MIN || functionId == TSDB_FUNC_MAX) {
*type = TSDB_DATA_TYPE_BINARY;
*bytes = dataBytes + DATA_SET_FLAG_SIZE;
*intermediateResBytes = *bytes;
*interResBytes = *bytes;
return TSDB_CODE_SUCCESS;
} else if (functionId == TSDB_FUNC_SUM) {
*type = TSDB_DATA_TYPE_BINARY;
*bytes = sizeof(SSumInfo);
*intermediateResBytes = *bytes;
*interResBytes = *bytes;
return TSDB_CODE_SUCCESS;
} else if (functionId == TSDB_FUNC_AVG) {
*type = TSDB_DATA_TYPE_BINARY;
*bytes = sizeof(SAvgInfo);
*intermediateResBytes = *bytes;
*interResBytes = *bytes;
return TSDB_CODE_SUCCESS;
} else if (functionId >= TSDB_FUNC_RATE && functionId <= TSDB_FUNC_AVG_IRATE) {
*type = TSDB_DATA_TYPE_DOUBLE;
*bytes = sizeof(SRateInfo);
*intermediateResBytes = sizeof(SRateInfo);
*interResBytes = sizeof(SRateInfo);
return TSDB_CODE_SUCCESS;
} else if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM) {
*type = TSDB_DATA_TYPE_BINARY;
*bytes = sizeof(STopBotInfo) + (sizeof(tValuePair) + POINTER_BYTES + extLength) * param;
*intermediateResBytes = *bytes;
*interResBytes = *bytes;
return TSDB_CODE_SUCCESS;
} else if (functionId == TSDB_FUNC_SPREAD) {
*type = TSDB_DATA_TYPE_BINARY;
*bytes = sizeof(SSpreadInfo);
*intermediateResBytes = *bytes;
*interResBytes = *bytes;
return TSDB_CODE_SUCCESS;
} else if (functionId == TSDB_FUNC_APERCT) {
*type = TSDB_DATA_TYPE_BINARY;
*bytes = sizeof(SHistBin) * (MAX_HISTOGRAM_BIN + 1) + sizeof(SHistogramInfo) + sizeof(SAPercentileInfo);
*intermediateResBytes = *bytes;
*interResBytes = *bytes;
return TSDB_CODE_SUCCESS;
} else if (functionId == TSDB_FUNC_LAST_ROW) {
*type = TSDB_DATA_TYPE_BINARY;
*bytes = sizeof(SLastrowInfo) + dataBytes;
*intermediateResBytes = *bytes;
*interResBytes = *bytes;
return TSDB_CODE_SUCCESS;
} else if (functionId == TSDB_FUNC_TWA) {
*type = TSDB_DATA_TYPE_DOUBLE;
*bytes = sizeof(STwaInfo);
*intermediateResBytes = *bytes;
*interResBytes = *bytes;
return TSDB_CODE_SUCCESS;
}
}
......@@ -253,57 +260,57 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI
}
*bytes = sizeof(int64_t);
*intermediateResBytes = sizeof(SSumInfo);
*interResBytes = sizeof(SSumInfo);
return TSDB_CODE_SUCCESS;
} else if (functionId == TSDB_FUNC_APERCT) {
*type = TSDB_DATA_TYPE_DOUBLE;
*bytes = sizeof(double);
*intermediateResBytes =
*interResBytes =
sizeof(SAPercentileInfo) + sizeof(SHistogramInfo) + sizeof(SHistBin) * (MAX_HISTOGRAM_BIN + 1);
return TSDB_CODE_SUCCESS;
} else if (functionId == TSDB_FUNC_TWA) {
*type = TSDB_DATA_TYPE_DOUBLE;
*bytes = sizeof(double);
*intermediateResBytes = sizeof(STwaInfo);
*interResBytes = sizeof(STwaInfo);
return TSDB_CODE_SUCCESS;
}
if (functionId == TSDB_FUNC_AVG) {
*type = TSDB_DATA_TYPE_DOUBLE;
*bytes = sizeof(double);
*intermediateResBytes = sizeof(SAvgInfo);
*interResBytes = sizeof(SAvgInfo);
} else if (functionId >= TSDB_FUNC_RATE && functionId <= TSDB_FUNC_AVG_IRATE) {
*type = TSDB_DATA_TYPE_DOUBLE;
*bytes = sizeof(double);
*intermediateResBytes = sizeof(SRateInfo);
*interResBytes = sizeof(SRateInfo);
} else if (functionId == TSDB_FUNC_STDDEV) {
*type = TSDB_DATA_TYPE_DOUBLE;
*bytes = sizeof(double);
*intermediateResBytes = sizeof(SStddevInfo);
*interResBytes = sizeof(SStddevInfo);
} else if (functionId == TSDB_FUNC_MIN || functionId == TSDB_FUNC_MAX) {
*type = (int16_t)dataType;
*bytes = (int16_t)dataBytes;
*intermediateResBytes = dataBytes + DATA_SET_FLAG_SIZE;
*interResBytes = dataBytes + DATA_SET_FLAG_SIZE;
} else if (functionId == TSDB_FUNC_FIRST || functionId == TSDB_FUNC_LAST) {
*type = (int16_t)dataType;
*bytes = (int16_t)dataBytes;
*intermediateResBytes = dataBytes + sizeof(SResultInfo);
*interResBytes = dataBytes + sizeof(SResultInfo);
} else if (functionId == TSDB_FUNC_SPREAD) {
*type = (int16_t)TSDB_DATA_TYPE_DOUBLE;
*bytes = sizeof(double);
*intermediateResBytes = sizeof(SSpreadInfo);
*interResBytes = sizeof(SSpreadInfo);
} else if (functionId == TSDB_FUNC_PERCT) {
*type = (int16_t)TSDB_DATA_TYPE_DOUBLE;
*bytes = (int16_t)sizeof(double);
*intermediateResBytes = (int16_t)sizeof(double);
*interResBytes = (int16_t)sizeof(double);
} else if (functionId == TSDB_FUNC_LEASTSQR) {
*type = TSDB_DATA_TYPE_BINARY;
*bytes = TSDB_AVG_FUNCTION_INTER_BUFFER_SIZE; // string
*intermediateResBytes = *bytes + sizeof(SResultInfo);
*interResBytes = *bytes + sizeof(SResultInfo);
} else if (functionId == TSDB_FUNC_FIRST_DST || functionId == TSDB_FUNC_LAST_DST) {
*type = TSDB_DATA_TYPE_BINARY;
*bytes = dataBytes + sizeof(SFirstLastInfo);
*intermediateResBytes = *bytes;
*interResBytes = *bytes;
} else if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM) {
*type = (int16_t)dataType;
*bytes = (int16_t)dataBytes;
......@@ -311,11 +318,11 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI
size_t size = sizeof(STopBotInfo) + (sizeof(tValuePair) + POINTER_BYTES + extLength) * param;
// the output column may be larger than sizeof(STopBotInfo)
*intermediateResBytes = size;
*interResBytes = size;
} else if (functionId == TSDB_FUNC_LAST_ROW) {
*type = (int16_t)dataType;
*bytes = (int16_t)dataBytes;
*intermediateResBytes = dataBytes + sizeof(SLastrowInfo);
*interResBytes = dataBytes + sizeof(SLastrowInfo);
} else {
return TSDB_CODE_INVALID_SQL;
}
......@@ -4836,7 +4843,7 @@ SQLAggFuncElem aAggs[] = {{
"apercentile",
TSDB_FUNC_APERCT,
TSDB_FUNC_APERCT,
TSDB_FUNCSTATE_SO | TSDB_FUNCSTATE_STREAM | TSDB_FUNCSTATE_OF | TSDB_FUNCSTATE_METRIC,
TSDB_FUNCSTATE_SO | TSDB_FUNCSTATE_STREAM | TSDB_FUNCSTATE_OF | TSDB_FUNCSTATE_STABLE,
apercentile_function_setup,
apercentile_function,
apercentile_function_f,
......@@ -4881,7 +4888,7 @@ SQLAggFuncElem aAggs[] = {{
"last_row",
TSDB_FUNC_LAST_ROW,
TSDB_FUNC_LAST_ROW,
TSDB_FUNCSTATE_SO | TSDB_FUNCSTATE_OF | TSDB_FUNCSTATE_METRIC | TSDB_FUNCSTATE_NEED_TS |
TSDB_FUNCSTATE_SO | TSDB_FUNCSTATE_OF | TSDB_FUNCSTATE_STABLE | TSDB_FUNCSTATE_NEED_TS |
TSDB_FUNCSTATE_SELECTIVITY,
first_last_function_setup,
last_row_function,
......@@ -4897,7 +4904,7 @@ SQLAggFuncElem aAggs[] = {{
"top",
TSDB_FUNC_TOP,
TSDB_FUNC_TOP,
TSDB_FUNCSTATE_MO | TSDB_FUNCSTATE_METRIC | TSDB_FUNCSTATE_OF | TSDB_FUNCSTATE_NEED_TS |
TSDB_FUNCSTATE_MO | TSDB_FUNCSTATE_STABLE | TSDB_FUNCSTATE_OF | TSDB_FUNCSTATE_NEED_TS |
TSDB_FUNCSTATE_SELECTIVITY,
top_bottom_function_setup,
top_function,
......@@ -4913,7 +4920,7 @@ SQLAggFuncElem aAggs[] = {{
"bottom",
TSDB_FUNC_BOTTOM,
TSDB_FUNC_BOTTOM,
TSDB_FUNCSTATE_MO | TSDB_FUNCSTATE_METRIC | TSDB_FUNCSTATE_OF | TSDB_FUNCSTATE_NEED_TS |
TSDB_FUNCSTATE_MO | TSDB_FUNCSTATE_STABLE | TSDB_FUNCSTATE_OF | TSDB_FUNCSTATE_NEED_TS |
TSDB_FUNCSTATE_SELECTIVITY,
top_bottom_function_setup,
bottom_function,
......@@ -5079,7 +5086,7 @@ SQLAggFuncElem aAggs[] = {{
"arithmetic",
TSDB_FUNC_ARITHM,
TSDB_FUNC_ARITHM,
TSDB_FUNCSTATE_MO | TSDB_FUNCSTATE_METRIC | TSDB_FUNCSTATE_NEED_TS,
TSDB_FUNCSTATE_MO | TSDB_FUNCSTATE_STABLE | TSDB_FUNCSTATE_NEED_TS,
function_setup,
arithmetic_function,
arithmetic_function_f,
......@@ -5140,7 +5147,7 @@ SQLAggFuncElem aAggs[] = {{
"interp",
TSDB_FUNC_INTERP,
TSDB_FUNC_INTERP,
TSDB_FUNCSTATE_SO | TSDB_FUNCSTATE_OF | TSDB_FUNCSTATE_METRIC | TSDB_FUNCSTATE_NEED_TS,
TSDB_FUNCSTATE_SO | TSDB_FUNCSTATE_OF | TSDB_FUNCSTATE_STABLE | TSDB_FUNCSTATE_NEED_TS,
function_setup,
interp_function,
do_sum_f, // todo filter handle
......@@ -5239,4 +5246,19 @@ SQLAggFuncElem aAggs[] = {{
sumrate_func_merge,
sumrate_func_second_merge,
data_req_load_info,
},
{
// 34
"tid_tag", // return table id and the corresponding tags for join match
TSDB_FUNC_TID_TAG,
TSDB_FUNC_TID_TAG,
TSDB_FUNCSTATE_MO,
function_setup,
noop1,
noop2,
no_next_step,
noop1,
noop1,
noop1,
data_req_load_info,
}};
......@@ -296,17 +296,17 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) {
break;
}
case TSDB_SQL_CREATE_DNODE: { // todo parse hostname
const char* msg = "invalid ip address";
case TSDB_SQL_CREATE_DNODE: { // todo hostname
const char* msg = "invalid host name (ip address)";
if (pInfo->pDCLInfo->nTokens > 1) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg);
}
SSQLToken* pIpAddr = &pInfo->pDCLInfo->a[0];
if (!validateIpAddress(pIpAddr->z, pIpAddr->n)) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg);
}
// SSQLToken* pIpAddr = &pInfo->pDCLInfo->a[0];
// if (!validateIpAddress(pIpAddr->z, pIpAddr->n)) {
// return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg);
// }
break;
}
......@@ -1325,7 +1325,7 @@ SSqlExpr* doAddProjectCol(SQueryInfo* pQueryInfo, int32_t outputIndex, int32_t c
pQueryInfo->type = TSDB_QUERY_TYPE_STABLE_QUERY;
} else {
index.columnIndex = colIndex;
pQueryInfo->type = TSDB_QUERY_TYPE_PROJECTION_QUERY;
pQueryInfo->type |= TSDB_QUERY_TYPE_PROJECTION_QUERY;
}
return tscSqlExprAppend(pQueryInfo, functionId, &index, pSchema->type, pSchema->bytes,
......@@ -2342,7 +2342,7 @@ bool hasUnsupportFunctionsForSTableQuery(SQueryInfo* pQueryInfo) {
size_t size = tscSqlExprNumOfExprs(pQueryInfo);
for (int32_t i = 0; i < size; ++i) {
int32_t functionId = tscSqlExprGet(pQueryInfo, i)->functionId;
if ((aAggs[functionId].nStatus & TSDB_FUNCSTATE_METRIC) == 0) {
if ((aAggs[functionId].nStatus & TSDB_FUNCSTATE_STABLE) == 0) {
invalidSqlErrMsg(pQueryInfo->msg, msg3);
return true;
}
......@@ -5111,7 +5111,7 @@ static int32_t checkUpdateTagPrjFunctions(SQueryInfo* pQueryInfo) {
doUpdateSqlFunctionForColPrj(pQueryInfo);
}
} else {
if ((pQueryInfo->type & TSDB_QUERY_TYPE_PROJECTION_QUERY) == TSDB_QUERY_TYPE_PROJECTION_QUERY) {
if ((pQueryInfo->type & TSDB_QUERY_TYPE_PROJECTION_QUERY) != 0) {
if (numOfAggregation > 0 && pQueryInfo->groupbyExpr.numOfGroupCols == 0) {
return invalidSqlErrMsg(pQueryInfo->msg, msg3);
}
......@@ -5747,6 +5747,8 @@ int32_t doCheckForQuery(SSqlObj* pSql, SQuerySQL* pQuerySql, int32_t index) {
if (code != TSDB_CODE_SUCCESS) {
return code;
}
} else {
TSDB_QUERY_SET_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_TABLE_QUERY);
}
// parse the group by clause in the first place
......
......@@ -83,7 +83,6 @@ STableComInfo tscGetTableInfo(const STableMeta* pTableMeta) {
return pTableMeta->tableInfo;
}
bool isValidSchema(struct SSchema* pSchema, int32_t numOfCols) {
if (!VALIDNUMOFCOLS(numOfCols)) {
return false;
......@@ -184,7 +183,6 @@ STableMeta* tscCreateTableMetaFromMsg(STableMetaMsg* pTableMetaMsg, size_t* size
return pTableMeta;
}
/**
* the TableMeta data format in memory is as follows:
*
......
......@@ -45,15 +45,15 @@ void tscSaveSubscriptionProgress(void* sub);
static int32_t minMsgSize() { return tsRpcHeadSize + 100; }
static void tscSetDnodeIpList(SSqlObj* pSql, STableMeta* pTableMeta) {
static void tscSetDnodeIpList(SSqlObj* pSql, SCMVgroupInfo* pVgroupInfo) {
SRpcIpSet* pIpList = &pSql->ipList;
pIpList->numOfIps = pTableMeta->vgroupInfo.numOfIps;
pIpList->numOfIps = pVgroupInfo->numOfIps;
pIpList->inUse = 0;
for(int32_t i = 0; i < pTableMeta->vgroupInfo.numOfIps; ++i) {
strcpy(pIpList->fqdn[i], pTableMeta->vgroupInfo.ipAddr[i].fqdn);
pIpList->port[i] = pTableMeta->vgroupInfo.ipAddr[i].port;
for(int32_t i = 0; i < pVgroupInfo->numOfIps; ++i) {
strcpy(pIpList->fqdn[i], pVgroupInfo->ipAddr[i].fqdn);
pIpList->port[i] = pVgroupInfo->ipAddr[i].port;
}
}
......@@ -198,7 +198,7 @@ int tscSendMsgToServer(SSqlObj *pSql) {
}
if (pSql->cmd.command < TSDB_SQL_MGMT) {
tscTrace("%p msg:%s is sent to server %d", pSql, taosMsg[pSql->cmd.msgType], pSql->ipList.port);
tscTrace("%p msg:%s is sent to server %d", pSql, taosMsg[pSql->cmd.msgType], pSql->ipList.port[0]);
memcpy(pMsg, pSql->cmd.payload + tsRpcHeadSize, pSql->cmd.payloadLen);
SRpcMsg rpcMsg = {
......@@ -341,7 +341,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg) {
tscTrace("%p cmd:%d code:%s rsp len:%d", pSql, pCmd->command, tstrerror(pRes->code), pRes->rspLen);
}
}
if (pRes->code == TSDB_CODE_SUCCESS && tscProcessMsgRsp[pCmd->command])
rpcMsg->code = (*tscProcessMsgRsp[pCmd->command])(pSql);
......@@ -559,7 +559,7 @@ int tscBuildSubmitMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
// pSql->cmd.payloadLen is set during copying data into payload
pSql->cmd.msgType = TSDB_MSG_TYPE_SUBMIT;
tscSetDnodeIpList(pSql, pTableMeta);
tscSetDnodeIpList(pSql, &pTableMeta->vgroupInfo);
tscTrace("%p build submit msg, vgId:%d numOfVgroup:%d numberOfIP:%d", pSql, vgId, htonl(pMsgDesc->numOfVnodes), pSql->ipList.numOfIps);
return TSDB_CODE_SUCCESS;
......@@ -589,18 +589,65 @@ static int32_t tscEstimateQueryMsgSize(SSqlCmd *pCmd, int32_t clauseIndex) {
return size;
}
static char *doSerializeTableInfo(SSqlObj *pSql, int32_t vgId, char *pMsg) {
static char *doSerializeTableInfo(SQueryTableMsg* pQueryMsg, SSqlObj *pSql, char *pMsg) {
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, pSql->cmd.clauseIndex, 0);
STableMeta * pTableMeta = pTableMetaInfo->pTableMeta;
tscTrace("%p vgId:%d, query on table:%s, uid:%" PRIu64, pSql, vgId, pTableMetaInfo->name, pTableMeta->uid);
if (UTIL_TABLE_IS_NOMRAL_TABLE(pTableMetaInfo) || pTableMetaInfo->pVgroupTables == NULL) {
SCMVgroupInfo* pVgroupInfo = NULL;
if (UTIL_TABLE_IS_NOMRAL_TABLE(pTableMetaInfo)) {
pVgroupInfo = &pTableMeta->vgroupInfo;
} else {
int32_t index = pTableMetaInfo->vgroupIndex;
assert(index >= 0);
pVgroupInfo = &pTableMetaInfo->vgroupList->vgroups[index];
tscTrace("%p query on stable, vgIndex:%d, numOfVgroups:%d", pSql, index, pTableMetaInfo->vgroupList->numOfVgroups);
}
tscSetDnodeIpList(pSql, pVgroupInfo);
pQueryMsg->head.vgId = htonl(pVgroupInfo->vgId);
STableIdInfo *pTableIdInfo = (STableIdInfo *)pMsg;
pTableIdInfo->tid = htonl(pTableMeta->sid);
pTableIdInfo->uid = htobe64(pTableMeta->uid);
pTableIdInfo->key = htobe64(tscGetSubscriptionProgress(pSql->pSubscription, pTableMeta->uid));
pQueryMsg->numOfTables = htonl(1); // set the number of tables
pMsg += sizeof(STableIdInfo);
} else {
int32_t index = pTableMetaInfo->vgroupIndex;
int32_t numOfVgroups = taosArrayGetSize(pTableMetaInfo->pVgroupTables);
assert(index >= 0 && index < numOfVgroups);
tscTrace("%p query on stable, vgIndex:%d, numOfVgroups:%d", pSql, index, numOfVgroups);
SVgroupTableInfo* pTableIdList = taosArrayGet(pTableMetaInfo->pVgroupTables, index);
// set the vgroup info
tscSetDnodeIpList(pSql, &pTableIdList->vgInfo);
pQueryMsg->head.vgId = htonl(pTableIdList->vgInfo.vgId);
int32_t numOfTables = taosArrayGetSize(pTableIdList->itemList);
pQueryMsg->numOfTables = htonl(numOfTables); // set the number of tables
// serialize each table id info
for(int32_t i = 0; i < numOfTables; ++i) {
STableIdInfo* pItem = taosArrayGet(pTableIdList->itemList, i);
STableIdInfo *pTableIdInfo = (STableIdInfo *)pMsg;
pTableIdInfo->tid = htonl(pItem->tid);
pTableIdInfo->uid = htobe64(pItem->uid);
// pTableIdInfo->key = htobe64(tscGetSubscriptionProgress(pSql->pSubscription, pTableMeta->uid));
pMsg += sizeof(STableIdInfo);
}
}
STableIdInfo *pTableIdInfo = (STableIdInfo *)pMsg;
pTableIdInfo->sid = htonl(pTableMeta->sid);
pTableIdInfo->uid = htobe64(pTableMeta->uid);
pTableIdInfo->key = htobe64(tscGetSubscriptionProgress(pSql->pSubscription, pTableMeta->uid));
tscTrace("%p vgId:%d, query on table:%s, uid:%" PRIu64, pSql, htonl(pQueryMsg->head.vgId), pTableMetaInfo->name,
pTableMeta->uid);
pMsg += sizeof(STableIdInfo);
return pMsg;
}
......@@ -637,38 +684,8 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
SQueryTableMsg *pQueryMsg = (SQueryTableMsg *)pStart;
int32_t msgLen = 0;
int32_t numOfTables = 0;
int32_t numOfTags = taosArrayGetSize(pTableMetaInfo->tagColList);
if (UTIL_TABLE_IS_NOMRAL_TABLE(pTableMetaInfo)) {
numOfTables = 1;
tscSetDnodeIpList(pSql, pTableMeta);
pQueryMsg->head.vgId = htonl(pTableMeta->vgroupInfo.vgId);
tscTrace("%p queried tables:%d, table name: %s", pSql, 1, pTableMetaInfo->name);
} else { // query super table
int32_t index = pTableMetaInfo->vgroupIndex;
if (index < 0) {
tscError("%p error vgroupIndex:%d", pSql, index);
return -1;
}
SCMVgroupInfo* pVgroupInfo = &pTableMetaInfo->vgroupList->vgroups[index];
pSql->ipList.numOfIps = pVgroupInfo->numOfIps; // todo fix me
pSql->ipList.inUse = 0;
int32_t numOfTags = taosArrayGetSize(pTableMetaInfo->tagColList);
for(int32_t i = 0; i < pVgroupInfo->numOfIps; ++i) {
strcpy(pSql->ipList.fqdn[i], pVgroupInfo->ipAddr[i].fqdn);
pSql->ipList.port[i] = pVgroupInfo->ipAddr[i].port;
}
tscTrace("%p query on super table, numOfVgroup:%d, vgroupIndex:%d", pSql, pTableMetaInfo->vgroupList->numOfVgroups, index);
pQueryMsg->head.vgId = htonl(pVgroupInfo->vgId);
numOfTables = 1;
}
if (pQueryInfo->order.order == TSDB_ORDER_ASC) {
pQueryMsg->window.skey = htobe64(pQueryInfo->window.skey);
pQueryMsg->window.ekey = htobe64(pQueryInfo->window.ekey);
......@@ -677,7 +694,6 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pQueryMsg->window.ekey = htobe64(pQueryInfo->window.skey);
}
pQueryMsg->numOfTables = htonl(numOfTables);
pQueryMsg->order = htons(pQueryInfo->order.order);
pQueryMsg->orderColId = htons(pQueryInfo->order.orderColId);
pQueryMsg->interpoType = htons(pQueryInfo->interpoType);
......@@ -782,10 +798,10 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pSqlFuncExpr = (SSqlFuncMsg *)pMsg;
}
// serialize the table info (sid, uid, tags)
pMsg = doSerializeTableInfo(pSql, htons(pQueryMsg->head.vgId), pMsg);
pMsg = doSerializeTableInfo(pQueryMsg, pSql, pMsg);
SSqlGroupbyExpr *pGroupbyExpr = &pQueryInfo->groupbyExpr;
if (pGroupbyExpr->numOfGroupCols > 0) {
pQueryMsg->orderByIdx = htons(pGroupbyExpr->orderIndex);
......@@ -891,8 +907,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pMsg += strlen(pQueryInfo->tagCond.tbnameCond.cond) + 1;
}
// tbname in/like query expression should be sent to mgmt node
msgLen = pMsg - pStart;
int32_t msgLen = pMsg - pStart;
tscTrace("%p msg built success,len:%d bytes", pSql, msgLen);
pCmd->payloadLen = msgLen;
......
......@@ -14,10 +14,12 @@
*/
#include "tscSubquery.h"
#include <tcompare.h>
#include <tschemautil.h>
#include "os.h"
#include "qtsbuf.h"
#include "tsclient.h"
#include "tscLog.h"
#include "tsclient.h"
typedef struct SInsertSupporter {
SSubqueryState* pState;
......@@ -27,7 +29,7 @@ typedef struct SInsertSupporter {
static void freeJoinSubqueryObj(SSqlObj* pSql);
static bool tscHashRemainDataInSubqueryResultSet(SSqlObj *pSql);
static bool doCompare(int32_t order, int64_t left, int64_t right) {
static bool tsCompare(int32_t order, int64_t left, int64_t right) {
if (order == TSDB_ORDER_ASC) {
return left < right;
} else {
......@@ -35,8 +37,8 @@ static bool doCompare(int32_t order, int64_t left, int64_t right) {
}
}
static int64_t doTSBlockIntersect(SSqlObj* pSql, SJoinSubquerySupporter* pSupporter1,
SJoinSubquerySupporter* pSupporter2, TSKEY* st, TSKEY* et) {
static int64_t doTSBlockIntersect(SSqlObj* pSql, SJoinSupporter* pSupporter1,
SJoinSupporter* pSupporter2, TSKEY* st, TSKEY* et) {
STSBuf* output1 = tsBufCreate(true);
STSBuf* output2 = tsBufCreate(true);
......@@ -82,17 +84,16 @@ static int64_t doTSBlockIntersect(SSqlObj* pSql, SJoinSubquerySupporter* pSuppor
STSElem elem2 = tsBufGetElem(pSupporter2->pTSBuf);
#ifdef _DEBUG_VIEW
// for debug purpose
tscPrint("%" PRId64 ", tags:%d \t %" PRId64 ", tags:%d", elem1.ts, elem1.tag, elem2.ts, elem2.tag);
#endif
if (elem1.tag < elem2.tag || (elem1.tag == elem2.tag && doCompare(order, elem1.ts, elem2.ts))) {
if (elem1.tag < elem2.tag || (elem1.tag == elem2.tag && tsCompare(order, elem1.ts, elem2.ts))) {
if (!tsBufNextPos(pSupporter1->pTSBuf)) {
break;
}
numOfInput1++;
} else if (elem1.tag > elem2.tag || (elem1.tag == elem2.tag && doCompare(order, elem2.ts, elem1.ts))) {
} else if (elem1.tag > elem2.tag || (elem1.tag == elem2.tag && tsCompare(order, elem2.ts, elem1.ts))) {
if (!tsBufNextPos(pSupporter2->pTSBuf)) {
break;
}
......@@ -156,8 +157,8 @@ static int64_t doTSBlockIntersect(SSqlObj* pSql, SJoinSubquerySupporter* pSuppor
}
// todo handle failed to create sub query
SJoinSubquerySupporter* tscCreateJoinSupporter(SSqlObj* pSql, SSubqueryState* pState, int32_t index) {
SJoinSubquerySupporter* pSupporter = calloc(1, sizeof(SJoinSubquerySupporter));
SJoinSupporter* tscCreateJoinSupporter(SSqlObj* pSql, SSubqueryState* pState, int32_t index) {
SJoinSupporter* pSupporter = calloc(1, sizeof(SJoinSupporter));
if (pSupporter == NULL) {
return NULL;
}
......@@ -185,7 +186,7 @@ SJoinSubquerySupporter* tscCreateJoinSupporter(SSqlObj* pSql, SSubqueryState* pS
return pSupporter;
}
void tscDestroyJoinSupporter(SJoinSubquerySupporter* pSupporter) {
void tscDestroyJoinSupporter(SJoinSupporter* pSupporter) {
if (pSupporter == NULL) {
return;
}
......@@ -234,14 +235,12 @@ bool needSecondaryQuery(SQueryInfo* pQueryInfo) {
*/
int32_t tscLaunchSecondPhaseSubqueries(SSqlObj* pSql) {
int32_t numOfSub = 0;
SJoinSubquerySupporter* pSupporter = NULL;
SJoinSupporter* pSupporter = NULL;
/*
* If the columns are not involved in the final select clause,
* the corresponding query will not be issued.
*/
SSubqueryState* pState = NULL;
for (int32_t i = 0; i < pSql->numOfSubs; ++i) {
pSupporter = pSql->pSubs[i]->param;
if (taosArrayGetSize(pSupporter->exprList) > 0) {
......@@ -256,7 +255,7 @@ int32_t tscLaunchSecondPhaseSubqueries(SSqlObj* pSql) {
"select clause", pSql, pSql->numOfSubs, numOfSub);
//the subqueries that do not actually launch the secondary query to virtual node is set as completed.
pState = pSupporter->pState;
SSubqueryState* pState = pSupporter->pState;
pState->numOfTotal = pSql->numOfSubs;
pState->numOfCompleted = (pSql->numOfSubs - numOfSub);
......@@ -377,7 +376,7 @@ void freeJoinSubqueryObj(SSqlObj* pSql) {
continue;
}
SJoinSubquerySupporter* p = pSub->param;
SJoinSupporter* p = pSub->param;
pState = p->pState;
tscDestroyJoinSupporter(p);
......@@ -391,14 +390,13 @@ void freeJoinSubqueryObj(SSqlObj* pSql) {
pSql->numOfSubs = 0;
}
static void quitAllSubquery(SSqlObj* pSqlObj, SJoinSubquerySupporter* pSupporter) {
static void quitAllSubquery(SSqlObj* pSqlObj, SJoinSupporter* pSupporter) {
int32_t numOfTotal = pSupporter->pState->numOfTotal;
int32_t finished = atomic_add_fetch_32(&pSupporter->pState->numOfCompleted, 1);
pSqlObj->res.code = pSupporter->pState->code;
if (finished >= numOfTotal) {
pSqlObj->res.code = abs(pSupporter->pState->code);
tscError("%p all subquery return and query failed, global code:%d", pSqlObj, pSqlObj->res.code);
freeJoinSubqueryObj(pSqlObj);
}
}
......@@ -411,35 +409,36 @@ static void updateQueryTimeRange(SQueryInfo* pQueryInfo, int64_t st, int64_t et)
pQueryInfo->window.ekey = et;
}
static void tSIntersectionAndLaunchSecQuery(SJoinSubquerySupporter* pSupporter, SSqlObj* pSql) {
static void tSIntersectionAndLaunchSecQuery(SJoinSupporter* pSupporter, SSqlObj* pSql) {
SSqlObj* pParentSql = pSupporter->pObj;
SSqlCmd* pCmd = &pSql->cmd;
SSqlRes* pRes = &pSql->res;
// SSqlCmd* pCmd = &pSql->cmd;
// SSqlRes* pRes = &pSql->res;
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
// SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
SQueryInfo* pParentQueryInfo = tscGetQueryInfoDetail(&pParentSql->cmd, pParentSql->cmd.clauseIndex);
if (tscNonOrderedProjectionQueryOnSTable(pParentQueryInfo, 0)) {
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
assert(pQueryInfo->numOfTables == 1);
// for projection query, need to try next vnode
// int32_t totalVnode = pTableMetaInfo->pMetricMeta->numOfVnodes;
int32_t totalVnode = 0;
if ((++pTableMetaInfo->vgroupIndex) < totalVnode) {
tscTrace("%p current vnode:%d exhausted, try next:%d. total vnode:%d. current numOfRes:%d", pSql,
pTableMetaInfo->vgroupIndex - 1, pTableMetaInfo->vgroupIndex, totalVnode, pRes->numOfTotal);
pSql->cmd.command = TSDB_SQL_SELECT;
pSql->fp = tscJoinQueryCallback;
tscProcessSql(pSql);
return;
}
}
// if (tscNonOrderedProjectionQueryOnSTable(pParentQueryInfo, 0)) {
// STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
// assert(pQueryInfo->numOfTables == 1);
//
// // for projection query, need to try next vnode
//// int32_t totalVnode = pTableMetaInfo->pMetricMeta->numOfVnodes;
// int32_t totalVnode = 0;
// if ((++pTableMetaInfo->vgroupIndex) < totalVnode) {
// tscTrace("%p current vnode:%d exhausted, try next:%d. total vnode:%d. current numOfRes:%d", pSql,
// pTableMetaInfo->vgroupIndex - 1, pTableMetaInfo->vgroupIndex, totalVnode, pRes->numOfTotal);
//
// pSql->cmd.command = TSDB_SQL_SELECT;
// pSql->fp = tscJoinQueryCallback;
// tscProcessSql(pSql);
//
// return;
// }
// }
int32_t numOfTotal = pSupporter->pState->numOfTotal;
int32_t finished = atomic_add_fetch_32(&pSupporter->pState->numOfCompleted, 1);
printf("---------------------------------%d, total:%d\n", finished, numOfTotal);
if (finished >= numOfTotal) {
assert(finished == numOfTotal);
......@@ -453,11 +452,10 @@ static void tSIntersectionAndLaunchSecQuery(SJoinSubquerySupporter* pSupporter,
tscTrace("%p all subqueries retrieve ts complete, do ts block intersect", pParentSql);
SJoinSubquerySupporter* p1 = pParentSql->pSubs[0]->param;
SJoinSubquerySupporter* p2 = pParentSql->pSubs[1]->param;
SJoinSupporter* p1 = pParentSql->pSubs[0]->param;
SJoinSupporter* p2 = pParentSql->pSubs[1]->param;
TSKEY st, et;
int64_t num = doTSBlockIntersect(pParentSql, p1, p2, &st, &et);
if (num <= 0) { // no result during ts intersect
tscTrace("%p free all sub SqlObj and quit", pParentSql);
......@@ -469,8 +467,134 @@ static void tSIntersectionAndLaunchSecQuery(SJoinSubquerySupporter* pSupporter,
}
}
int32_t tagsOrderCompar(const void* p1, const void* p2) {
STidTags* t1 = (STidTags*) p1;
STidTags* t2 = (STidTags*) p2;
if (t1->vgId != t2->vgId) {
return (t1->vgId > t2->vgId)? 1:-1;
} else {
if (t1->tid != t2->tid) {
return (t1->tid > t2->tid)? 1:-1;
} else {
return 0;
}
}
}
static void doBuildVgroupTableInfo(SArray* res, STableMetaInfo* pTableMetaInfo) {
SArray* pGroup = taosArrayInit(4, sizeof(SVgroupTableInfo));
SArray* vgTableIdItem = taosArrayInit(4, sizeof(STableIdInfo));
int32_t size = taosArrayGetSize(res);
STidTags* prev = taosArrayGet(res, 0);
int32_t prevVgId = prev->vgId;
STableIdInfo item = {.uid = prev->uid, .tid = prev->tid, .key = INT64_MIN};
taosArrayPush(vgTableIdItem, &item);
for(int32_t k = 1; k < size; ++k) {
STidTags* t1 = taosArrayGet(res, k);
if (prevVgId != t1->vgId) {
SVgroupTableInfo info = {0};
SVgroupsInfo* pvg = pTableMetaInfo->vgroupList;
for(int32_t m = 0; m < pvg->numOfVgroups; ++m) {
if (prevVgId == pvg->vgroups[m].vgId) {
info.vgInfo = pvg->vgroups[m];
break;
}
}
assert(info.vgInfo.numOfIps != 0);
info.itemList = vgTableIdItem;
taosArrayPush(pGroup, &info);
vgTableIdItem = taosArrayInit(4, sizeof(STableIdInfo));
STableIdInfo item1 = {.uid = t1->uid, .tid = t1->tid, .key = INT64_MIN};
taosArrayPush(vgTableIdItem, &item1);
prevVgId = t1->vgId;
} else {
taosArrayPush(vgTableIdItem, &item);
}
}
if (taosArrayGetSize(vgTableIdItem) > 0) {
SVgroupTableInfo info = {0};
SVgroupsInfo* pvg = pTableMetaInfo->vgroupList;
for(int32_t m = 0; m < pvg->numOfVgroups; ++m) {
if (prevVgId == pvg->vgroups[m].vgId) {
info.vgInfo = pvg->vgroups[m];
break;
}
}
assert(info.vgInfo.numOfIps != 0);
info.itemList = vgTableIdItem;
taosArrayPush(pGroup, &info);
}
pTableMetaInfo->pVgroupTables = pGroup;
}
static void issueTSCompQuery(SSqlObj* pSql, SJoinSupporter* pSupporter, SSqlObj* pParent) {
SSqlCmd* pCmd = &pSql->cmd;
tscClearSubqueryInfo(pCmd);
tscFreeSqlResult(pSql);
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
tscInitQueryInfo(pQueryInfo);
TSDB_QUERY_CLEAR_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_TAG_FILTER_QUERY);
TSDB_QUERY_SET_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_MULTITABLE_QUERY);
pCmd->command = TSDB_SQL_SELECT;
pSql->fp = tscJoinQueryCallback;
SSchema colSchema = {.type = TSDB_DATA_TYPE_BINARY, .bytes = 1};
SColumnIndex index = {0, PRIMARYKEY_TIMESTAMP_COL_INDEX};
tscAddSpecialColumnForSelect(pQueryInfo, 0, TSDB_FUNC_TS_COMP, &index, &colSchema, TSDB_COL_NORMAL);
// set the tags value for ts_comp function
SSqlExpr *pExpr = tscSqlExprGet(pQueryInfo, 0);
int16_t tagColIndex = tscGetJoinTagColIndexByUid(&pSupporter->tagCond, pTableMetaInfo->pTableMeta->uid);
pExpr->param->i64Key = tagColIndex;
pExpr->numOfParams = 1;
// add the filter tag column
if (pSupporter->colList != NULL) {
size_t s = taosArrayGetSize(pSupporter->colList);
for (int32_t i = 0; i < s; ++i) {
SColumn *pCol = taosArrayGetP(pSupporter->colList, i);
if (pCol->numOfFilters > 0) { // copy to the pNew->cmd.colList if it is filtered.
SColumn *p = tscColumnClone(pCol);
taosArrayPush(pQueryInfo->colList, &p);
}
}
}
size_t numOfCols = taosArrayGetSize(pQueryInfo->colList);
tscTrace(
"%p subquery:%p tableIndex:%d, vgroupIndex:%d, type:%d, transfer to ts_comp query to retrieve timestamps, "
"exprInfo:%d, colList:%d, fieldsInfo:%d, name:%s",
pParent, pSql, 0, pTableMetaInfo->vgroupIndex, pQueryInfo->type, tscSqlExprNumOfExprs(pQueryInfo),
numOfCols, pQueryInfo->fieldsInfo.numOfOutput, pTableMetaInfo->name);
tscProcessSql(pSql);
}
static void joinRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) {
SJoinSubquerySupporter* pSupporter = (SJoinSubquerySupporter*)param;
SJoinSupporter* pSupporter = (SJoinSupporter*)param;
SSqlObj* pParentSql = pSupporter->pObj;
SSqlObj* pSql = (SSqlObj*)tres;
......@@ -478,21 +602,134 @@ static void joinRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) {
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
if ((pQueryInfo->type & TSDB_QUERY_TYPE_JOIN_SEC_STAGE) != TSDB_QUERY_TYPE_JOIN_SEC_STAGE) {
if (pSupporter->pState->code != TSDB_CODE_SUCCESS) {
tscError("%p abort query due to other subquery failure. code:%d, global code:%d", pSql, numOfRows,
pSupporter->pState->code);
quitAllSubquery(pParentSql, pSupporter);
return;
// if (pSupporter->pState->code != TSDB_CODE_SUCCESS) {
// tscError("%p abort query due to other subquery failure. code:%d, global code:%s", pSql, numOfRows,
// tstrerror(pSupporter->pState->code));
//
// quitAllSubquery(pParentSql, pSupporter);
// return;
// }
//
// if (numOfRows < 0) {
// tscError("%p sub query failed, code:%s, index:%d", pSql, tstrerror(numOfRows), pSupporter->subqueryIndex);
// pSupporter->pState->code = numOfRows;
// quitAllSubquery(pParentSql, pSupporter);
// return;
// }
// response of tag retrieve
if (TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_TAG_FILTER_QUERY)) {
if (numOfRows == 0 || pSql->res.completed) {
if (numOfRows > 0) {
size_t length = pSupporter->totalLen + pSql->res.rspLen;
char* tmp = realloc(pSupporter->pIdTagList, length);
assert(tmp != NULL);
pSupporter->pIdTagList = tmp;
memcpy(pSupporter->pIdTagList, pSql->res.data, pSql->res.rspLen);
pSupporter->totalLen += pSql->res.rspLen;
pSupporter->num += pSql->res.numOfRows;
}
int32_t numOfTotal = pSupporter->pState->numOfTotal;
int32_t finished = atomic_add_fetch_32(&pSupporter->pState->numOfCompleted, 1);
if (finished < numOfTotal) {
return;
}
// all subqueries are returned, start to compare the tags
assert(finished == numOfTotal);
tscTrace("%p all subqueries retrieve tags complete, do tags match", pParentSql);
SJoinSupporter* p1 = pParentSql->pSubs[0]->param;
SJoinSupporter* p2 = pParentSql->pSubs[1]->param;
qsort(p1->pIdTagList, p1->num, p1->tagSize, tagsOrderCompar);
qsort(p2->pIdTagList, p2->num, p2->tagSize, tagsOrderCompar);
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
SSchema* pSchema = tscGetTableTagSchema(pTableMetaInfo->pTableMeta);// todo: tags mismatch, tags not completed
SColumn *pCol = taosArrayGetP(pTableMetaInfo->tagColList, 0);
SSchema *pColSchema = &pSchema[pCol->colIndex.columnIndex];
SArray* s1 = taosArrayInit(p1->num, p1->tagSize);
SArray* s2 = taosArrayInit(p2->num, p2->tagSize);
int32_t i = 0, j = 0;
while(i < p1->num && j < p2->num) {
STidTags* pp1 = (STidTags*) p1->pIdTagList + i * p1->tagSize;
STidTags* pp2 = (STidTags*) p2->pIdTagList + j * p2->tagSize;
int32_t ret = doCompare(pp1->tag, pp2->tag, pColSchema->type, pColSchema->bytes);
if (ret == 0) {
taosArrayPush(s1, pp1);
taosArrayPush(s2, pp2);
j++;
i++;
} else if (ret > 0) {
j++;
} else {
i++;
}
}
if (taosArrayGetSize(s1) == 0 || taosArrayGetSize(s2) == 0) {// no results,return.
tscTrace("%p free all sub SqlObj and quit", pParentSql);
freeJoinSubqueryObj(pParentSql);
return;
} else {
SSqlCmd* pSubCmd1 = &pParentSql->pSubs[0]->cmd;
SSqlCmd* pSubCmd2 = &pParentSql->pSubs[1]->cmd;
SQueryInfo* pQueryInfo1 = tscGetQueryInfoDetail(pSubCmd1, 0);
STableMetaInfo* pTableMetaInfo1 = tscGetMetaInfo(pQueryInfo1, 0);
doBuildVgroupTableInfo(s1, pTableMetaInfo1);
SQueryInfo* pQueryInfo2 = tscGetQueryInfoDetail(pSubCmd2, 0);
STableMetaInfo* pTableMetaInfo2 = tscGetMetaInfo(pQueryInfo2, 0);
doBuildVgroupTableInfo(s2, pTableMetaInfo2);
pSupporter->pState->numOfCompleted = 0;
pSupporter->pState->code = 0;
pSupporter->pState->numOfTotal = 2;
for(int32_t m = 0; m < pParentSql->numOfSubs; ++m) {
SSqlObj* psub = pParentSql->pSubs[m];
issueTSCompQuery(psub, psub->param, pParentSql);
}
}
} else {
size_t length = pSupporter->totalLen + pSql->res.rspLen;
char* tmp = realloc(pSupporter->pIdTagList, length);
assert(tmp != NULL);
pSupporter->pIdTagList = tmp;
memcpy(pSupporter->pIdTagList, pSql->res.data, pSql->res.rspLen);
pSupporter->totalLen += pSql->res.rspLen;
pSupporter->num += pSql->res.numOfRows;
// continue retrieve data from vnode
taos_fetch_rows_a(tres, joinRetrieveCallback, param);
}
if (numOfRows < 0) {
tscError("%p sub query failed, code:%d, index:%d", pSql, numOfRows, pSupporter->subqueryIndex);
pSupporter->pState->code = numOfRows;
quitAllSubquery(pParentSql, pSupporter);
return;
} else if (numOfRows == 0) {
return;
}
if (!TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_JOIN_SEC_STAGE)) {
if (numOfRows < 0) {
tscError("%p sub query failed, code:%s, index:%d", pSql, tstrerror(numOfRows), pSupporter->subqueryIndex);
pSupporter->pState->code = numOfRows;
quitAllSubquery(pParentSql, pSupporter);
return;
}
if (numOfRows == 0) {
tSIntersectionAndLaunchSecQuery(pSupporter, pSql);
return;
}
......@@ -590,16 +827,16 @@ static void joinRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) {
}
}
static SJoinSubquerySupporter* tscUpdateSubqueryStatus(SSqlObj* pSql, int32_t numOfFetch) {
static SJoinSupporter* tscUpdateSubqueryStatus(SSqlObj* pSql, int32_t numOfFetch) {
int32_t notInvolved = 0;
SJoinSubquerySupporter* pSupporter = NULL;
SJoinSupporter* pSupporter = NULL;
SSubqueryState* pState = NULL;
for(int32_t i = 0; i < pSql->numOfSubs; ++i) {
if (pSql->pSubs[i] == NULL) {
notInvolved++;
} else {
pSupporter = (SJoinSubquerySupporter*)pSql->pSubs[i]->param;
pSupporter = (SJoinSupporter*)pSql->pSubs[i]->param;
pState = pSupporter->pState;
}
}
......@@ -666,7 +903,7 @@ void tscFetchDatablockFromSubquery(SSqlObj* pSql) {
// TODO multi-vnode retrieve for projection query with limitation has bugs, since the global limiation is not handled
tscTrace("%p retrieve data from %d subqueries", pSql, numOfFetch);
SJoinSubquerySupporter* pSupporter = tscUpdateSubqueryStatus(pSql, numOfFetch);
SJoinSupporter* pSupporter = tscUpdateSubqueryStatus(pSql, numOfFetch);
for (int32_t i = 0; i < pSql->numOfSubs; ++i) {
SSqlObj* pSql1 = pSql->pSubs[i];
......@@ -677,7 +914,7 @@ void tscFetchDatablockFromSubquery(SSqlObj* pSql) {
SSqlRes* pRes1 = &pSql1->res;
SSqlCmd* pCmd1 = &pSql1->cmd;
pSupporter = (SJoinSubquerySupporter*)pSql1->param;
pSupporter = (SJoinSupporter*)pSql1->param;
// wait for all subqueries completed
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd1, 0);
......@@ -746,13 +983,13 @@ void tscSetupOutputColumnIndex(SSqlObj* pSql) {
void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) {
SSqlObj* pSql = (SSqlObj*)tres;
SJoinSubquerySupporter* pSupporter = (SJoinSubquerySupporter*)param;
SJoinSupporter* pSupporter = (SJoinSupporter*)param;
// There is only one subquery and table for each subquery.
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
assert(pQueryInfo->numOfTables == 1 && pSql->cmd.numOfClause == 1);
if ((pQueryInfo->type & TSDB_QUERY_TYPE_JOIN_SEC_STAGE) != TSDB_QUERY_TYPE_JOIN_SEC_STAGE) {
if (!TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_JOIN_SEC_STAGE)) {
if (code != TSDB_CODE_SUCCESS) { // direct call joinRetrieveCallback and set the error code
joinRetrieveCallback(param, pSql, code);
} else { // first stage query, continue to retrieve compressed time stamp data
......@@ -817,7 +1054,7 @@ static void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code);
static SSqlObj *tscCreateSqlObjForSubquery(SSqlObj *pSql, SRetrieveSupport *trsupport, SSqlObj *prevSqlObj);
// todo merge with callback
int32_t tscLaunchJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSubquerySupporter *pSupporter) {
int32_t tscLaunchJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSupporter *pSupporter) {
SSqlCmd * pCmd = &pSql->cmd;
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
......@@ -875,42 +1112,78 @@ int32_t tscLaunchJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSubquerySu
pSupporter->groupbyExpr = pNewQueryInfo->groupbyExpr;
memset(&pNewQueryInfo->groupbyExpr, 0, sizeof(SSqlGroupbyExpr));
SSchema colSchema = {.type = TSDB_DATA_TYPE_BINARY, .bytes = 1};
SColumnIndex index = {0, PRIMARYKEY_TIMESTAMP_COL_INDEX};
tscInitQueryInfo(pNewQueryInfo);
tscAddSpecialColumnForSelect(pNewQueryInfo, 0, TSDB_FUNC_TS_COMP, &index, &colSchema, TSDB_COL_NORMAL);
// set the tags value for ts_comp function
SSqlExpr *pExpr = tscSqlExprGet(pNewQueryInfo, 0);
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pNewQueryInfo, 0);
int16_t tagColIndex = tscGetJoinTagColIndexByUid(&pSupporter->tagCond, pTableMetaInfo->pTableMeta->uid);
pExpr->param->i64Key = tagColIndex;
pExpr->numOfParams = 1;
// add the filter tag column
if (pSupporter->colList != NULL) {
size_t s = taosArrayGetSize(pSupporter->colList);
if (UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo)) { // return the tableId & tag
SSchema s = {0};
SColumnIndex index = {0};
for (int32_t i = 0; i < s; ++i) {
SColumn *pCol = taosArrayGetP(pSupporter->colList, i);
if (pCol->numOfFilters > 0) { // copy to the pNew->cmd.colList if it is filtered.
SColumn* p = tscColumnClone(pCol);
taosArrayPush(pNewQueryInfo->colList, &p);
}
size_t numOfTags = taosArrayGetSize(pTableMetaInfo->tagColList);
for(int32_t i = 0; i < numOfTags; ++i) {
SColumn* c = taosArrayGetP(pTableMetaInfo->tagColList, i);
index = (SColumnIndex) {.tableIndex = 0, .columnIndex = c->colIndex.columnIndex};
SSchema* pTagSchema = tscGetTableTagSchema(pTableMetaInfo->pTableMeta);
s = pTagSchema[c->colIndex.columnIndex];
int16_t bytes = 0;
int16_t type = 0;
int16_t inter = 0;
getResultDataInfo(s.type, s.bytes, TSDB_FUNC_TID_TAG, 0, &type, &bytes, &inter, 0, 0);
s.type = type;
s.bytes = bytes;
pSupporter->tagSize = s.bytes;
}
}
size_t numOfCols = taosArrayGetSize(pNewQueryInfo->colList);
// set get tags query type
TSDB_QUERY_SET_TYPE(pNewQueryInfo->type, TSDB_QUERY_TYPE_TAG_FILTER_QUERY);
tscAddSpecialColumnForSelect(pNewQueryInfo, 0, TSDB_FUNC_TID_TAG, &index, &s, TSDB_COL_TAG);
size_t numOfCols = taosArrayGetSize(pNewQueryInfo->colList);
tscTrace("%p subquery:%p tableIndex:%d, vgroupIndex:%d, type:%d, transfer to ts_comp query to retrieve timestamps, "
"exprInfo:%d, colList:%d, fieldsInfo:%d, name:%s",
pSql, pNew, tableIndex, pTableMetaInfo->vgroupIndex, pNewQueryInfo->type,
tscSqlExprNumOfExprs(pNewQueryInfo), numOfCols,
pNewQueryInfo->fieldsInfo.numOfOutput, pNewQueryInfo->pTableMetaInfo[0]->name);
tscTrace(
"%p subquery:%p tableIndex:%d, vgroupIndex:%d, type:%d, transfer to tid_tag query to retrieve (tableId, tags), "
"exprInfo:%d, colList:%d, fieldsInfo:%d, name:%s",
pSql, pNew, tableIndex, pTableMetaInfo->vgroupIndex, pNewQueryInfo->type, tscSqlExprNumOfExprs(pNewQueryInfo),
numOfCols, pNewQueryInfo->fieldsInfo.numOfOutput, pNewQueryInfo->pTableMetaInfo[0]->name);
} else {
SSchema colSchema = {.type = TSDB_DATA_TYPE_BINARY, .bytes = 1};
SColumnIndex index = {0, PRIMARYKEY_TIMESTAMP_COL_INDEX};
tscAddSpecialColumnForSelect(pNewQueryInfo, 0, TSDB_FUNC_TS_COMP, &index, &colSchema, TSDB_COL_NORMAL);
// set the tags value for ts_comp function
SSqlExpr *pExpr = tscSqlExprGet(pNewQueryInfo, 0);
int16_t tagColIndex = tscGetJoinTagColIndexByUid(&pSupporter->tagCond, pTableMetaInfo->pTableMeta->uid);
pExpr->param->i64Key = tagColIndex;
pExpr->numOfParams = 1;
// add the filter tag column
if (pSupporter->colList != NULL) {
size_t s = taosArrayGetSize(pSupporter->colList);
for (int32_t i = 0; i < s; ++i) {
SColumn *pCol = taosArrayGetP(pSupporter->colList, i);
if (pCol->numOfFilters > 0) { // copy to the pNew->cmd.colList if it is filtered.
SColumn *p = tscColumnClone(pCol);
taosArrayPush(pNewQueryInfo->colList, &p);
}
}
}
size_t numOfCols = taosArrayGetSize(pNewQueryInfo->colList);
tscTrace(
"%p subquery:%p tableIndex:%d, vgroupIndex:%d, type:%d, transfer to ts_comp query to retrieve timestamps, "
"exprInfo:%d, colList:%d, fieldsInfo:%d, name:%s",
pSql, pNew, tableIndex, pTableMetaInfo->vgroupIndex, pNewQueryInfo->type, tscSqlExprNumOfExprs(pNewQueryInfo),
numOfCols, pNewQueryInfo->fieldsInfo.numOfOutput, pNewQueryInfo->pTableMetaInfo[0]->name);
}
} else {
assert(0);
SQueryInfo *pNewQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0);
......@@ -930,7 +1203,7 @@ int32_t tscHandleMasterJoinQuery(SSqlObj* pSql) {
tscTrace("%p start launch subquery, total:%d", pSql, pQueryInfo->numOfTables);
for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
SJoinSubquerySupporter *pSupporter = tscCreateJoinSupporter(pSql, pState, i);
SJoinSupporter *pSupporter = tscCreateJoinSupporter(pSql, pState, i);
if (pSupporter == NULL) { // failed to create support struct, abort current query
tscError("%p tableIndex:%d, failed to allocate join support object, abort further query", pSql, i);
......@@ -1620,7 +1893,7 @@ void tscBuildResFromSubqueries(SSqlObj *pSql) {
continue;
}
SJoinSubquerySupporter *pSupporter = (SJoinSubquerySupporter *)pChildObj->param;
SJoinSupporter *pSupporter = (SJoinSupporter *)pChildObj->param;
pState = pSupporter->pState;
tscDestroyJoinSupporter(pChildObj->param);
......
......@@ -76,7 +76,9 @@ bool tscQueryOnSTable(SSqlCmd* pCmd) {
bool tscQueryTags(SQueryInfo* pQueryInfo) {
for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) {
if (tscSqlExprGet(pQueryInfo, i)->functionId != TSDB_FUNC_TAGPRJ) {
int32_t functId = tscSqlExprGet(pQueryInfo, i)->functionId;
if (functId != TSDB_FUNC_TAGPRJ && functId != TSDB_FUNC_TID_TAG) {
return false;
}
}
......@@ -123,23 +125,23 @@ void tscGetDBInfoFromMeterId(char* tableId, char* db) {
db[0] = 0;
}
STableIdInfo* tscGetMeterSidInfo(SVnodeSidList* pSidList, int32_t idx) {
if (pSidList == NULL) {
tscError("illegal sidlist");
return 0;
}
if (idx < 0 || idx >= pSidList->numOfSids) {
int32_t sidRange = (pSidList->numOfSids > 0) ? (pSidList->numOfSids - 1) : 0;
tscError("illegal sidIdx:%d, reset to 0, sidIdx range:%d-%d", idx, 0, sidRange);
idx = 0;
}
assert(pSidList->pSidExtInfoList[idx] >= 0);
return (STableIdInfo*)(pSidList->pSidExtInfoList[idx] + (char*)pSidList);
}
//STableIdInfo* tscGetMeterSidInfo(SVnodeSidList* pSidList, int32_t idx) {
// if (pSidList == NULL) {
// tscError("illegal sidlist");
// return 0;
// }
//
// if (idx < 0 || idx >= pSidList->numOfSids) {
// int32_t sidRange = (pSidList->numOfSids > 0) ? (pSidList->numOfSids - 1) : 0;
//
// tscError("illegal sidIdx:%d, reset to 0, sidIdx range:%d-%d", idx, 0, sidRange);
// idx = 0;
// }
//
// assert(pSidList->pSidExtInfoList[idx] >= 0);
//
// return (STableIdInfo*)(pSidList->pSidExtInfoList[idx] + (char*)pSidList);
//}
bool tscIsTwoStageSTableQuery(SQueryInfo* pQueryInfo, int32_t tableIndex) {
if (pQueryInfo == NULL) {
......@@ -919,7 +921,6 @@ static SSqlExpr* doBuildSqlExpr(SQueryInfo* pQueryInfo, int16_t functionId, SCol
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, pColIndex->tableIndex);
SSqlExpr* pExpr = calloc(1, sizeof(SSqlExpr));
pExpr->functionId = functionId;
// set the correct column index
......@@ -1596,6 +1597,35 @@ void tscClearSubqueryInfo(SSqlCmd* pCmd) {
}
}
void doRemoveTableMetaInfo(SQueryInfo* pQueryInfo, int32_t index, bool removeFromCache) {
if (index < 0 || index >= pQueryInfo->numOfTables) {
return;
}
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, index);
tscClearTableMetaInfo(pTableMetaInfo, removeFromCache);
free(pTableMetaInfo);
int32_t after = pQueryInfo->numOfTables - index - 1;
if (after > 0) {
memmove(&pQueryInfo->pTableMetaInfo[index], &pQueryInfo->pTableMetaInfo[index + 1], after * POINTER_BYTES);
}
pQueryInfo->numOfTables -= 1;
}
void clearAllTableMetaInfo(SQueryInfo* pQueryInfo, const char* address, bool removeFromCache) {
tscTrace("%p deref the table meta in cache, numOfTables:%d", address, pQueryInfo->numOfTables);
int32_t index = pQueryInfo->numOfTables;
while (index >= 0) {
doRemoveTableMetaInfo(pQueryInfo, --index, removeFromCache);
}
tfree(pQueryInfo->pTableMetaInfo);
}
void tscFreeQueryInfo(SSqlCmd* pCmd) {
if (pCmd == NULL || pCmd->numOfClause == 0) {
return;
......@@ -1606,7 +1636,7 @@ void tscFreeQueryInfo(SSqlCmd* pCmd) {
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, i);
freeQueryInfoImpl(pQueryInfo);
tscClearAllTableMetaInfo(pQueryInfo, (const char*)addr, false);
clearAllTableMetaInfo(pQueryInfo, (const char*)addr, false);
tfree(pQueryInfo);
}
......@@ -1657,35 +1687,6 @@ STableMetaInfo* tscAddEmptyMetaInfo(SQueryInfo* pQueryInfo) {
return tscAddTableMetaInfo(pQueryInfo, NULL, NULL, NULL, NULL);
}
void doRemoveTableMetaInfo(SQueryInfo* pQueryInfo, int32_t index, bool removeFromCache) {
if (index < 0 || index >= pQueryInfo->numOfTables) {
return;
}
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, index);
tscClearTableMetaInfo(pTableMetaInfo, removeFromCache);
free(pTableMetaInfo);
int32_t after = pQueryInfo->numOfTables - index - 1;
if (after > 0) {
memmove(&pQueryInfo->pTableMetaInfo[index], &pQueryInfo->pTableMetaInfo[index + 1], after * POINTER_BYTES);
}
pQueryInfo->numOfTables -= 1;
}
void tscClearAllTableMetaInfo(SQueryInfo* pQueryInfo, const char* address, bool removeFromCache) {
tscTrace("%p deref the table meta in cache, numOfTables:%d", address, pQueryInfo->numOfTables);
int32_t index = pQueryInfo->numOfTables;
while (index >= 0) {
doRemoveTableMetaInfo(pQueryInfo, --index, removeFromCache);
}
tfree(pQueryInfo->pTableMetaInfo);
}
void tscClearTableMetaInfo(STableMetaInfo* pTableMetaInfo, bool removeFromCache) {
if (pTableMetaInfo == NULL) {
return;
......
......@@ -23,12 +23,12 @@ typedef void* qinfo_t;
/**
* create the qinfo object according to QueryTableMsg
* @param pVnode
* @param tsdb
* @param pQueryTableMsg
* @param qinfo
* @return
*/
int32_t qCreateQueryInfo(void* pVnode, SQueryTableMsg* pQueryTableMsg, qinfo_t* qinfo);
int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryTableMsg, qinfo_t* qinfo);
/**
* Destroy QInfo object
......
......@@ -298,9 +298,11 @@ void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size);
#define TSDB_QUERY_TYPE_TAG_FILTER_QUERY 0x400u
#define TSDB_QUERY_TYPE_INSERT 0x100u // insert type
#define TSDB_QUERY_TYPE_IMPORT 0x200u // import data
#define TSDB_QUERY_TYPE_MULTITABLE_QUERY 0x800u
#define TSDB_QUERY_HAS_TYPE(x, _type) (((x) & (_type)) != 0)
#define TSDB_QUERY_SET_TYPE(x, _type) ((x) |= (_type))
#define TSDB_QUERY_CLEAR_TYPE(x, _type) ((x) &= (~_type))
#define TSDB_QUERY_RESET_TYPE(x) ((x) = TSDB_QUERY_TYPE_NON_TYPE)
#define TSDB_ORDER_ASC 1
......
......@@ -422,8 +422,8 @@ typedef struct SColumnInfo {
} SColumnInfo;
typedef struct STableIdInfo {
int32_t sid;
int64_t uid;
int32_t tid;
TSKEY key; // last accessed ts, for subscription
} STableIdInfo;
......@@ -459,9 +459,6 @@ typedef struct {
int16_t tagNameRelType; // relation of tag criteria and tbname criteria
int16_t interpoType; // interpolate type
uint64_t defaultVal; // default value array list
// int32_t colNameLen;
// int64_t colNameList;
int32_t tsOffset; // offset value in current msg body, NOTE: ts list is compressed
int32_t tsLen; // total length of ts comp block
int32_t tsNumOfBlocks; // ts comp block numbers
......@@ -638,34 +635,6 @@ typedef struct {
SCMVgroupInfo vgroups[];
} SVgroupsInfo;
//typedef struct {
// int32_t numOfTables;
//// int32_t numOfVgroups;
//// SCMVgroupInfo vgroups[];
//} SCMSTableVgroupRspMsg;
//typedef struct {
// int16_t elemLen;
//
// char tableId[TSDB_TABLE_ID_LEN + 1];
// int16_t orderIndex;
// int16_t orderType; // used in group by xx order by xxx
//
// int16_t rel; // denotes the relation between condition and table list
//
// int32_t tableCond; // offset value of table name condition
// int32_t tableCondLen;
//
// int32_t cond; // offset of column query condition
// int32_t condLen;
//
// int16_t tagCols[TSDB_MAX_TAGS + 1]; // required tag columns, plus one is for table name
// int16_t numOfTags; // required number of tags
//
// int16_t numOfGroupCols; // num of group by columns
// int32_t groupbyTagColumnList;
//} SSuperTableMetaElemMsg;
//
//typedef struct {
// int32_t numOfTables;
// int32_t join;
......@@ -673,20 +642,6 @@ typedef struct {
// int32_t metaElem[TSDB_MAX_JOIN_TABLE_NUM];
//} SSuperTableMetaMsg;
typedef struct {
int32_t nodeId;
uint32_t nodeIp;
uint16_t nodePort;
} SVnodeDesc;
typedef struct {
SVnodeDesc vpeerDesc[TSDB_MAX_REPLICA_NUM];
int16_t index; // used locally
int32_t vgId;
int32_t numOfSids;
int32_t pSidExtInfoList[]; // offset value of STableIdInfo
} SVnodeSidList;
typedef struct STableMetaMsg {
int32_t contLen;
char tableId[TSDB_TABLE_ID_LEN + 1]; // table id
......
......@@ -1252,8 +1252,10 @@ static void mgmtProcessSuperTableVgroupMsg(SQueuedMsg *pMsg) {
for (int32_t vn = 0; vn < vgItem->numOfVnodes; ++vn) {
SDnodeObj *pDnode = vgItem->vnodeGid[vn].pDnode;
if (pDnode == NULL) break;
strncpy(pVgroup->vgroups[vg].ipAddr[vn].fqdn, pDnode->dnodeFqdn, tListLen(pDnode->dnodeFqdn));
pVgroup->vgroups[vg].ipAddr[vn].port = htons(tsDnodeShellPort);
pVgroup->vgroups[vg].numOfIps++;
}
......
......@@ -51,7 +51,7 @@ typedef struct SDiskbasedResultBuf {
* @param rowSize
* @return
*/
int32_t createDiskbasedResultBuffer(SDiskbasedResultBuf** pResultBuf, int32_t size, int32_t rowSize);
int32_t createDiskbasedResultBuffer(SDiskbasedResultBuf** pResultBuf, int32_t size, int32_t rowSize, void* handle);
/**
*
......
......@@ -169,6 +169,7 @@ typedef struct SQInfo {
int32_t code; // error code to returned to client
sem_t dataReady;
void* tsdb;
int32_t vgId;
STableGroupInfo tableIdGroupInfo; // table id list < only includes the STableId list>
STableGroupInfo groupInfo; //
......@@ -185,7 +186,6 @@ typedef struct SQInfo {
*/
int32_t tableIndex;
int32_t numOfGroupResultPages;
TSKEY* tsList;
} SQInfo;
#endif // TDENGINE_QUERYEXECUTOR_H
......@@ -162,7 +162,7 @@ ifnotexists(X) ::= . {X.n = 0;}
/////////////////////////////////THE CREATE STATEMENT///////////////////////////////////////
//create option for dnode/db/user/account
cmd ::= CREATE DNODE IPTOKEN(X). { setDCLSQLElems(pInfo, TSDB_SQL_CREATE_DNODE, 1, &X);}
cmd ::= CREATE DNODE ids(X). { setDCLSQLElems(pInfo, TSDB_SQL_CREATE_DNODE, 1, &X);}
cmd ::= CREATE ACCOUNT ids(X) PASS ids(Y) acct_optr(Z).
{ setCreateAcctSQL(pInfo, TSDB_SQL_CREATE_ACCT, &X, &Y, &Z);}
cmd ::= CREATE DATABASE ifnotexists(Z) ids(X) db_optr(Y). { setCreateDBSQL(pInfo, TSDB_SQL_CREATE_DB, &X, &Y, &Z);}
......
......@@ -68,16 +68,18 @@ extern "C" {
#define TSDB_FUNC_AVG_RATE 32
#define TSDB_FUNC_AVG_IRATE 33
#define TSDB_FUNCSTATE_SO 0x1U // single output
#define TSDB_FUNCSTATE_MO 0x2U // dynamic number of output, not multinumber of output e.g., TOP/BOTTOM
#define TSDB_FUNCSTATE_STREAM 0x4U // function avail for stream
#define TSDB_FUNCSTATE_METRIC 0x8U // function avail for metric
#define TSDB_FUNCSTATE_OF 0x10U // outer forward
#define TSDB_FUNCSTATE_NEED_TS 0x20U // timestamp is required during query processing
#define TSDB_FUNCSTATE_SELECTIVITY 0x40U // selectivity functions, can exists along with tag columns
#define TSDB_BASE_FUNC_SO TSDB_FUNCSTATE_SO | TSDB_FUNCSTATE_STREAM | TSDB_FUNCSTATE_METRIC | TSDB_FUNCSTATE_OF
#define TSDB_BASE_FUNC_MO TSDB_FUNCSTATE_MO | TSDB_FUNCSTATE_STREAM | TSDB_FUNCSTATE_METRIC | TSDB_FUNCSTATE_OF
#define TSDB_FUNC_TID_TAG 34
#define TSDB_FUNCSTATE_SO 0x1u // single output
#define TSDB_FUNCSTATE_MO 0x2u // dynamic number of output, not multinumber of output e.g., TOP/BOTTOM
#define TSDB_FUNCSTATE_STREAM 0x4u // function avail for stream
#define TSDB_FUNCSTATE_STABLE 0x8u // function avail for metric
#define TSDB_FUNCSTATE_OF 0x10u // outer forward
#define TSDB_FUNCSTATE_NEED_TS 0x20u // timestamp is required during query processing
#define TSDB_FUNCSTATE_SELECTIVITY 0x40u // selectivity functions, can exists along with tag columns
#define TSDB_BASE_FUNC_SO TSDB_FUNCSTATE_SO | TSDB_FUNCSTATE_STREAM | TSDB_FUNCSTATE_STABLE | TSDB_FUNCSTATE_OF
#define TSDB_BASE_FUNC_MO TSDB_FUNCSTATE_MO | TSDB_FUNCSTATE_STREAM | TSDB_FUNCSTATE_STABLE | TSDB_FUNCSTATE_OF
#define TSDB_FUNCTIONS_NAME_MAX_LENGTH 16
......
......@@ -7,7 +7,7 @@
#define DEFAULT_INTERN_BUF_SIZE 16384L
int32_t createDiskbasedResultBuffer(SDiskbasedResultBuf** pResultBuf, int32_t size, int32_t rowSize) {
int32_t createDiskbasedResultBuffer(SDiskbasedResultBuf** pResultBuf, int32_t size, int32_t rowSize, void* handle) {
SDiskbasedResultBuf* pResBuf = calloc(1, sizeof(SDiskbasedResultBuf));
pResBuf->numOfRowsPerPage = (DEFAULT_INTERN_BUF_SIZE - sizeof(tFilePage)) / rowSize;
pResBuf->numOfPages = size;
......@@ -41,11 +41,13 @@ int32_t createDiskbasedResultBuffer(SDiskbasedResultBuf** pResultBuf, int32_t si
pResBuf->pBuf = mmap(NULL, pResBuf->totalBufSize, PROT_READ | PROT_WRITE, MAP_SHARED, pResBuf->fd, 0);
if (pResBuf->pBuf == MAP_FAILED) {
qError("QInfo:%p failed to map temp file: %s. %s", pResBuf->path, strerror(errno));
qError("QInfo:%p failed to map temp file: %s. %s", handle, pResBuf->path, strerror(errno));
return TSDB_CODE_CLI_OUT_OF_MEMORY; // todo change error code
}
qTrace("create tmp file for output result, %s, " PRId64 "bytes", pResBuf->path, pResBuf->totalBufSize);
qTrace("QInfo:%p create tmp file for output result, %s, %" PRId64 "bytes", handle, pResBuf->path,
pResBuf->totalBufSize);
*pResultBuf = pResBuf;
return TSDB_CODE_SUCCESS;
}
......
......@@ -1651,7 +1651,7 @@ static bool needReverseScan(SQuery *pQuery) {
static bool onlyQueryTags(SQuery* pQuery) {
for(int32_t i = 0; i < pQuery->numOfOutput; ++i) {
int32_t functionId = pQuery->pSelectExpr[i].base.functionId;
if (functionId != TSDB_FUNC_TAGPRJ) {
if (functionId != TSDB_FUNC_TAGPRJ && functionId != TSDB_FUNC_TID_TAG) {
return false;
}
}
......@@ -4210,7 +4210,7 @@ static bool skipTimeInterval(SQueryRuntimeEnv *pRuntimeEnv) {
return true;
}
int32_t doInitQInfo(SQInfo *pQInfo, void *param, void *tsdb, bool isSTableQuery) {
int32_t doInitQInfo(SQInfo *pQInfo, void *param, void *tsdb, int32_t vgId, bool isSTableQuery) {
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
......@@ -4238,6 +4238,7 @@ int32_t doInitQInfo(SQInfo *pQInfo, void *param, void *tsdb, bool isSTableQuery)
}
pQInfo->tsdb = tsdb;
pQInfo->vgId = vgId;
pRuntimeEnv->pQuery = pQuery;
pRuntimeEnv->pTSBuf = param;
......@@ -4259,7 +4260,7 @@ int32_t doInitQInfo(SQInfo *pQInfo, void *param, void *tsdb, bool isSTableQuery)
if (isSTableQuery) {
int32_t rows = getInitialPageNum(pQInfo);
code = createDiskbasedResultBuffer(&pRuntimeEnv->pResultBuf, rows, pQuery->rowSize);
code = createDiskbasedResultBuffer(&pRuntimeEnv->pResultBuf, rows, pQuery->rowSize, pQInfo);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
......@@ -4278,7 +4279,7 @@ int32_t doInitQInfo(SQInfo *pQInfo, void *param, void *tsdb, bool isSTableQuery)
} else if (isGroupbyNormalCol(pQuery->pGroupbyExpr) || isIntervalQuery(pQuery)) {
int32_t rows = getInitialPageNum(pQInfo);
code = createDiskbasedResultBuffer(&pRuntimeEnv->pResultBuf, rows, pQuery->rowSize);
code = createDiskbasedResultBuffer(&pRuntimeEnv->pResultBuf, rows, pQuery->rowSize, pQInfo);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
......@@ -5267,11 +5268,11 @@ static char *createTableIdList(SQueryTableMsg *pQueryMsg, char *pMsg, SArray **p
*pTableIdList = taosArrayInit(pQueryMsg->numOfTables, sizeof(STableId));
STableIdInfo *pTableIdInfo = (STableIdInfo *)pMsg;
pTableIdInfo->sid = htonl(pTableIdInfo->sid);
pTableIdInfo->tid = htonl(pTableIdInfo->tid);
pTableIdInfo->uid = htobe64(pTableIdInfo->uid);
pTableIdInfo->key = htobe64(pTableIdInfo->key);
STableId id = {.uid = pTableIdInfo->uid, .tid = pTableIdInfo->sid};
STableId id = {.uid = pTableIdInfo->uid, .tid = pTableIdInfo->tid};
taosArrayPush(*pTableIdList, &id);
pMsg += sizeof(STableIdInfo);
......@@ -5279,7 +5280,7 @@ static char *createTableIdList(SQueryTableMsg *pQueryMsg, char *pMsg, SArray **p
for (int32_t j = 1; j < pQueryMsg->numOfTables; ++j) {
pTableIdInfo = (STableIdInfo *)pMsg;
pTableIdInfo->sid = htonl(pTableIdInfo->sid);
pTableIdInfo->tid = htonl(pTableIdInfo->tid);
pTableIdInfo->uid = htobe64(pTableIdInfo->uid);
pTableIdInfo->key = htobe64(pTableIdInfo->key);
......@@ -5479,9 +5480,9 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList,
strcpy(*tbnameCond, pMsg);
pMsg += len;
}
qTrace("qmsg:%p query on %d table(s), qrange:%" PRId64 "-%" PRId64 ", numOfGroupbyTagCols:%d, ts order:%d, "
"outputCols:%d, numOfCols:%d, interval:%d" PRId64 ", fillType:%d, comptsLen:%d, limit:%" PRId64 ", offset:%" PRId64,
qTrace("qmsg:%p query %d tables, qrange:%" PRId64 "-%" PRId64 ", numOfGroupbyTagCols:%d, order:%d, "
"outputCols:%d, numOfCols:%d, interval:%" PRId64 ", fillType:%d, comptsLen:%d, limit:%" PRId64 ", offset:%" PRId64,
pQueryMsg, pQueryMsg->numOfTables, pQueryMsg->window.skey, pQueryMsg->window.ekey, pQueryMsg->numOfGroupCols,
pQueryMsg->order, pQueryMsg->numOfOutput, pQueryMsg->numOfCols, pQueryMsg->intervalTime,
pQueryMsg->interpoType, pQueryMsg->tsLen, pQueryMsg->limit, pQueryMsg->offset);
......@@ -5882,7 +5883,7 @@ static bool isValidQInfo(void *param) {
static void freeQInfo(SQInfo *pQInfo);
static int32_t initQInfo(SQueryTableMsg *pQueryMsg, void *tsdb, SQInfo *pQInfo, bool isSTable) {
static int32_t initQInfo(SQueryTableMsg *pQueryMsg, void *tsdb, int32_t vgId, SQInfo *pQInfo, bool isSTable) {
int32_t code = TSDB_CODE_SUCCESS;
SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
......@@ -5900,14 +5901,14 @@ static int32_t initQInfo(SQueryTableMsg *pQueryMsg, void *tsdb, SQInfo *pQInfo,
(!QUERY_IS_ASC_QUERY(pQuery) && (pQuery->window.ekey > pQuery->window.skey))) {
qTrace("QInfo:%p no result in time range %" PRId64 "-%" PRId64 ", order %d", pQInfo, pQuery->window.skey,
pQuery->window.ekey, pQuery->order.order);
setQueryStatus(pQuery, QUERY_COMPLETED);
sem_post(&pQInfo->dataReady);
setQueryStatus(pQuery, QUERY_COMPLETED);
return TSDB_CODE_SUCCESS;
}
// filter the qualified
if ((code = doInitQInfo(pQInfo, pTSBuf, tsdb, isSTable)) != TSDB_CODE_SUCCESS) {
if ((code = doInitQInfo(pQInfo, pTSBuf, tsdb, vgId, isSTable)) != TSDB_CODE_SUCCESS) {
goto _error;
}
......@@ -6066,7 +6067,7 @@ static int32_t doDumpQueryResult(SQInfo *pQInfo, char *data) {
// todo if interpolation exists, the result may be dump to client by several rounds
}
int32_t qCreateQueryInfo(void *tsdb, SQueryTableMsg *pQueryMsg, qinfo_t *pQInfo) {
int32_t qCreateQueryInfo(void *tsdb, int32_t vgId, SQueryTableMsg *pQueryMsg, qinfo_t *pQInfo) {
assert(pQueryMsg != NULL);
int32_t code = TSDB_CODE_SUCCESS;
......@@ -6106,8 +6107,15 @@ int32_t qCreateQueryInfo(void *tsdb, SQueryTableMsg *pQueryMsg, qinfo_t *pQInfo)
bool isSTableQuery = false;
STableGroupInfo groupInfo = {0};
if ((pQueryMsg->queryType & TSDB_QUERY_TYPE_STABLE_QUERY) != 0) {
if (TSDB_QUERY_HAS_TYPE(pQueryMsg->queryType, TSDB_QUERY_TYPE_MULTITABLE_QUERY|TSDB_QUERY_TYPE_TABLE_QUERY)) {
isSTableQuery = TSDB_QUERY_HAS_TYPE(pQueryMsg->queryType, TSDB_QUERY_TYPE_MULTITABLE_QUERY);
STableId *id = taosArrayGet(pTableIdList, 0);
if ((code = tsdbGetOneTableGroup(tsdb, id->uid, &groupInfo)) != TSDB_CODE_SUCCESS) {
goto _query_over;
}
} else if (TSDB_QUERY_HAS_TYPE(pQueryMsg->queryType, TSDB_QUERY_TYPE_STABLE_QUERY)) {
isSTableQuery = true;
STableId *id = taosArrayGet(pTableIdList, 0);
......@@ -6125,12 +6133,7 @@ int32_t qCreateQueryInfo(void *tsdb, SQueryTableMsg *pQueryMsg, qinfo_t *pQInfo)
goto _query_over;
}
} else {
assert(taosArrayGetSize(pTableIdList) == 1);
STableId *id = taosArrayGet(pTableIdList, 0);
if ((code = tsdbGetOneTableGroup(tsdb, id->uid, &groupInfo)) != TSDB_CODE_SUCCESS) {
goto _query_over;
}
assert(0);
}
(*pQInfo) = createQInfoImpl(pQueryMsg, pGroupbyExpr, pExprs, &groupInfo, pTagColumnInfo);
......@@ -6138,7 +6141,7 @@ int32_t qCreateQueryInfo(void *tsdb, SQueryTableMsg *pQueryMsg, qinfo_t *pQInfo)
code = TSDB_CODE_SERV_OUT_OF_MEMORY;
}
code = initQInfo(pQueryMsg, tsdb, *pQInfo, isSTableQuery);
code = initQInfo(pQueryMsg, tsdb, vgId, *pQInfo, isSTableQuery);
_query_over:
tfree(tagCond);
......@@ -6147,7 +6150,7 @@ _query_over:
// if failed to add ref for all meters in this query, abort current query
// atomic_fetch_add_32(&vnodeSelectReqNum, 1);
return TSDB_CODE_SUCCESS;
return code;
}
void qDestroyQueryInfo(qinfo_t pQInfo) {
......@@ -6273,31 +6276,60 @@ static void buildTagQueryResult(SQInfo* pQInfo) {
assert(num == 1); // only one group
SArray* pa = taosArrayGetP(pQInfo->groupInfo.pGroupList, 0);
num = taosArrayGetSize(pa);
assert(num == pQInfo->groupInfo.numOfTables);
assert(num == pQInfo->groupInfo.numOfTables);
int16_t type, bytes;
for(int32_t i = 0; i < num; ++i) {
SExprInfo* pExprInfo = pQuery->pSelectExpr;
SGroupItem* item = taosArrayGet(pa, i);
int32_t functionId = pQuery->pSelectExpr[0].base.functionId;
if (functionId == TSDB_FUNC_TID_TAG) { // return the tags & table Id
assert(pQuery->numOfOutput == 1);
SExprInfo* pExprInfo = &pQuery->pSelectExpr[0];
int32_t rsize = pExprInfo->bytes;
char* data = NULL;
for(int32_t j = 0; j < pQuery->numOfOutput; ++j) {
// todo check the return value
if (pExprInfo[j].base.colInfo.colId == TSDB_TBNAME_COLUMN_INDEX) {
tsdbGetTableName(pQInfo->tsdb, &item->id, &data);
strncpy(pQuery->sdata[j]->data + i * TSDB_TABLE_NAME_LEN, data, TSDB_TABLE_NAME_LEN);
tfree(data);
for(int32_t i = 0; i < num; ++i) {
SGroupItem* item = taosArrayGet(pa, i);
char* output = pQuery->sdata[0]->data + i * rsize;
*(int64_t*) output = item->id.uid; // memory align problem
output += sizeof(item->id.uid);
*(int32_t*) output = item->id.tid;
output += sizeof(item->id.tid);
*(int32_t*) output = pQInfo->vgId;
output += sizeof(pQInfo->vgId);
tsdbGetTableTagVal(pQInfo->tsdb, &item->id, pExprInfo->base.colInfo.colId, &type, &bytes, &data);
memcpy(output, data, bytes);
}
} else {
tsdbGetTableTagVal(pQInfo->tsdb, &item->id, pExprInfo[j].base.colInfo.colId, &type, &bytes, &data);
assert(bytes == pExprInfo[j].bytes && type == pExprInfo[j].type);
memcpy(pQuery->sdata[j]->data + i * bytes, data, bytes);
qTrace("QInfo:%p create (tableId, tag) info completed, rows:%d", num);
} else { // return only the tags|table name etc.
for(int32_t i = 0; i < num; ++i) {
SExprInfo* pExprInfo = pQuery->pSelectExpr;
SGroupItem* item = taosArrayGet(pa, i);
char* data = NULL;
for(int32_t j = 0; j < pQuery->numOfOutput; ++j) {
// todo check the return value
if (pExprInfo[j].base.colInfo.colId == TSDB_TBNAME_COLUMN_INDEX) {
tsdbGetTableName(pQInfo->tsdb, &item->id, &data);
strncpy(pQuery->sdata[j]->data + i * TSDB_TABLE_NAME_LEN, data, TSDB_TABLE_NAME_LEN);
tfree(data);
} else {
tsdbGetTableTagVal(pQInfo->tsdb, &item->id, pExprInfo[j].base.colInfo.colId, &type, &bytes, &data);
assert(bytes == pExprInfo[j].bytes && type == pExprInfo[j].type);
memcpy(pQuery->sdata[j]->data + i * bytes, data, bytes);
}
}
}
qTrace("QInfo:%p create tag values results completed, rows:%d", num);
}
pQuery->rec.rows = num;
......
......@@ -27,7 +27,6 @@
#include <assert.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "qsqlparser.h"
......@@ -203,61 +202,62 @@ typedef union {
** yy_default[] Default action for each state.
**
*********** Begin parsing tables **********************************************/
#define YY_ACTTAB_COUNT (529)
#define YY_ACTTAB_COUNT (531)
static const YYACTIONTYPE yy_action[] = {
/* 0 */ 752, 440, 132, 150, 244, 10, 616, 246, 132, 441,
/* 10 */ 132, 155, 821, 41, 43, 20, 35, 36, 820, 154,
/* 20 */ 821, 29, 741, 440, 200, 39, 37, 40, 38, 131,
/* 30 */ 499, 441, 96, 34, 33, 100, 151, 32, 31, 30,
/* 40 */ 41, 43, 741, 35, 36, 152, 136, 163, 29, 727,
/* 50 */ 749, 200, 39, 37, 40, 38, 185, 100, 225, 224,
/* 60 */ 34, 33, 162, 730, 32, 31, 30, 400, 401, 402,
/* 0 */ 752, 440, 133, 151, 244, 10, 616, 246, 133, 441,
/* 10 */ 133, 156, 821, 41, 43, 20, 35, 36, 820, 155,
/* 20 */ 821, 29, 741, 440, 201, 39, 37, 40, 38, 132,
/* 30 */ 499, 441, 97, 34, 33, 101, 152, 32, 31, 30,
/* 40 */ 41, 43, 741, 35, 36, 153, 137, 164, 29, 727,
/* 50 */ 749, 201, 39, 37, 40, 38, 186, 101, 225, 224,
/* 60 */ 34, 33, 163, 730, 32, 31, 30, 400, 401, 402,
/* 70 */ 403, 404, 405, 406, 407, 408, 409, 410, 411, 245,
/* 80 */ 730, 41, 43, 188, 35, 36, 215, 236, 197, 29,
/* 90 */ 58, 20, 200, 39, 37, 40, 38, 32, 31, 30,
/* 100 */ 56, 34, 33, 75, 730, 32, 31, 30, 43, 236,
/* 110 */ 35, 36, 776, 817, 195, 29, 20, 20, 200, 39,
/* 120 */ 37, 40, 38, 164, 570, 727, 227, 34, 33, 440,
/* 130 */ 167, 32, 31, 30, 238, 35, 36, 441, 7, 816,
/* 140 */ 29, 61, 110, 200, 39, 37, 40, 38, 223, 228,
/* 80 */ 730, 41, 43, 189, 35, 36, 216, 236, 198, 29,
/* 90 */ 58, 20, 201, 39, 37, 40, 38, 32, 31, 30,
/* 100 */ 56, 34, 33, 76, 730, 32, 31, 30, 43, 236,
/* 110 */ 35, 36, 776, 817, 196, 29, 20, 20, 201, 39,
/* 120 */ 37, 40, 38, 165, 570, 727, 227, 34, 33, 440,
/* 130 */ 168, 32, 31, 30, 238, 35, 36, 441, 7, 816,
/* 140 */ 29, 61, 111, 201, 39, 37, 40, 38, 223, 228,
/* 150 */ 727, 727, 34, 33, 50, 728, 32, 31, 30, 15,
/* 160 */ 214, 237, 213, 212, 211, 210, 209, 208, 207, 206,
/* 160 */ 215, 237, 214, 213, 212, 211, 210, 209, 208, 207,
/* 170 */ 712, 51, 701, 702, 703, 704, 705, 706, 707, 708,
/* 180 */ 709, 710, 711, 159, 583, 11, 815, 574, 100, 577,
/* 190 */ 100, 580, 168, 159, 583, 222, 221, 574, 16, 577,
/* 200 */ 20, 580, 34, 33, 145, 26, 32, 31, 30, 238,
/* 210 */ 86, 85, 139, 174, 657, 156, 157, 123, 144, 199,
/* 220 */ 182, 715, 179, 714, 148, 156, 157, 159, 583, 531,
/* 230 */ 60, 574, 149, 577, 726, 580, 237, 16, 39, 37,
/* 180 */ 709, 710, 711, 160, 583, 11, 815, 574, 101, 577,
/* 190 */ 101, 580, 169, 160, 583, 222, 221, 574, 16, 577,
/* 200 */ 20, 580, 34, 33, 146, 26, 32, 31, 30, 238,
/* 210 */ 87, 86, 140, 175, 657, 157, 158, 124, 145, 200,
/* 220 */ 183, 715, 180, 714, 149, 157, 158, 160, 583, 531,
/* 230 */ 60, 574, 150, 577, 726, 580, 237, 16, 39, 37,
/* 240 */ 40, 38, 27, 775, 26, 59, 34, 33, 551, 552,
/* 250 */ 32, 31, 30, 137, 113, 114, 219, 64, 67, 156,
/* 260 */ 157, 95, 515, 666, 184, 512, 123, 513, 26, 514,
/* 270 */ 523, 147, 127, 125, 240, 88, 87, 187, 42, 158,
/* 280 */ 73, 77, 239, 84, 76, 572, 528, 729, 42, 582,
/* 290 */ 79, 17, 658, 165, 166, 123, 243, 242, 92, 582,
/* 250 */ 32, 31, 30, 138, 114, 115, 68, 64, 67, 157,
/* 260 */ 158, 96, 515, 666, 185, 512, 124, 513, 26, 514,
/* 270 */ 523, 148, 128, 126, 240, 89, 88, 188, 42, 159,
/* 280 */ 74, 78, 239, 85, 77, 572, 528, 729, 42, 582,
/* 290 */ 80, 17, 658, 166, 167, 124, 243, 242, 93, 582,
/* 300 */ 47, 542, 543, 600, 581, 45, 13, 12, 584, 576,
/* 310 */ 138, 579, 12, 575, 581, 578, 2, 72, 71, 48,
/* 320 */ 505, 573, 42, 743, 45, 504, 204, 9, 8, 21,
/* 330 */ 21, 140, 519, 582, 520, 517, 141, 518, 83, 82,
/* 340 */ 142, 143, 134, 130, 135, 830, 133, 786, 581, 785,
/* 350 */ 160, 782, 781, 161, 751, 721, 768, 226, 97, 767,
/* 360 */ 111, 112, 516, 668, 205, 109, 128, 24, 218, 220,
/* 370 */ 829, 69, 26, 828, 826, 115, 186, 686, 25, 22,
/* 380 */ 90, 129, 655, 78, 653, 80, 651, 650, 169, 538,
/* 390 */ 124, 648, 189, 647, 646, 644, 636, 193, 52, 740,
/* 400 */ 126, 642, 640, 638, 49, 755, 756, 101, 769, 44,
/* 410 */ 198, 196, 194, 28, 192, 190, 217, 74, 229, 230,
/* 420 */ 202, 232, 231, 614, 233, 234, 53, 235, 241, 170,
/* 430 */ 146, 62, 171, 65, 173, 172, 613, 176, 175, 178,
/* 440 */ 649, 177, 612, 89, 91, 117, 687, 118, 116, 119,
/* 450 */ 120, 643, 104, 102, 122, 725, 106, 103, 105, 121,
/* 460 */ 107, 1, 108, 23, 180, 181, 605, 183, 187, 525,
/* 470 */ 55, 539, 153, 98, 57, 191, 18, 63, 4, 544,
/* 480 */ 99, 5, 585, 3, 19, 14, 201, 6, 203, 480,
/* 490 */ 479, 478, 477, 476, 475, 474, 473, 471, 45, 444,
/* 500 */ 66, 446, 21, 501, 216, 68, 500, 498, 54, 465,
/* 510 */ 46, 463, 455, 70, 461, 457, 459, 453, 451, 472,
/* 520 */ 470, 81, 426, 442, 93, 415, 94, 413, 618,
/* 310 */ 139, 579, 12, 575, 581, 578, 2, 73, 72, 48,
/* 320 */ 505, 573, 42, 743, 45, 504, 205, 9, 8, 21,
/* 330 */ 21, 141, 519, 582, 520, 517, 142, 518, 84, 83,
/* 340 */ 143, 144, 135, 131, 136, 830, 134, 786, 581, 785,
/* 350 */ 161, 782, 781, 162, 751, 721, 768, 226, 98, 767,
/* 360 */ 112, 113, 516, 668, 206, 110, 129, 24, 219, 665,
/* 370 */ 220, 829, 26, 70, 828, 826, 187, 116, 686, 25,
/* 380 */ 91, 22, 130, 655, 79, 653, 81, 651, 650, 538,
/* 390 */ 170, 125, 190, 648, 647, 646, 644, 194, 52, 740,
/* 400 */ 636, 127, 642, 640, 638, 49, 755, 102, 756, 44,
/* 410 */ 769, 199, 197, 195, 193, 191, 28, 218, 75, 229,
/* 420 */ 230, 231, 232, 233, 234, 235, 203, 53, 241, 614,
/* 430 */ 171, 172, 147, 62, 65, 174, 613, 177, 173, 179,
/* 440 */ 612, 176, 649, 178, 181, 643, 123, 687, 117, 119,
/* 450 */ 118, 120, 121, 90, 103, 725, 108, 104, 105, 122,
/* 460 */ 106, 107, 109, 92, 1, 23, 182, 188, 605, 184,
/* 470 */ 525, 55, 539, 57, 99, 154, 192, 18, 63, 4,
/* 480 */ 544, 100, 480, 585, 3, 19, 5, 14, 202, 6,
/* 490 */ 204, 479, 478, 477, 476, 475, 474, 473, 471, 45,
/* 500 */ 217, 444, 66, 21, 501, 500, 46, 498, 54, 465,
/* 510 */ 463, 455, 461, 457, 69, 459, 71, 453, 451, 472,
/* 520 */ 470, 82, 426, 442, 94, 415, 413, 618, 617, 617,
/* 530 */ 95,
};
static const YYCODETYPE yy_lookahead[] = {
/* 0 */ 207, 1, 256, 206, 207, 256, 204, 205, 256, 9,
......@@ -300,20 +300,20 @@ static const YYCODETYPE yy_lookahead[] = {
/* 370 */ 207, 207, 103, 207, 207, 207, 240, 207, 207, 207,
/* 380 */ 59, 207, 207, 207, 207, 207, 207, 207, 207, 107,
/* 390 */ 207, 207, 259, 207, 207, 207, 207, 259, 117, 253,
/* 400 */ 207, 207, 207, 207, 119, 208, 208, 252, 208, 116,
/* 410 */ 111, 115, 110, 121, 109, 108, 75, 84, 83, 49,
/* 420 */ 208, 82, 80, 5, 53, 81, 208, 79, 75, 132,
/* 430 */ 208, 212, 5, 212, 58, 132, 5, 5, 132, 58,
/* 440 */ 208, 132, 5, 209, 209, 220, 222, 216, 221, 219,
/* 450 */ 217, 208, 249, 251, 215, 240, 247, 250, 248, 218,
/* 460 */ 246, 213, 245, 210, 132, 58, 86, 124, 104, 97,
/* 470 */ 105, 97, 1, 96, 101, 96, 101, 72, 112, 97,
/* 480 */ 96, 112, 97, 96, 101, 96, 98, 96, 98, 9,
/* 490 */ 5, 5, 5, 5, 1, 5, 5, 5, 101, 76,
/* 500 */ 72, 58, 101, 5, 15, 127, 5, 97, 96, 5,
/* 510 */ 16, 5, 5, 127, 5, 5, 5, 5, 5, 5,
/* 520 */ 5, 58, 58, 76, 21, 59, 21, 58, 0, 267,
/* 530 */ 267, 267, 267, 267, 267, 267, 267, 267, 267, 267,
/* 400 */ 207, 207, 207, 207, 207, 119, 208, 252, 208, 116,
/* 410 */ 208, 111, 115, 110, 109, 108, 121, 75, 84, 83,
/* 420 */ 49, 80, 82, 53, 81, 79, 208, 208, 75, 5,
/* 430 */ 132, 5, 208, 212, 212, 58, 5, 5, 132, 58,
/* 440 */ 5, 132, 208, 132, 132, 208, 215, 222, 221, 216,
/* 450 */ 220, 219, 217, 209, 251, 240, 246, 250, 249, 218,
/* 460 */ 248, 247, 245, 209, 213, 210, 58, 104, 86, 124,
/* 470 */ 97, 105, 97, 101, 96, 1, 96, 101, 72, 112,
/* 480 */ 97, 96, 9, 97, 96, 101, 112, 96, 98, 96,
/* 490 */ 98, 5, 5, 5, 5, 1, 5, 5, 5, 101,
/* 500 */ 15, 76, 72, 101, 5, 5, 16, 97, 96, 5,
/* 510 */ 5, 5, 5, 5, 127, 5, 127, 5, 5, 5,
/* 520 */ 5, 58, 58, 76, 21, 59, 58, 0, 267, 267,
/* 530 */ 21, 267, 267, 267, 267, 267, 267, 267, 267, 267,
/* 540 */ 267, 267, 267, 267, 267, 267, 267, 267, 267, 267,
/* 550 */ 267, 267, 267, 267, 267, 267, 267, 267, 267, 267,
/* 560 */ 267, 267, 267, 267, 267, 267, 267, 267, 267, 267,
......@@ -333,41 +333,41 @@ static const YYCODETYPE yy_lookahead[] = {
/* 700 */ 267, 267, 267, 267, 267, 267, 267, 267, 267, 267,
/* 710 */ 267, 267, 267, 267, 267, 267, 267, 267, 267, 267,
/* 720 */ 267, 267, 267, 267, 267, 267, 267, 267, 267, 267,
/* 730 */ 267, 267,
/* 730 */ 267, 267, 267, 267,
};
#define YY_SHIFT_COUNT (246)
#define YY_SHIFT_MIN (0)
#define YY_SHIFT_MAX (528)
#define YY_SHIFT_MAX (527)
static const unsigned short int yy_shift_ofst[] = {
/* 0 */ 141, 74, 182, 226, 128, 128, 128, 128, 128, 128,
/* 10 */ 0, 22, 226, 260, 260, 260, 102, 128, 128, 128,
/* 20 */ 128, 128, 31, 149, 9, 9, 529, 192, 226, 226,
/* 20 */ 128, 128, 31, 149, 9, 9, 531, 192, 226, 226,
/* 30 */ 226, 226, 226, 226, 226, 226, 226, 226, 226, 226,
/* 40 */ 226, 226, 226, 226, 226, 260, 260, 25, 25, 25,
/* 50 */ 25, 25, 25, 42, 25, 165, 128, 128, 135, 135,
/* 60 */ 185, 128, 128, 128, 128, 128, 128, 128, 128, 128,
/* 70 */ 128, 128, 128, 128, 128, 128, 128, 128, 128, 128,
/* 80 */ 128, 128, 128, 128, 128, 128, 128, 128, 128, 128,
/* 90 */ 128, 128, 128, 128, 128, 269, 321, 321, 282, 282,
/* 100 */ 321, 281, 285, 293, 299, 296, 302, 305, 307, 292,
/* 110 */ 269, 321, 321, 341, 341, 321, 333, 335, 370, 342,
/* 120 */ 339, 371, 344, 348, 321, 353, 321, 353, 529, 529,
/* 130 */ 27, 68, 68, 68, 94, 119, 213, 213, 213, 216,
/* 140 */ 169, 169, 169, 169, 190, 208, 67, 89, 60, 60,
/* 150 */ 236, 173, 204, 205, 206, 211, 304, 308, 284, 220,
/* 160 */ 199, 53, 223, 228, 229, 327, 330, 191, 201, 266,
/* 170 */ 418, 297, 427, 303, 376, 431, 306, 432, 309, 381,
/* 180 */ 437, 332, 407, 380, 343, 364, 372, 365, 373, 374,
/* 190 */ 377, 471, 379, 382, 384, 375, 366, 383, 369, 385,
/* 200 */ 387, 389, 388, 391, 390, 405, 480, 485, 486, 487,
/* 210 */ 488, 493, 490, 491, 492, 397, 423, 489, 428, 443,
/* 220 */ 494, 378, 386, 401, 498, 501, 410, 412, 401, 504,
/* 230 */ 506, 507, 509, 510, 511, 512, 513, 514, 515, 463,
/* 240 */ 464, 447, 503, 505, 466, 469, 528,
/* 90 */ 128, 128, 128, 128, 128, 128, 269, 321, 321, 282,
/* 100 */ 282, 321, 281, 286, 293, 300, 297, 303, 305, 307,
/* 110 */ 295, 269, 321, 321, 342, 342, 321, 334, 336, 371,
/* 120 */ 341, 340, 370, 343, 346, 321, 353, 321, 353, 531,
/* 130 */ 531, 27, 68, 68, 68, 94, 119, 213, 213, 213,
/* 140 */ 216, 169, 169, 169, 169, 190, 208, 67, 89, 60,
/* 150 */ 60, 236, 173, 204, 205, 206, 211, 304, 308, 284,
/* 160 */ 220, 199, 53, 223, 228, 229, 327, 330, 191, 201,
/* 170 */ 266, 424, 298, 426, 306, 377, 431, 309, 432, 311,
/* 180 */ 381, 435, 312, 408, 382, 345, 363, 373, 366, 372,
/* 190 */ 375, 378, 474, 380, 383, 385, 376, 367, 384, 374,
/* 200 */ 386, 388, 391, 390, 393, 392, 406, 473, 486, 487,
/* 210 */ 488, 489, 494, 491, 492, 493, 398, 425, 485, 430,
/* 220 */ 490, 387, 389, 402, 499, 500, 410, 412, 402, 504,
/* 230 */ 505, 506, 507, 508, 510, 512, 513, 514, 515, 463,
/* 240 */ 464, 447, 503, 509, 466, 468, 527,
};
#define YY_REDUCE_COUNT (129)
#define YY_REDUCE_COUNT (130)
#define YY_REDUCE_MIN (-254)
#define YY_REDUCE_MAX (253)
#define YY_REDUCE_MAX (255)
static const short yy_reduce_ofst[] = {
/* 0 */ -198, -53, -254, -246, -150, -172, -192, -116, -91, -90,
/* 10 */ -207, -203, -248, -179, -162, -138, -218, -175, -19, -17,
......@@ -378,10 +378,11 @@ static const short yy_reduce_ofst[] = {
/* 60 */ 121, 153, 154, 156, 157, 159, 160, 161, 162, 163,
/* 70 */ 164, 166, 167, 168, 170, 171, 172, 174, 175, 176,
/* 80 */ 177, 178, 179, 180, 181, 183, 184, 186, 187, 188,
/* 90 */ 189, 193, 194, 195, 196, 136, 197, 198, 133, 138,
/* 100 */ 200, 146, 155, 202, 207, 203, 210, 209, 214, 217,
/* 110 */ 215, 212, 218, 219, 221, 222, 224, 227, 225, 231,
/* 120 */ 230, 233, 241, 239, 232, 234, 243, 235, 248, 253,
/* 90 */ 189, 193, 194, 195, 196, 197, 136, 198, 200, 133,
/* 100 */ 138, 202, 146, 155, 203, 207, 209, 212, 214, 210,
/* 110 */ 217, 215, 218, 219, 221, 222, 224, 225, 227, 230,
/* 120 */ 233, 232, 235, 241, 231, 234, 244, 237, 254, 251,
/* 130 */ 255,
};
static const YYACTIONTYPE yy_default[] = {
/* 0 */ 615, 667, 823, 823, 615, 615, 615, 615, 615, 615,
......@@ -391,21 +392,21 @@ static const YYACTIONTYPE yy_default[] = {
/* 40 */ 615, 615, 615, 615, 615, 615, 615, 615, 615, 615,
/* 50 */ 615, 615, 615, 615, 615, 615, 615, 615, 772, 772,
/* 60 */ 746, 615, 615, 615, 615, 615, 615, 615, 615, 615,
/* 70 */ 615, 615, 615, 615, 615, 615, 615, 615, 654, 615,
/* 80 */ 652, 615, 615, 615, 615, 615, 615, 615, 615, 615,
/* 90 */ 615, 615, 641, 615, 615, 615, 635, 635, 615, 615,
/* 100 */ 635, 779, 783, 777, 765, 773, 764, 760, 759, 787,
/* 110 */ 615, 635, 635, 664, 664, 635, 685, 683, 681, 673,
/* 120 */ 679, 675, 677, 671, 635, 662, 635, 662, 700, 713,
/* 130 */ 615, 788, 822, 778, 806, 805, 818, 812, 811, 615,
/* 140 */ 810, 809, 808, 807, 615, 615, 615, 615, 814, 813,
/* 150 */ 615, 615, 615, 615, 615, 615, 615, 615, 615, 790,
/* 160 */ 784, 780, 615, 615, 615, 615, 615, 615, 615, 615,
/* 70 */ 615, 615, 615, 615, 615, 615, 615, 615, 615, 654,
/* 80 */ 615, 652, 615, 615, 615, 615, 615, 615, 615, 615,
/* 90 */ 615, 615, 615, 641, 615, 615, 615, 635, 635, 615,
/* 100 */ 615, 635, 779, 783, 777, 765, 773, 764, 760, 759,
/* 110 */ 787, 615, 635, 635, 664, 664, 635, 685, 683, 681,
/* 120 */ 673, 679, 675, 677, 671, 635, 662, 635, 662, 700,
/* 130 */ 713, 615, 788, 822, 778, 806, 805, 818, 812, 811,
/* 140 */ 615, 810, 809, 808, 807, 615, 615, 615, 615, 814,
/* 150 */ 813, 615, 615, 615, 615, 615, 615, 615, 615, 615,
/* 160 */ 790, 784, 780, 615, 615, 615, 615, 615, 615, 615,
/* 170 */ 615, 615, 615, 615, 615, 615, 615, 615, 615, 615,
/* 180 */ 615, 615, 615, 615, 615, 745, 615, 615, 754, 615,
/* 190 */ 615, 615, 615, 615, 615, 774, 615, 766, 615, 615,
/* 200 */ 615, 615, 615, 615, 722, 615, 615, 615, 615, 615,
/* 210 */ 615, 615, 615, 615, 615, 688, 615, 615, 615, 615,
/* 180 */ 615, 615, 615, 615, 615, 615, 745, 615, 615, 754,
/* 190 */ 615, 615, 615, 615, 615, 615, 774, 615, 766, 615,
/* 200 */ 615, 615, 615, 615, 615, 722, 615, 615, 615, 615,
/* 210 */ 615, 615, 615, 615, 615, 615, 688, 615, 615, 615,
/* 220 */ 615, 615, 615, 827, 615, 615, 615, 716, 825, 615,
/* 230 */ 615, 615, 615, 615, 615, 615, 615, 615, 615, 615,
/* 240 */ 615, 615, 639, 637, 615, 631, 615,
......@@ -1038,7 +1039,7 @@ static const char *const yyRuleName[] = {
/* 44 */ "ifexists ::=",
/* 45 */ "ifnotexists ::= IF NOT EXISTS",
/* 46 */ "ifnotexists ::=",
/* 47 */ "cmd ::= CREATE DNODE IPTOKEN",
/* 47 */ "cmd ::= CREATE DNODE ids",
/* 48 */ "cmd ::= CREATE ACCOUNT ids PASS ids acct_optr",
/* 49 */ "cmd ::= CREATE DATABASE ifnotexists ids db_optr",
/* 50 */ "cmd ::= CREATE USER ids PASS ids",
......@@ -1711,7 +1712,7 @@ static const struct {
{ 209, 0 }, /* (44) ifexists ::= */
{ 212, -3 }, /* (45) ifnotexists ::= IF NOT EXISTS */
{ 212, 0 }, /* (46) ifnotexists ::= */
{ 205, -3 }, /* (47) cmd ::= CREATE DNODE IPTOKEN */
{ 205, -3 }, /* (47) cmd ::= CREATE DNODE ids */
{ 205, -6 }, /* (48) cmd ::= CREATE ACCOUNT ids PASS ids acct_optr */
{ 205, -5 }, /* (49) cmd ::= CREATE DATABASE ifnotexists ids db_optr */
{ 205, -5 }, /* (50) cmd ::= CREATE USER ids PASS ids */
......@@ -2122,7 +2123,7 @@ static void yy_reduce(
case 45: /* ifnotexists ::= IF NOT EXISTS */
{yymsp[-2].minor.yy0.n = 1;}
break;
case 47: /* cmd ::= CREATE DNODE IPTOKEN */
case 47: /* cmd ::= CREATE DNODE ids */
{ setDCLSQLElems(pInfo, TSDB_SQL_CREATE_DNODE, 1, &yymsp[0].minor.yy0);}
break;
case 48: /* cmd ::= CREATE ACCOUNT ids PASS ids acct_optr */
......
......@@ -246,8 +246,8 @@ static bool hasMoreDataInCache(STsdbQueryHandle* pHandle) {
SDataRow row = SL_GET_NODE_DATA(node);
pCheckInfo->lastKey = dataRowKey(row); // first timestamp in buffer
uTrace("%p uid:%" PRId64", tid:%d check data in buffer from skey:%" PRId64 ", order:%d", pHandle,
pCheckInfo->tableId.uid, pCheckInfo->tableId.tid, pCheckInfo->lastKey, pHandle->order);
uTrace("%p uid:%" PRId64", tid:%d check data in buffer from skey:%" PRId64 ", order:%d, %p", pHandle,
pCheckInfo->tableId.uid, pCheckInfo->tableId.tid, pCheckInfo->lastKey, pHandle->order, pHandle->qinfo);
// all data in mem are checked already.
if ((pCheckInfo->lastKey > pHandle->window.ekey && ASCENDING_ORDER_TRAVERSE(pHandle->order)) ||
......@@ -1273,33 +1273,6 @@ void filterPrepare(void* expr, void* param) {
}
}
int32_t doCompare(const char* f1, const char* f2, int32_t type, size_t size) {
switch (type) {
case TSDB_DATA_TYPE_INT: DEFAULT_COMP(GET_INT32_VAL(f1), GET_INT32_VAL(f2));
case TSDB_DATA_TYPE_DOUBLE: DEFAULT_COMP(GET_DOUBLE_VAL(f1), GET_DOUBLE_VAL(f2));
case TSDB_DATA_TYPE_FLOAT: DEFAULT_COMP(GET_FLOAT_VAL(f1), GET_FLOAT_VAL(f2));
case TSDB_DATA_TYPE_BIGINT: DEFAULT_COMP(GET_INT64_VAL(f1), GET_INT64_VAL(f2));
case TSDB_DATA_TYPE_SMALLINT: DEFAULT_COMP(GET_INT16_VAL(f1), GET_INT16_VAL(f2));
case TSDB_DATA_TYPE_TINYINT:
case TSDB_DATA_TYPE_BOOL: DEFAULT_COMP(GET_INT8_VAL(f1), GET_INT8_VAL(f2));
case TSDB_DATA_TYPE_NCHAR: {
int32_t ret = wcsncmp((wchar_t*) f1, (wchar_t*) f2, size/TSDB_NCHAR_SIZE);
if (ret == 0) {
return ret;
}
return (ret < 0) ? -1 : 1;
}
default: {
int32_t ret = strncmp(f1, f2, (size_t)size);
if (ret == 0) {
return ret;
}
return (ret < 0) ? -1 : 1;
}
}
}
typedef struct STableGroupSupporter {
int32_t numOfCols;
SColIndex* pCols;
......@@ -1504,7 +1477,7 @@ int32_t tsdbQuerySTableByTagCond(TsdbRepoT *tsdb, int64_t uid, const char *pTagC
const char* tbnameCond, STableGroupInfo *pGroupInfo, SColIndex *pColIndex, int32_t numOfCols) {
STable* pTable = tsdbGetTableByUid(tsdbGetMeta(tsdb), uid);
if (pTable == NULL) {
uError("failed to get stable, uid:%" PRIu64, uid);
uError("failed to get stable, uid:%, %p" PRIu64, uid);
return TSDB_CODE_INVALID_TABLE_ID;
}
......@@ -1517,7 +1490,12 @@ int32_t tsdbQuerySTableByTagCond(TsdbRepoT *tsdb, int64_t uid, const char *pTagC
if (ret == TSDB_CODE_SUCCESS) {
pGroupInfo->numOfTables = taosArrayGetSize(res);
pGroupInfo->pGroupList = createTableGroup(res, pTagSchema, pColIndex, numOfCols, tsdb);
uTrace("no tbname condition or tagcond, all tables belongs to one group, numOfTables:%d", pGroupInfo->numOfTables);
} else {
// todo add error
}
taosArrayDestroy(res);
return ret;
}
......
......@@ -38,6 +38,8 @@ int patternMatch(const char *zPattern, const char *zString, size_t size, const S
int WCSPatternMatch(const wchar_t *zPattern, const wchar_t *zString, size_t size, const SPatternCompareInfo *pInfo);
int32_t doCompare(const char* f1, const char* f2, int32_t type, size_t size);
__compar_fn_t getKeyComparFunc(int32_t keyType);
__compar_fn_t getComparFunc(int32_t type, int32_t optr);
......
......@@ -333,3 +333,30 @@ __compar_fn_t getKeyComparFunc(int32_t keyType) {
return comparFn;
}
int32_t doCompare(const char* f1, const char* f2, int32_t type, size_t size) {
switch (type) {
case TSDB_DATA_TYPE_INT: DEFAULT_COMP(GET_INT32_VAL(f1), GET_INT32_VAL(f2));
case TSDB_DATA_TYPE_DOUBLE: DEFAULT_COMP(GET_DOUBLE_VAL(f1), GET_DOUBLE_VAL(f2));
case TSDB_DATA_TYPE_FLOAT: DEFAULT_COMP(GET_FLOAT_VAL(f1), GET_FLOAT_VAL(f2));
case TSDB_DATA_TYPE_BIGINT: DEFAULT_COMP(GET_INT64_VAL(f1), GET_INT64_VAL(f2));
case TSDB_DATA_TYPE_SMALLINT: DEFAULT_COMP(GET_INT16_VAL(f1), GET_INT16_VAL(f2));
case TSDB_DATA_TYPE_TINYINT:
case TSDB_DATA_TYPE_BOOL: DEFAULT_COMP(GET_INT8_VAL(f1), GET_INT8_VAL(f2));
case TSDB_DATA_TYPE_NCHAR: {
int32_t ret = wcsncmp((wchar_t*) f1, (wchar_t*) f2, size/TSDB_NCHAR_SIZE);
if (ret == 0) {
return ret;
}
return (ret < 0) ? -1 : 1;
}
default: {
int32_t ret = strncmp(f1, f2, (size_t)size);
if (ret == 0) {
return ret;
}
return (ret < 0) ? -1 : 1;
}
}
}
......@@ -56,7 +56,7 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, void *pCont, int32_t cont
qinfo_t pQInfo = NULL;
if (contLen != 0) {
pRet->code = qCreateQueryInfo(pVnode->tsdb, pQueryTableMsg, &pQInfo);
pRet->code = qCreateQueryInfo(pVnode->tsdb, pVnode->vgId, pQueryTableMsg, &pQInfo);
SQueryTableRsp *pRsp = (SQueryTableRsp *) rpcMallocCont(sizeof(SQueryTableRsp));
pRsp->qhandle = htobe64((uint64_t) (pQInfo));
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册