提交 1971c630 编写于 作者: sangshuduo's avatar sangshuduo

Merge branch 'develop' into add-insert-testcase-to-2.0

......@@ -196,14 +196,14 @@ STableMetaInfo* tscGetMetaInfo(SQueryInfo *pQueryInfo, int32_t tableIndex);
SQueryInfo *tscGetQueryInfoDetail(SSqlCmd* pCmd, int32_t subClauseIndex);
int32_t tscGetQueryInfoDetailSafely(SSqlCmd *pCmd, int32_t subClauseIndex, SQueryInfo** pQueryInfo);
void tscClearMeterMetaInfo(STableMetaInfo* pTableMetaInfo, bool removeFromCache);
void tscClearTableMetaInfo(STableMetaInfo* pTableMetaInfo, bool removeFromCache);
STableMetaInfo* tscAddTableMetaInfo(SQueryInfo* pQueryInfo, const char* name, STableMeta* pTableMeta,
SVgroupsInfo* vgroupList, SArray* pTagCols);
STableMetaInfo* tscAddEmptyMetaInfo(SQueryInfo *pQueryInfo);
int32_t tscAddSubqueryInfo(SSqlCmd *pCmd);
void tscFreeSubqueryInfo(SSqlCmd* pCmd);
void tscFreeQueryInfo(SSqlCmd* pCmd);
void tscClearSubqueryInfo(SSqlCmd* pCmd);
int tscGetSTableVgroupInfo(SSqlObj* pSql, int32_t clauseIndex);
......
......@@ -78,7 +78,7 @@ typedef struct STableMetaInfo {
*/
int32_t vgroupIndex;
char name[TSDB_TABLE_ID_LEN]; // (super) table name
SArray* tagColList; // involved tag columns
SArray* tagColList; // SArray<SColumn*>, involved tag columns
} STableMetaInfo;
/* the structure for sql function in select clause */
......@@ -298,6 +298,7 @@ typedef struct STscObj {
char sversion[TSDB_VERSION_LEN];
char writeAuth : 1;
char superAuth : 1;
void* pMgmtConn;
struct SSqlObj * pSql;
struct SSqlObj * pHb;
struct SSqlObj * sqlList;
......@@ -359,7 +360,7 @@ typedef struct SSqlStream {
struct SSqlStream *prev, *next;
} SSqlStream;
int32_t tscInitRpc(const char *user, const char *secret);
int32_t tscInitRpc(const char *user, const char *secret, void** pMgmtConn);
void tscInitMsgsFp();
int tsParseSql(SSqlObj *pSql, bool multiVnodeInsertion);
......@@ -427,9 +428,7 @@ int32_t tscToSQLCmd(SSqlObj *pSql, struct SSqlInfo *pInfo);
char * tscGetResultColumnChr(SSqlRes *pRes, SQueryInfo *pQueryInfo, int32_t column);
extern void * pVnodeConn;
extern void * pTscMgmtConn;
extern void * tscCacheHandle;
extern int slaveIndex;
extern void * tscTmr;
extern void * tscQhandle;
extern int tscKeepConn[];
......
......@@ -46,7 +46,7 @@ void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, void (*fp)(), void* param, const
pSql->signature = pSql;
pSql->param = param;
pSql->pTscObj = pObj;
pSql->maxRetry = 1;
pSql->maxRetry = TSDB_REPLICA_MAX_NUM;
pSql->fp = fp;
if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, TSDB_DEFAULT_PAYLOAD_SIZE)) {
......
......@@ -3294,29 +3294,26 @@ static void diff_function_f(SQLFunctionCtx *pCtx, int32_t index) {
}
}
char *arithmetic_callback_function(void *param, char *name, int32_t colId) {
char *getArithColumnData(void *param, const char* name, int32_t colId) {
SArithmeticSupport *pSupport = (SArithmeticSupport *)param;
SArithExprInfo *pExpr = pSupport->pArithExpr;
int32_t colIndex = -1;
for (int32_t i = 0; i < pExpr->binExprInfo.numOfCols; ++i) {
if (colId == pExpr->binExprInfo.pReqColumns[i].colId) {
colIndex = pExpr->binExprInfo.pReqColumns[i].colIndex;
int32_t index = -1;
for (int32_t i = 0; i < pSupport->numOfCols; ++i) {
if (colId == pSupport->colList[i].colId) {
index = i;
break;
}
}
assert(colIndex >= 0 && colId >= 0);
return pSupport->data[colIndex] + pSupport->offset * pSupport->elemSize[colIndex];
assert(index >= 0 && colId >= 0);
return pSupport->data[index] + pSupport->offset * pSupport->colList[index].bytes;
}
static void arithmetic_function(SQLFunctionCtx *pCtx) {
GET_RES_INFO(pCtx)->numOfRes += pCtx->size;
SArithmeticSupport *sas = (SArithmeticSupport *)pCtx->param[1].pz;
tSQLBinaryExprCalcTraverse(sas->pArithExpr->binExprInfo.pBinExpr, pCtx->size, pCtx->aOutputBuf, sas, pCtx->order,
arithmetic_callback_function);
tExprTreeCalcTraverse(sas->pArithExpr->pExpr, pCtx->size, pCtx->aOutputBuf, sas, pCtx->order, getArithColumnData);
pCtx->aOutputBuf += pCtx->outputBytes * pCtx->size;
pCtx->param[1].pz = NULL;
......@@ -3327,8 +3324,7 @@ static void arithmetic_function_f(SQLFunctionCtx *pCtx, int32_t index) {
SArithmeticSupport *sas = (SArithmeticSupport *)pCtx->param[1].pz;
sas->offset = index;
tSQLBinaryExprCalcTraverse(sas->pArithExpr->binExprInfo.pBinExpr, 1, pCtx->aOutputBuf, sas, pCtx->order,
arithmetic_callback_function);
tExprTreeCalcTraverse(sas->pArithExpr->pExpr, 1, pCtx->aOutputBuf, sas, pCtx->order, getArithColumnData);
pCtx->aOutputBuf += pCtx->outputBytes;
}
......
......@@ -209,15 +209,15 @@ void tscKillStream(STscObj *pObj, uint32_t killId) {
}
char *tscBuildQueryStreamDesc(char *pMsg, STscObj *pObj) {
SQqueryList *pQList = (SQqueryList *)pMsg;
char * pMax = pMsg + TSDB_PAYLOAD_SIZE - 256;
SQueryDesc *pQdesc = pQList->qdesc;
SQqueryList *pQList = (SQqueryList *)pMsg;
pQList->numOfQueries = 0;
SQueryDesc *pQdesc = (SQueryDesc*)(pMsg + sizeof(SQqueryList));
// We extract the lock to tscBuildHeartBeatMsg function.
/* pthread_mutex_lock (&pObj->mutex); */
pMsg += sizeof(SQqueryList);
SSqlObj *pSql = pObj->sqlList;
while (pSql) {
......@@ -244,8 +244,9 @@ char *tscBuildQueryStreamDesc(char *pMsg, STscObj *pObj) {
}
SStreamList *pSList = (SStreamList *)pMsg;
SStreamDesc *pSdesc = pSList->sdesc;
pSList->numOfStreams = 0;
SStreamDesc *pSdesc = (SStreamDesc*) (pMsg + sizeof(SStreamList));
pMsg += sizeof(SStreamList);
SSqlStream *pStream = pObj->streamList;
......
......@@ -101,7 +101,7 @@ static void updateTagColumnIndex(SQueryInfo* pQueryInfo, int32_t tableIndex);
static int32_t parseLimitClause(SQueryInfo* pQueryInfo, int32_t index, SQuerySQL* pQuerySql, SSqlObj* pSql);
static int32_t parseCreateDBOptions(SSqlCmd* pCmd, SCreateDBInfo* pCreateDbSql);
static int32_t getColumnIndexByName(SSQLToken* pToken, SQueryInfo* pQueryInfo, SColumnIndex* pIndex);
static int32_t getColumnIndexByName(const SSQLToken* pToken, SQueryInfo* pQueryInfo, SColumnIndex* pIndex);
static int32_t getTableIndexByName(SSQLToken* pToken, SQueryInfo* pQueryInfo, SColumnIndex* pIndex);
static int32_t optrToString(tSQLExpr* pExpr, char** exprString);
......@@ -116,7 +116,7 @@ static int32_t doCheckForCreateTable(SSqlObj* pSql, int32_t subClauseIndex, SSql
static int32_t doCheckForCreateFromStable(SSqlObj* pSql, SSqlInfo* pInfo);
static int32_t doCheckForStream(SSqlObj* pSql, SSqlInfo* pInfo);
static int32_t doCheckForQuery(SSqlObj* pSql, SQuerySQL* pQuerySql, int32_t index);
static int32_t exprTreeFromSqlExpr(tExprNode **pExpr, tSQLExpr* pSqlExpr, SArray* pExprInfo, SQueryInfo* pQueryInfo, SArray* pCols);
static int32_t exprTreeFromSqlExpr(tExprNode **pExpr, const tSQLExpr* pSqlExpr, SArray* pExprInfo, SQueryInfo* pQueryInfo, SArray* pCols);
/*
* Used during parsing query sql. Since the query sql usually small in length, error position
......@@ -733,7 +733,7 @@ int32_t tscSetTableId(STableMetaInfo* pTableMetaInfo, SSQLToken* pzTableName, SS
*/
if (size > 0) {
if (strncasecmp(oldName, pTableMetaInfo->name, tListLen(pTableMetaInfo->name)) != 0) {
tscClearMeterMetaInfo(pTableMetaInfo, false);
tscClearTableMetaInfo(pTableMetaInfo, false);
}
} else {
assert(pTableMetaInfo->pTableMeta == NULL);
......@@ -1171,13 +1171,32 @@ int32_t parseSelectClause(SSqlCmd* pCmd, int32_t clauseIndex, tSQLExprList* pSel
SColumnIndex index = {.tableIndex = tableIndex};
SSqlExpr* pExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_ARITHM, &index, TSDB_DATA_TYPE_DOUBLE,
sizeof(double), sizeof(double), false);
addExprParams(pExpr, arithmeticExprStr, TSDB_DATA_TYPE_BINARY, strlen(arithmeticExprStr), index.tableIndex);
/* todo alias name should use the original sql string */
char* name = (pItem->aliasName != NULL)? pItem->aliasName:arithmeticExprStr;
strncpy(pExpr->aliasName, name, TSDB_COL_NAME_LEN);
tExprNode* pNode = NULL;
SArray* colList = taosArrayInit(10, sizeof(SColIndex));
int32_t ret = exprTreeFromSqlExpr(&pNode, pItem->pNode, pQueryInfo->exprsInfo, pQueryInfo, colList);
if (ret != TSDB_CODE_SUCCESS) {
tExprTreeDestroy(&pNode, NULL);
return invalidSqlErrMsg(pQueryInfo->msg, "invalid arithmetic expression in select clause");
}
SBuffer buf = exprTreeToBinary(pNode);
size_t len = tbufTell(&buf);
char* c = tbufGetData(&buf, true);
// set the serialized binary string as the parameter of arithmetic expression
addExprParams(pExpr, c, TSDB_DATA_TYPE_BINARY, len, index.tableIndex);
insertResultField(pQueryInfo, i, &columnList, sizeof(double), TSDB_DATA_TYPE_DOUBLE, pExpr->aliasName, pExpr);
taosArrayDestroy(colList);
tExprTreeDestroy(&pNode, NULL);
} else {
columnList.num = 0;
columnList.ids[0] = (SColumnIndex) {0, 0};
......@@ -1196,8 +1215,6 @@ int32_t parseSelectClause(SSqlCmd* pCmd, int32_t clauseIndex, tSQLExprList* pSel
pFuncExpr->interResBytes = sizeof(double);
pFuncExpr->type = TSDB_DATA_TYPE_DOUBLE;
SExprInfo* pBinExprInfo = &pFuncExpr->binExprInfo;
tExprNode* pNode = NULL;
// SArray* colList = taosArrayInit(10, sizeof(SColIndex));
......@@ -1206,26 +1223,26 @@ int32_t parseSelectClause(SSqlCmd* pCmd, int32_t clauseIndex, tSQLExprList* pSel
tExprTreeDestroy(&pNode, NULL);
return invalidSqlErrMsg(pQueryInfo->msg, "invalid expression in select clause");
}
pBinExprInfo->pBinExpr = pNode;
pFuncExpr->pExpr = pNode;
assert(0);
// pBinExprInfo->pReqColumns = pColIndex;
// pExprInfo->pReqColumns = pColIndex;
for(int32_t k = 0; k < pBinExprInfo->numOfCols; ++k) {
SColIndex* pCol = &pBinExprInfo->pReqColumns[k];
size_t size = tscSqlExprNumOfExprs(pQueryInfo);
for(int32_t f = 0; f < size; ++f) {
SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, f);
if (strcmp(pExpr->aliasName, pCol->name) == 0) {
pCol->colIndex = f;
break;
}
}
assert(pCol->colIndex >= 0 && pCol->colIndex < size);
tfree(pNode);
}
// for(int32_t k = 0; k < pFuncExpr->numOfCols; ++k) {
// SColIndex* pCol = &pFuncExpr->colList[k];
// size_t size = tscSqlExprNumOfExprs(pQueryInfo);
//
// for(int32_t f = 0; f < size; ++f) {
// SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, f);
// if (strcmp(pExpr->aliasName, pCol->name) == 0) {
// pCol->colIndex = f;
// break;
// }
// }
//
// assert(pCol->colIndex >= 0 && pCol->colIndex < size);
// tfree(pNode);
// }
}
}
} else {
......@@ -1317,28 +1334,6 @@ SSqlExpr* doAddProjectCol(SQueryInfo* pQueryInfo, int32_t outputIndex, int32_t c
void addRequiredTagColumn(STableMetaInfo* pTableMetaInfo, SColumnIndex* index) {
tscColumnListInsert(pTableMetaInfo->tagColList, index);
// if (pTableMetaInfo->numOfTags == 0 || pTableMetaInfo->tagColumnIndex[pTableMetaInfo->numOfTags - 1] < tagColIndex) {
// pTableMetaInfo->tagColumnIndex[pTableMetaInfo->numOfTags++] = tagColIndex;
// } else { // find the appropriate position
// for (int32_t i = 0; i < pTableMetaInfo->numOfTags; ++i) {
// if (tagColIndex > pTableMetaInfo->tagColumnIndex[i]) {
// continue;
// } else if (tagColIndex == pTableMetaInfo->tagColumnIndex[i]) {
// break;
// } else {
// memmove(&pTableMetaInfo->tagColumnIndex[i + 1], &pTableMetaInfo->tagColumnIndex[i],
// sizeof(pTableMetaInfo->tagColumnIndex[0]) * (pTableMetaInfo->numOfTags - i));
//
// pTableMetaInfo->tagColumnIndex[i] = tagColIndex;
//
// pTableMetaInfo->numOfTags++;
// break;
// }
// }
// }
// plus one means tbname
// assert(tagColIndex >= -1 && tagColIndex < TSDB_MAX_TAGS && pTableMetaInfo->numOfTags <= TSDB_MAX_TAGS + 1);
}
static void addProjectQueryCol(SQueryInfo* pQueryInfo, int32_t startPos, SColumnIndex* pIndex, tSQLExprItem* pItem) {
......@@ -2047,7 +2042,7 @@ int32_t getTableIndexByName(SSQLToken* pToken, SQueryInfo* pQueryInfo, SColumnIn
return TSDB_CODE_SUCCESS;
}
int32_t getColumnIndexByName(SSQLToken* pToken, SQueryInfo* pQueryInfo, SColumnIndex* pIndex) {
int32_t getColumnIndexByName(const SSQLToken* pToken, SQueryInfo* pQueryInfo, SColumnIndex* pIndex) {
if (pQueryInfo->pTableMetaInfo == NULL || pQueryInfo->numOfTables == 0) {
return TSDB_CODE_INVALID_SQL;
}
......@@ -2477,6 +2472,10 @@ int32_t parseGroupbyClause(SQueryInfo* pQueryInfo, tVariantList* pList, SSqlCmd*
return TSDB_CODE_SUCCESS;
}
if (pQueryInfo->colList == NULL) {
pQueryInfo->colList = taosArrayInit(4, POINTER_BYTES);
}
pQueryInfo->groupbyExpr.numOfGroupCols = pList->nExpr;
if (pList->nExpr > TSDB_MAX_TAGS) {
return invalidSqlErrMsg(pQueryInfo->msg, msg1);
......@@ -3545,7 +3544,7 @@ static int32_t setTableCondForSTableQuery(SQueryInfo* pQueryInfo, const char* ac
return TSDB_CODE_SUCCESS;
}
SStringBuilder sb1;
SStringBuilder sb1 = { 0 };
taosStringBuilderAppendStringLen(&sb1, QUERY_COND_REL_PREFIX_IN, QUERY_COND_REL_PREFIX_IN_LEN);
char db[TSDB_TABLE_ID_LEN] = {0};
......@@ -3749,13 +3748,24 @@ static int32_t getTagQueryCondExpr(SQueryInfo* pQueryInfo, SCondExpr* pCondExpr,
for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
tSQLExpr* p1 = extractExprForSTable(pExpr, pQueryInfo, i);
tExprNode* p = NULL;
ret = exprTreeFromSqlExpr(&p, p1, NULL, pQueryInfo, NULL);
SArray* colList = taosArrayInit(10, sizeof(SColIndex));
ret = exprTreeFromSqlExpr(&p, p1, NULL, pQueryInfo, colList);
SBuffer buf = exprTreeToBinary(p);
int64_t uid = tscGetMetaInfo(pQueryInfo, i)->pTableMeta->uid;
// add to source column list
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, i);
int64_t uid = pTableMetaInfo->pTableMeta->uid;
int32_t numOfCols = tscGetNumOfColumns(pTableMetaInfo->pTableMeta);
size_t num = taosArrayGetSize(colList);
for(int32_t j = 0; j < num; ++j) {
SColIndex* pIndex = taosArrayGet(colList, j);
SColumnIndex index = {.tableIndex = i, .columnIndex = pIndex->colIndex - numOfCols};
addRequiredTagColumn(pTableMetaInfo, &index);
}
tsSetSTableQueryCond(&pQueryInfo->tagCond, uid, &buf);
doCompactQueryExpr(pExpr);
tSQLExprDestroy(p1);
......@@ -4915,7 +4925,7 @@ void doAddGroupColumnForSubquery(SQueryInfo* pQueryInfo, int32_t tagIndex) {
list.num = 1;
list.ids[0] = colIndex;
insertResultField(pQueryInfo, size - 1, &list, pSchema->bytes, pSchema->type, pSchema->name, pExpr);
insertResultField(pQueryInfo, size, &list, pSchema->bytes, pSchema->type, pSchema->name, pExpr);
SFieldSupInfo* pInfo = tscFieldInfoGetSupp(&pQueryInfo->fieldsInfo, size - 1);
pInfo->visible = false;
}
......@@ -5867,7 +5877,7 @@ int32_t doCheckForQuery(SSqlObj* pSql, SQuerySQL* pQuerySql, int32_t index) {
return TSDB_CODE_SUCCESS; // Does not build query message here
}
int32_t exprTreeFromSqlExpr(tExprNode **pExpr, tSQLExpr* pSqlExpr, SArray* pExprInfo, SQueryInfo* pQueryInfo, SArray* pCols) {
int32_t exprTreeFromSqlExpr(tExprNode **pExpr, const tSQLExpr* pSqlExpr, SArray* pExprInfo, SQueryInfo* pQueryInfo, SArray* pCols) {
tExprNode* pLeft = NULL;
tExprNode* pRight= NULL;
......@@ -5892,8 +5902,9 @@ int32_t exprTreeFromSqlExpr(tExprNode **pExpr, tSQLExpr* pSqlExpr, SArray* pExpr
(*pExpr)->pVal = calloc(1, sizeof(tVariant));
tVariantAssign((*pExpr)->pVal, &pSqlExpr->val);
return TSDB_CODE_SUCCESS;
} else if (pSqlExpr->nSQLOptr >= TK_COUNT && pSqlExpr->nSQLOptr <= TK_AVG_IRATE) {
// arithmetic expression on the results of aggregation functions
*pExpr = calloc(1, sizeof(tExprNode));
(*pExpr)->nodeType = TSQL_NODE_COL;
(*pExpr)->pSchema = calloc(1, sizeof(SSchema));
......@@ -5911,7 +5922,7 @@ int32_t exprTreeFromSqlExpr(tExprNode **pExpr, tSQLExpr* pSqlExpr, SArray* pExpr
break;
}
}
} else if (pSqlExpr->nSQLOptr == TK_ID) { // column name
} else if (pSqlExpr->nSQLOptr == TK_ID) { // column name, normal column arithmetic expression
SColumnIndex index = {0};
int32_t ret = getColumnIndexByName(&pSqlExpr->colInfo, pQueryInfo, &index);
if (ret != TSDB_CODE_SUCCESS) {
......@@ -5925,18 +5936,21 @@ int32_t exprTreeFromSqlExpr(tExprNode **pExpr, tSQLExpr* pSqlExpr, SArray* pExpr
STableMeta* pTableMeta = tscGetMetaInfo(pQueryInfo, 0)->pTableMeta;
SSchema* pSchema = tscGetTableColumnSchema(pTableMeta, index.columnIndex);
*(*pExpr)->pSchema = *pSchema;
if (pCols != NULL) { // record the involved columns
SColIndex colIndex = {0};
strncpy(colIndex.name, pSchema->name, TSDB_COL_NAME_LEN);
colIndex.colId = pSchema->colId;
colIndex.colIndex = index.columnIndex;
taosArrayPush(pCols, &colIndex);
}
return TSDB_CODE_SUCCESS;
} else {
return TSDB_CODE_INVALID_SQL;
}
if (pCols != NULL) { // record the involved columns
SColIndex colIndex = {0};
strncpy(colIndex.name, pSqlExpr->operand.z, pSqlExpr->operand.n);
taosArrayPush(pCols, &colIndex);
}
} else {
*pExpr = (tExprNode *)calloc(1, sizeof(tExprNode));
(*pExpr)->nodeType = TSQL_NODE_EXPR;
......
......@@ -190,6 +190,7 @@ void tscProcessActivityTimer(void *handle, void *tmrId) {
}
int tscSendMsgToServer(SSqlObj *pSql) {
STscObj* pObj = pSql->pTscObj;
SSqlCmd* pCmd = &pSql->cmd;
char *pMsg = rpcMallocCont(pCmd->payloadLen);
......@@ -223,7 +224,7 @@ int tscSendMsgToServer(SSqlObj *pSql) {
.handle = pSql,
.code = 0
};
rpcSendRequest(pTscMgmtConn, &pSql->ipList, &rpcMsg);
rpcSendRequest(pObj->pMgmtConn, &pSql->ipList, &rpcMsg);
}
return TSDB_CODE_SUCCESS;
......@@ -696,7 +697,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pQueryMsg->slidingTimeUnit = pQueryInfo->slidingTimeUnit;
pQueryMsg->numOfGroupCols = htons(pQueryInfo->groupbyExpr.numOfGroupCols);
pQueryMsg->numOfTags = htonl(numOfTags);
pQueryMsg->tagNameRelType = htons(pQueryInfo->tagCond.relType);
pQueryMsg->queryType = htons(pQueryInfo->type);
size_t numOfOutput = tscSqlExprNumOfExprs(pQueryInfo);
......@@ -757,16 +758,10 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
}
}
bool hasArithmeticFunction = false;
SSqlFuncMsg *pSqlFuncExpr = (SSqlFuncMsg *)pMsg;
for (int32_t i = 0; i < tscSqlExprNumOfExprs(pQueryInfo); ++i) {
SSqlExpr *pExpr = tscSqlExprGet(pQueryInfo, i);
if (pExpr->functionId == TSDB_FUNC_ARITHM) {
hasArithmeticFunction = true;
}
if (!tscValidateColumnId(pTableMetaInfo, pExpr->colInfo.colId)) {
/* column id is not valid according to the cached table meta, the table meta is expired */
tscError("%p table schema is not matched with parsed sql", pSql);
......@@ -787,9 +782,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
if (pExpr->param[j].nType == TSDB_DATA_TYPE_BINARY) {
memcpy(pMsg, pExpr->param[j].pz, pExpr->param[j].nLen);
// by plus one char to make the string null-terminated
pMsg += pExpr->param[j].nLen + 1;
pMsg += pExpr->param[j].nLen;
} else {
pSqlFuncExpr->arg[j].argValue.i64 = htobe64(pExpr->param[j].i64Key);
}
......@@ -798,23 +791,6 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pSqlFuncExpr = (SSqlFuncMsg *)pMsg;
}
int32_t len = 0;
if (hasArithmeticFunction) {
for (int32_t i = 0; i < numOfCols; ++i) {
SColumn* pColBase = taosArrayGetP(pQueryInfo->colList, i);
char * name = pSchema[pColBase[i].colIndex.columnIndex].name;
int32_t lenx = strlen(name);
memcpy(pMsg, name, lenx);
*(pMsg + lenx) = ',';
len += (lenx + 1); // one for comma
pMsg += (lenx + 1);
}
}
pQueryMsg->colNameLen = htonl(len);
// serialize the table info (sid, uid, tags)
pMsg = doSerializeTableInfo(pSql, htons(pQueryMsg->head.vgId), pMsg);
......@@ -915,6 +891,14 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
}
}
if (pQueryInfo->tagCond.tbnameCond.cond == NULL) {
*pMsg = 0;
pMsg++;
} else {
strcpy(pMsg, pQueryInfo->tagCond.tbnameCond.cond);
pMsg += strlen(pQueryInfo->tagCond.tbnameCond.cond) + 1;
}
// tbname in/like query expression should be sent to mgmt node
msgLen = pMsg - pStart;
......@@ -1847,7 +1831,7 @@ int tscBuildHeartBeatMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pCmd->msgType = TSDB_MSG_TYPE_CM_HEARTBEAT;
assert(msgLen + minMsgSize() <= size);
return msgLen;
return TSDB_CODE_SUCCESS;
}
int tscProcessTableMetaRsp(SSqlObj *pSql) {
......@@ -2599,7 +2583,7 @@ int tscGetSTableVgroupInfo(SSqlObj *pSql, int32_t clauseIndex) {
// if (pSql->fp != NULL && pSql->pStream == NULL) {
// pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks);
// tscFreeSubqueryInfo(pCmd);
// tscFreeQueryInfo(pCmd);
// }
tscTrace("%p allocate new pSqlObj:%p to get stable vgroupInfo", pSql, pNew);
......
......@@ -66,7 +66,8 @@ STscObj *taosConnectImpl(const char *ip, const char *user, const char *pass, con
return NULL;
}
if (tscInitRpc(user, pass) != 0) {
void* pMgmtConn = NULL;
if (tscInitRpc(user, pass, &pMgmtConn) != 0) {
terrno = TSDB_CODE_NETWORK_UNAVAIL;
return NULL;
}
......@@ -118,6 +119,7 @@ STscObj *taosConnectImpl(const char *ip, const char *user, const char *pass, con
strtolower(pObj->db, tmp);
}
pObj->pMgmtConn = pMgmtConn;
pthread_mutex_init(&pObj->mutex, NULL);
SSqlObj *pSql = (SSqlObj *)calloc(1, sizeof(SSqlObj));
......@@ -456,20 +458,21 @@ static void transferNcharData(SSqlObj *pSql, int32_t columnIndex, TAOS_FIELD *pF
}
}
static char *getArithemicInputSrc(void *param, char *name, int32_t colId) {
SArithmeticSupport *pSupport = (SArithmeticSupport *)param;
SArithExprInfo * pExpr = pSupport->pArithExpr;
int32_t index = -1;
for (int32_t i = 0; i < pExpr->binExprInfo.numOfCols; ++i) {
if (strcmp(name, pExpr->binExprInfo.pReqColumns[i].name) == 0) {
index = i;
break;
}
}
assert(index >= 0 && index < pExpr->binExprInfo.numOfCols);
return pSupport->data[index] + pSupport->offset * pSupport->elemSize[index];
static char *getArithemicInputSrc(void *param, const char *name, int32_t colId) {
// SArithmeticSupport *pSupport = (SArithmeticSupport *)param;
// SArithExprInfo * pExpr = pSupport->pArithExpr;
// int32_t index = -1;
// for (int32_t i = 0; i < pExpr->numOfCols; ++i) {
// if (strcmp(name, pExpr->colList[i].name) == 0) {
// index = i;
// break;
// }
// }
//
// assert(index >= 0 && index < pExpr->numOfCols);
// return pSupport->data[index] + pSupport->offset * pSupport->elemSize[index];
return 0;
}
static void **doSetResultRowData(SSqlObj *pSql) {
......@@ -519,21 +522,21 @@ static void **doSetResultRowData(SSqlObj *pSql) {
sas->offset = 0;
sas->pArithExpr = pInfo->pArithExprInfo;
sas->numOfCols = sas->pArithExpr->binExprInfo.numOfCols;
// sas->numOfCols = sas->pArithExpr->numOfCols;
if (pRes->buffer[i] == NULL) {
pRes->buffer[i] = malloc(tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i)->bytes);
}
for(int32_t k = 0; k < sas->numOfCols; ++k) {
int32_t columnIndex = sas->pArithExpr->binExprInfo.pReqColumns[k].colIndex;
SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, columnIndex);
sas->elemSize[k] = pExpr->resBytes;
sas->data[k] = (pRes->data + pRes->numOfRows* pExpr->offset) + pRes->row*pExpr->resBytes;
// int32_t columnIndex = sas->pArithExpr->colList[k].colIndex;
// SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, columnIndex);
//
// sas->elemSize[k] = pExpr->resBytes;
// sas->data[k] = (pRes->data + pRes->numOfRows* pExpr->offset) + pRes->row*pExpr->resBytes;
}
tSQLBinaryExprCalcTraverse(sas->pArithExpr->binExprInfo.pBinExpr, 1, pRes->buffer[i], sas, TSDB_ORDER_ASC, getArithemicInputSrc);
tExprTreeCalcTraverse(sas->pArithExpr->pExpr, 1, pRes->buffer[i], sas, TSDB_ORDER_ASC, getArithemicInputSrc);
pRes->tsrow[i] = pRes->buffer[i];
free(sas); //todo optimization
......@@ -632,8 +635,6 @@ static UNUSED_FUNC void **tscBuildResFromSubqueries(SSqlObj *pSql) {
}
if (success) { // current row of final output has been built, return to app
size_t numOfExprs = tscSqlExprNumOfExprs(pQueryInfo);
for (int32_t i = 0; i < numOfExprs; ++i) {
int32_t tableIndex = pRes->pColumnIndex[i].tableIndex;
int32_t columnIndex = pRes->pColumnIndex[i].columnIndex;
......
......@@ -147,7 +147,7 @@ static void tscProcessStreamQueryCallback(void *param, TAOS_RES *tres, int numOf
retryDelay);
STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pStream->pSql->cmd, 0, 0);
tscClearMeterMetaInfo(pTableMetaInfo, true);
tscClearTableMetaInfo(pTableMetaInfo, true);
tscSetRetryTimer(pStream, pStream->pSql, retryDelay);
return;
......@@ -177,7 +177,7 @@ static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOf
if (pSql == NULL || numOfRows < 0) {
int64_t retryDelayTime = tscGetRetryDelayTime(pStream->slidingTime, pStream->precision);
tscError("%p stream:%p, retrieve data failed, code:%d, retry in %" PRId64 "ms", pSql, pStream, numOfRows, retryDelayTime);
tscClearMeterMetaInfo(pTableMetaInfo, true);
tscClearTableMetaInfo(pTableMetaInfo, true);
tscSetRetryTimer(pStream, pStream->pSql, retryDelayTime);
return;
......@@ -259,7 +259,7 @@ static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOf
pStream->numOfRes);
// release the metric/meter meta information reference, so data in cache can be updated
tscClearMeterMetaInfo(pTableMetaInfo, false);
tscClearTableMetaInfo(pTableMetaInfo, false);
tscSetNextLaunchTimer(pStream, pSql);
}
}
......
......@@ -214,8 +214,8 @@ bool needSecondaryQuery(SQueryInfo* pQueryInfo) {
size_t numOfCols = taosArrayGetSize(pQueryInfo->colList);
for (int32_t i = 0; i < numOfCols; ++i) {
SColumn* pBase = taosArrayGet(pQueryInfo->colList, i);
if (pBase->colIndex.columnIndex != PRIMARYKEY_TIMESTAMP_COL_INDEX) {
SColumn* base = taosArrayGet(pQueryInfo->colList, i);
if (base->colIndex.columnIndex != PRIMARYKEY_TIMESTAMP_COL_INDEX) {
return true;
}
}
......
......@@ -17,8 +17,6 @@
#include "taosmsg.h"
#include "tcache.h"
#include "trpc.h"
#include "taosdef.h"
#include "tsocket.h"
#include "tsystem.h"
#include "ttime.h"
#include "ttimer.h"
......@@ -33,11 +31,7 @@
// global, not configurable
void * pVnodeConn;
void * pVMeterConn;
void * pTscMgmtConn;
void * pSlaveConn;
void * tscCacheHandle;
int slaveIndex;
void * tscTmr;
void * tscQhandle;
void * tscCheckDiskUsageTmr;
......@@ -49,12 +43,12 @@ static pthread_once_t tscinit = PTHREAD_ONCE_INIT;
void taosInitNote(int numOfNoteLines, int maxNotes, char* lable);
void tscUpdateIpSet(void *ahandle, SRpcIpSet *pIpSet);
void tscCheckDiskUsage(void *para, void *unused) {
void tscCheckDiskUsage(void *UNUSED_PARAM(para), void* UNUSED_PARAM(param)) {
taosGetDisk();
taosTmrReset(tscCheckDiskUsage, 1000, NULL, tscTmr, &tscCheckDiskUsageTmr);
}
int32_t tscInitRpc(const char *user, const char *secret) {
int32_t tscInitRpc(const char *user, const char *secret, void** pMgmtConn) {
SRpcInit rpcInit;
char secretEncrypt[32] = {0};
taosEncryptPass((uint8_t *)secret, strlen(secret), secretEncrypt);
......@@ -80,7 +74,7 @@ int32_t tscInitRpc(const char *user, const char *secret) {
}
}
if (pTscMgmtConn == NULL) {
if (*pMgmtConn == NULL) {
memset(&rpcInit, 0, sizeof(rpcInit));
rpcInit.localIp = tsLocalIp;
rpcInit.localPort = 0;
......@@ -96,8 +90,8 @@ int32_t tscInitRpc(const char *user, const char *secret) {
rpcInit.spi = 1;
rpcInit.secret = secretEncrypt;
pTscMgmtConn = rpcOpen(&rpcInit);
if (pTscMgmtConn == NULL) {
*pMgmtConn = rpcOpen(&rpcInit);
if (*pMgmtConn == NULL) {
tscError("failed to init connection to mgmt");
return -1;
}
......@@ -159,7 +153,6 @@ void taos_init_imp() {
}
tscInitMsgsFp();
slaveIndex = rand();
int queueSize = tsMaxVnodeConnections + tsMaxMeterConnections + tsMaxMgmtConnections + tsMaxMgmtConnections;
if (tscEmbedded == 0) {
......@@ -211,11 +204,6 @@ void taos_cleanup() {
pVnodeConn = NULL;
}
if (pTscMgmtConn != NULL) {
rpcClose(pTscMgmtConn);
pTscMgmtConn = NULL;
}
taosTmrCleanUp(tscTmr);
}
......@@ -387,7 +375,6 @@ static int taos_options_imp(TSDB_OPTION option, const char *pStr) {
return 0;
}
int taos_options(TSDB_OPTION option, const void *arg, ...) {
static int32_t lock = 0;
......
......@@ -337,7 +337,7 @@ void tscResetSqlCmdObj(SSqlCmd* pCmd) {
pCmd->pTableList= NULL;
pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks);
tscFreeSubqueryInfo(pCmd);
tscFreeQueryInfo(pCmd);
}
/*
......@@ -761,6 +761,8 @@ void tscCloseTscObj(STscObj* pObj) {
tscFreeSqlObj(pSql);
sem_destroy(&pSql->rspSem);
rpcClose(pObj->pMgmtConn);
pthread_mutex_destroy(&pObj->mutex);
tscTrace("%p DB connection is closed", pObj);
......@@ -937,8 +939,7 @@ void tscFieldInfoClear(SFieldInfo* pFieldInfo) {
SFieldSupInfo* pInfo = taosArrayGet(pFieldInfo->pSupportInfo, i);
if (pInfo->pArithExprInfo != NULL) {
tExprTreeDestroy(&pInfo->pArithExprInfo->binExprInfo.pBinExpr, NULL);
tfree(pInfo->pArithExprInfo->binExprInfo.pReqColumns);
tExprTreeDestroy(&pInfo->pArithExprInfo->pExpr, NULL);
}
}
......@@ -1459,7 +1460,7 @@ bool tscShouldFreeHeatBeat(SSqlObj* pHb) {
void tscCleanSqlCmd(SSqlCmd* pCmd) {
pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks);
tscFreeSubqueryInfo(pCmd);
tscFreeQueryInfo(pCmd);
uint32_t allocSize = pCmd->allocSize;
char* allocPtr = pCmd->payload;
......@@ -1601,7 +1602,7 @@ int32_t tscAddSubqueryInfo(SSqlCmd* pCmd) {
return TSDB_CODE_SUCCESS;
}
static void doClearSubqueryInfo(SQueryInfo* pQueryInfo) {
static void freeQueryInfoImpl(SQueryInfo* pQueryInfo) {
tscTagCondRelease(&pQueryInfo->tagCond);
tscFieldInfoClear(&pQueryInfo->fieldsInfo);
......@@ -1611,6 +1612,11 @@ static void doClearSubqueryInfo(SQueryInfo* pQueryInfo) {
tscColumnListDestroy(pQueryInfo->colList);
memset(&pQueryInfo->colList, 0, sizeof(pQueryInfo->colList));
if (pQueryInfo->groupbyExpr.columnInfo != NULL) {
taosArrayDestroy(pQueryInfo->groupbyExpr.columnInfo);
pQueryInfo->groupbyExpr.columnInfo = NULL;
}
pQueryInfo->tsBuf = tsBufDestory(pQueryInfo->tsBuf);
tfree(pQueryInfo->defaultVal);
......@@ -1619,11 +1625,11 @@ static void doClearSubqueryInfo(SQueryInfo* pQueryInfo) {
void tscClearSubqueryInfo(SSqlCmd* pCmd) {
for (int32_t i = 0; i < pCmd->numOfClause; ++i) {
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, i);
doClearSubqueryInfo(pQueryInfo);
freeQueryInfoImpl(pQueryInfo);
}
}
void tscFreeSubqueryInfo(SSqlCmd* pCmd) {
void tscFreeQueryInfo(SSqlCmd* pCmd) {
if (pCmd == NULL || pCmd->numOfClause == 0) {
return;
}
......@@ -1632,7 +1638,7 @@ void tscFreeSubqueryInfo(SSqlCmd* pCmd) {
char* addr = (char*)pCmd - offsetof(SSqlObj, cmd);
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, i);
doClearSubqueryInfo(pQueryInfo);
freeQueryInfoImpl(pQueryInfo);
tscClearAllTableMetaInfo(pQueryInfo, (const char*)addr, false);
tfree(pQueryInfo);
}
......@@ -1671,7 +1677,7 @@ STableMetaInfo* tscAddTableMetaInfo(SQueryInfo* pQueryInfo, const char* name, ST
}
if (pTagCols == NULL) {
pTableMetaInfo->tagColList = taosArrayInit(4, sizeof(SColumnIndex));
pTableMetaInfo->tagColList = taosArrayInit(4, POINTER_BYTES);
} else {
pTableMetaInfo->tagColList = taosArrayClone(pTagCols);
}
......@@ -1691,7 +1697,7 @@ void doRemoveTableMetaInfo(SQueryInfo* pQueryInfo, int32_t index, bool removeFro
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, index);
tscClearMeterMetaInfo(pTableMetaInfo, removeFromCache);
tscClearTableMetaInfo(pTableMetaInfo, removeFromCache);
free(pTableMetaInfo);
int32_t after = pQueryInfo->numOfTables - index - 1;
......@@ -1713,13 +1719,18 @@ void tscClearAllTableMetaInfo(SQueryInfo* pQueryInfo, const char* address, bool
tfree(pQueryInfo->pTableMetaInfo);
}
void tscClearMeterMetaInfo(STableMetaInfo* pTableMetaInfo, bool removeFromCache) {
void tscClearTableMetaInfo(STableMetaInfo* pTableMetaInfo, bool removeFromCache) {
if (pTableMetaInfo == NULL) {
return;
}
taosCacheRelease(tscCacheHandle, (void**)&(pTableMetaInfo->pTableMeta), removeFromCache);
tfree(pTableMetaInfo->vgroupList);
if (pTableMetaInfo->tagColList != NULL) {
taosArrayDestroy(pTableMetaInfo->tagColList);
pTableMetaInfo->tagColList = NULL;
}
}
void tscResetForNextRetrieve(SSqlRes* pRes) {
......
......@@ -32,6 +32,9 @@ extern "C" {
#define TSKEY int64_t
#endif
// this data type is internally used only in 'in' query to hold the values
#define TSDB_DATA_TYPE_ARRAY (TSDB_DATA_TYPE_NCHAR + 1)
// Bytes for each type.
extern const int32_t TYPE_BYTES[11];
// TODO: replace and remove code below
......@@ -141,16 +144,17 @@ void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size);
#define TSDB_RELATION_GREATER_EQUAL 5
#define TSDB_RELATION_NOT_EQUAL 6
#define TSDB_RELATION_LIKE 7
#define TSDB_RELATION_IN 8
#define TSDB_RELATION_AND 8
#define TSDB_RELATION_OR 9
#define TSDB_RELATION_NOT 10
#define TSDB_RELATION_AND 9
#define TSDB_RELATION_OR 10
#define TSDB_RELATION_NOT 11
#define TSDB_BINARY_OP_ADD 11
#define TSDB_BINARY_OP_SUBTRACT 12
#define TSDB_BINARY_OP_MULTIPLY 13
#define TSDB_BINARY_OP_DIVIDE 14
#define TSDB_BINARY_OP_REMAINDER 15
#define TSDB_BINARY_OP_ADD 12
#define TSDB_BINARY_OP_SUBTRACT 13
#define TSDB_BINARY_OP_MULTIPLY 14
#define TSDB_BINARY_OP_DIVIDE 15
#define TSDB_BINARY_OP_REMAINDER 16
#define TSDB_USERID_LEN 9
#define TS_PATH_DELIMITER_LEN 1
......
......@@ -356,17 +356,9 @@ typedef struct {
} SMDDropVnodeMsg;
typedef struct SColIndex {
int16_t colId;
/*
* colIdx is the index of column in latest schema of table
* it is available in the client side. Also used to determine
* whether current table schema is up-to-date.
*
* colIdxInBuf is used to denote the index of column in pQuery->colList,
* this value is invalid in client side, as well as in cache block of vnode either.
*/
int16_t colIndex;
uint16_t flag; // denote if it is a tag or not
int16_t colId; // column id
int16_t colIndex; // column index in colList if it is a normal column or index in tagColList if a tag
uint16_t flag; // denote if it is a tag or a normal column
char name[TSDB_COL_NAME_LEN];
} SColIndex;
......@@ -388,15 +380,9 @@ typedef struct SSqlFuncMsg {
} arg[3];
} SSqlFuncMsg;
typedef struct SExprInfo {
struct tExprNode *pBinExpr; /* for binary expression */
int32_t numOfCols; /* binary expression involves the readed number of columns*/
SColIndex * pReqColumns; /* source column list */
} SExprInfo;
typedef struct SArithExprInfo {
SSqlFuncMsg pBase;
SExprInfo binExprInfo;
SSqlFuncMsg base;
struct tExprNode* pExpr;
int16_t bytes;
int16_t type;
int16_t interResBytes;
......@@ -470,11 +456,12 @@ typedef struct {
int64_t offset;
uint16_t queryType; // denote another query process
int16_t numOfOutput; // final output columns numbers
int16_t tagNameRelType; // relation of tag criteria and tbname criteria
int16_t interpoType; // interpolate type
uint64_t defaultVal; // default value array list
int32_t colNameLen;
int64_t colNameList;
// int32_t colNameLen;
// int64_t colNameList;
int32_t tsOffset; // offset value in current msg body, NOTE: ts list is compressed
int32_t tsLen; // total length of ts comp block
int32_t tsNumOfBlocks; // ts comp block numbers
......@@ -775,14 +762,14 @@ typedef struct {
} SMDCfgDnodeMsg, SCMCfgDnodeMsg;
typedef struct {
char sql[TSDB_SHOW_SQL_LEN + 1];
char sql[TSDB_SHOW_SQL_LEN];
uint32_t queryId;
int64_t useconds;
int64_t stime;
} SQueryDesc;
typedef struct {
char sql[TSDB_SHOW_SQL_LEN + 1];
char sql[TSDB_SHOW_SQL_LEN];
uint32_t streamId;
int64_t num; // number of computing/cycles
int64_t useconds;
......@@ -794,12 +781,10 @@ typedef struct {
typedef struct {
int32_t numOfQueries;
SQueryDesc *qdesc;
} SQqueryList;
typedef struct {
int32_t numOfStreams;
SStreamDesc *sdesc;
} SStreamList;
typedef struct {
......
......@@ -279,8 +279,17 @@ SArray *tsdbGetTableList(TsdbQueryHandleT *pQueryHandle);
* @param pTagCond. tag query condition
*
*/
int32_t tsdbQueryByTagsCond(TsdbRepoT *tsdb, int64_t uid, const char *pTagCond, size_t len, STableGroupInfo *pGroupList,
SColIndex *pColIndex, int32_t numOfCols);
int32_t tsdbQueryByTagsCond(
TsdbRepoT *tsdb,
int64_t uid,
const char *pTagCond,
size_t len,
int16_t tagNameRelType,
const char* tbnameCond,
STableGroupInfo *pGroupList,
SColIndex *pColIndex,
int32_t numOfCols
);
int32_t tsdbGetOneTableGroup(TsdbRepoT *tsdb, int64_t uid, STableGroupInfo *pGroupInfo);
......
......@@ -82,16 +82,18 @@ void tExprTreeDestroy(tExprNode **pExprs, void (*fp)(void*));
void tExprTreeTraverse(tExprNode *pExpr, SSkipList *pSkipList, SArray *result, SBinaryFilterSupp *param);
void tSQLBinaryExprCalcTraverse(tExprNode *pExprs, int32_t numOfRows, char *pOutput, void *param, int32_t order,
char *(*cb)(void *, char *, int32_t));
void tExprTreeCalcTraverse(tExprNode *pExprs, int32_t numOfRows, char *pOutput, void *param, int32_t order,
char *(*cb)(void *, const char*, int32_t));
void tSQLBinaryExprTrv(tExprNode *pExprs, int32_t *val, int16_t *ids);
// todo refactor: remove it
void tSQLBinaryExprTrv(tExprNode *pExprs, SArray* res);
uint8_t getBinaryExprOptr(SSQLToken *pToken);
SBuffer exprTreeToBinary(tExprNode* pExprTree);
int32_t exprTreeFromBinary(const void* pBuf, size_t size, tExprNode** pExprNode);
tExprNode* exprTreeFromBinary(const void* pBuf, size_t size);
tExprNode* exprTreeFromTableName(const char* tbnameCond);
#ifdef __cplusplus
}
......
......@@ -115,10 +115,10 @@ enum {
typedef struct SArithmeticSupport {
SArithExprInfo *pArithExpr;
int32_t elemSize[TSDB_MAX_COLUMNS];
int32_t numOfCols;
SColumnInfo* colList;
int32_t offset;
char * data[TSDB_MAX_COLUMNS];
char** data;
} SArithmeticSupport;
typedef struct SQLPreAggVal {
......
......@@ -17,6 +17,7 @@
#define TDENGINE_TVARIANT_H
#include "tstoken.h"
#include "tarray.h"
#ifdef __cplusplus
extern "C" {
......@@ -31,6 +32,7 @@ typedef struct tVariant {
double dKey;
char * pz;
wchar_t *wpz;
SArray *arr; // only for 'in' query to hold value list, not value for a field
};
} tVariant;
......@@ -38,7 +40,7 @@ void tVariantCreate(tVariant *pVar, SSQLToken *token);
void tVariantCreateFromString(tVariant *pVar, char *pz, uint32_t len, uint32_t type);
void tVariantCreateFromBinary(tVariant *pVar, char *pz, uint32_t len, uint32_t type);
void tVariantCreateFromBinary(tVariant *pVar, const char *pz, size_t len, uint32_t type);
void tVariantDestroy(tVariant *pV);
......
......@@ -30,6 +30,7 @@
#include "tarray.h"
#include "tskiplist.h"
#include "queryLog.h"
#include "tsdbMain.h"
/*
*
......@@ -521,38 +522,48 @@ static int32_t setQueryCond(tQueryInfo *queryColInfo, SQueryCond* pCond) {
if (optr == TSDB_RELATION_GREATER || optr == TSDB_RELATION_GREATER_EQUAL ||
optr == TSDB_RELATION_EQUAL || optr == TSDB_RELATION_NOT_EQUAL) {
pCond->start = calloc(1, sizeof(tVariant));
tVariantAssign(&pCond->start->v, &queryColInfo->q);
pCond->start->optr = queryColInfo->optr;
} else if (optr == TSDB_RELATION_LESS || optr == TSDB_RELATION_LESS_EQUAL) {
pCond->end = calloc(1, sizeof(tVariant));
tVariantAssign(&pCond->end->v, &queryColInfo->q);
pCond->end->optr = queryColInfo->optr;
} else if (optr == TSDB_RELATION_IN) {
printf("relation is in\n");
} else if (optr == TSDB_RELATION_LIKE) {
printf("relation is like\n");
}
return TSDB_CODE_SUCCESS;
}
static void tQueryOnSkipList(SSkipList* pSkipList, SQueryCond* pCond, int32_t type, SArray* result) {
static void tQueryIndexColumn(SSkipList* pSkipList, tQueryInfo* pQueryInfo, SArray* result) {
SSkipListIterator* iter = NULL;
if (pCond->start != NULL) {
iter = tSkipListCreateIterFromVal(pSkipList, (char*) &pCond->start->v.i64Key, type, TSDB_ORDER_ASC);
int32_t type = pQueryInfo->q.nType;
SQueryCond cond = { 0 };
setQueryCond(pQueryInfo, &cond);
if (cond.start != NULL) {
iter = tSkipListCreateIterFromVal(pSkipList, (char*) &cond.start->v.i64Key, type, TSDB_ORDER_ASC);
} else {
iter = tSkipListCreateIterFromVal(pSkipList, (char*) &pCond->end->v.i64Key, type, TSDB_ORDER_DESC);
iter = tSkipListCreateIterFromVal(pSkipList, (char*) &cond.end->v.i64Key, type, TSDB_ORDER_DESC);
}
__compar_fn_t func = getComparFunc(pSkipList->keyInfo.type, type, 0);
if (pCond->start != NULL) {
int32_t optr = pCond->start->optr;
if (cond.start != NULL) {
int32_t optr = cond.start->optr;
if (optr == TSDB_RELATION_EQUAL) {
while(tSkipListIterNext(iter)) {
SSkipListNode* pNode = tSkipListIterGet(iter);
int32_t ret = func(SL_GET_NODE_KEY(pSkipList, pNode), &pCond->start->v.i64Key);
int32_t ret = func(SL_GET_NODE_KEY(pSkipList, pNode), &cond.start->v.i64Key);
if (ret == 0) {
taosArrayPush(result, SL_GET_NODE_DATA(pNode));
} else {
......@@ -567,7 +578,7 @@ static void tQueryOnSkipList(SSkipList* pSkipList, SQueryCond* pCond, int32_t ty
SSkipListNode* pNode = tSkipListIterGet(iter);
if (comp) {
ret = func(SL_GET_NODE_KEY(pSkipList, pNode), &pCond->start->v.i64Key);
ret = func(SL_GET_NODE_KEY(pSkipList, pNode), &cond.start->v.i64Key);
assert(ret >= 0);
}
......@@ -584,7 +595,7 @@ static void tQueryOnSkipList(SSkipList* pSkipList, SQueryCond* pCond, int32_t ty
assert(0);
}
} else {
int32_t optr = pCond->end->optr;
int32_t optr = cond.end->optr;
if (optr == TSDB_RELATION_LESS || optr == TSDB_RELATION_LESS_EQUAL) {
bool comp = true;
......@@ -594,7 +605,7 @@ static void tQueryOnSkipList(SSkipList* pSkipList, SQueryCond* pCond, int32_t ty
SSkipListNode* pNode = tSkipListIterGet(iter);
if (comp) {
ret = func(SL_GET_NODE_KEY(pSkipList, pNode), &pCond->end->v.i64Key);
ret = func(SL_GET_NODE_KEY(pSkipList, pNode), &cond.end->v.i64Key);
assert(ret <= 0);
}
......@@ -609,17 +620,6 @@ static void tQueryOnSkipList(SSkipList* pSkipList, SQueryCond* pCond, int32_t ty
}
}
/*
* qsort comparator
* sort the result to ensure meters with the same gid is grouped together
*/
//static int32_t compareByAddr(const void *pLeft, const void *pRight) {
// int64_t p1 = (int64_t) * ((tSkipListNode **)pLeft);
// int64_t p2 = (int64_t) * ((tSkipListNode **)pRight);
//
// DEFAULT_COMP(p1, p2);
//}
int32_t merge(SArray *pLeft, SArray *pRight, SArray *pFinalRes) {
// assert(pFinalRes->pRes == 0);
//
......@@ -770,19 +770,55 @@ static void exprTreeTraverseImpl(tExprNode *pExpr, SArray *pResult, SBinaryFilte
taosArrayCopy(pResult, array);
}
static void tSQLBinaryTraverseOnSkipList(tExprNode *pExpr, SArray *pResult, SSkipList *pSkipList,
SBinaryFilterSupp *param) {
static void tSQLBinaryTraverseOnSkipList(
tExprNode *pExpr,
SArray *pResult,
SSkipList *pSkipList,
SBinaryFilterSupp *param
) {
SSkipListIterator* iter = tSkipListCreateIter(pSkipList);
while (tSkipListIterNext(iter)) {
SSkipListNode *pNode = tSkipListIterGet(iter);
if (filterItem(pExpr, pNode, param)) {
taosArrayPush(pResult, SL_GET_NODE_DATA(pNode));
}
}
tSkipListDestroyIter(iter);
}
static void tQueryIndexlessColumn(SSkipList* pSkipList, tQueryInfo* pQueryInfo, SArray* result) {
SSkipListIterator* iter = tSkipListCreateIter(pSkipList);
while (tSkipListIterNext(iter)) {
bool addToResult = false;
SSkipListNode *pNode = tSkipListIterGet(iter);
STable* table = *(STable**) SL_GET_NODE_DATA(pNode);
if (pQueryInfo->colIndex == TSDB_TBNAME_COLUMN_INDEX) {
if (pQueryInfo->optr == TSDB_RELATION_IN) {
addToResult = pQueryInfo->compare(table->name, pQueryInfo->q.arr);
} else if(pQueryInfo->optr == TSDB_RELATION_LIKE) {
addToResult = !pQueryInfo->compare(table->name, pQueryInfo->q.pz);
}
} else {
// TODO: other columns
}
if (addToResult) {
taosArrayPush(result, (void*)&table);
}
}
tSkipListDestroyIter(iter);
}
// post-root order traverse syntax tree
void tExprTreeTraverse(tExprNode *pExpr, SSkipList *pSkipList, SArray *result, SBinaryFilterSupp *param) {
if (pExpr == NULL) {
......@@ -792,97 +828,101 @@ void tExprTreeTraverse(tExprNode *pExpr, SSkipList *pSkipList, SArray *result, S
tExprNode *pLeft = pExpr->_node.pLeft;
tExprNode *pRight = pExpr->_node.pRight;
// recursive traverse left child branch
if (pLeft->nodeType == TSQL_NODE_EXPR || pRight->nodeType == TSQL_NODE_EXPR) {
uint8_t weight = pLeft->_node.hasPK + pRight->_node.hasPK;
// column project
if (pLeft->nodeType != TSQL_NODE_EXPR && pRight->nodeType != TSQL_NODE_EXPR) {
assert(pLeft->nodeType == TSQL_NODE_COL && pRight->nodeType == TSQL_NODE_VALUE);
if (weight == 0 && taosArrayGetSize(result) > 0 && pSkipList == NULL) {
/**
* Perform the filter operation based on the initial filter result, which is obtained from filtering from index.
* Since no index presented, the filter operation is done by scan all elements in the result set.
*
* if the query is a high selectivity filter, only small portion of meters are retrieved.
*/
param->setupInfoFn(pExpr, param->pExtInfo);
if (pSkipList == NULL) {
tSQLListTraverseOnResult(pExpr, param->fp, result);
return;
}
tQueryInfo *pQueryInfo = pExpr->_node.info;
if (pQueryInfo->colIndex == 0 && pQueryInfo->optr != TSDB_RELATION_LIKE) {
tQueryIndexColumn(pSkipList, pQueryInfo, result);
} else {
tQueryIndexlessColumn(pSkipList, pQueryInfo, result);
}
return;
}
// recursive traverse left child branch
uint8_t weight = pLeft->_node.hasPK + pRight->_node.hasPK;
if (weight == 0 ) {
if (taosArrayGetSize(result) > 0 && pSkipList == NULL) {
/**
* Perform the filter operation based on the initial filter result, which is obtained from filtering from index.
* Since no index presented, the filter operation is done by scan all elements in the result set.
*
* if the query is a high selectivity filter, only small portion of meters are retrieved.
*/
exprTreeTraverseImpl(pExpr, result, param);
} else if (weight == 0) {
} else {
/**
* apply the hierarchical expression to every node in skiplist for find the qualified nodes
*/
assert(taosArrayGetSize(result) == 0);
tSQLBinaryTraverseOnSkipList(pExpr, result, pSkipList, param);
} else if (weight == 2 || (weight == 1 && pExpr->_node.optr == TSDB_RELATION_OR)) {
SArray* rLeft = taosArrayInit(10, POINTER_BYTES);
SArray* rRight = taosArrayInit(10, POINTER_BYTES);
tExprTreeTraverse(pLeft, pSkipList, rLeft, param);
tExprTreeTraverse(pRight, pSkipList, rRight, param);
if (pExpr->_node.optr == TSDB_RELATION_AND) { // CROSS
intersect(rLeft, rRight, result);
} else if (pExpr->_node.optr == TSDB_RELATION_OR) { // or
merge(rLeft, rRight, result);
} else {
assert(false);
}
}
return;
}
if (weight == 2 || (weight == 1 && pExpr->_node.optr == TSDB_RELATION_OR)) {
SArray* rLeft = taosArrayInit(10, POINTER_BYTES);
SArray* rRight = taosArrayInit(10, POINTER_BYTES);
tExprTreeTraverse(pLeft, pSkipList, rLeft, param);
tExprTreeTraverse(pRight, pSkipList, rRight, param);
taosArrayDestroy(rLeft);
taosArrayDestroy(rRight);
if (pExpr->_node.optr == TSDB_RELATION_AND) { // CROSS
intersect(rLeft, rRight, result);
} else if (pExpr->_node.optr == TSDB_RELATION_OR) { // or
merge(rLeft, rRight, result);
} else {
/*
* (weight == 1 && pExpr->nSQLBinaryOptr == TSDB_RELATION_AND) is handled here
*
* first, we filter results based on the skiplist index, which is the initial filter stage,
* then, we conduct the secondary filter operation based on the result from the initial filter stage.
*/
assert(pExpr->_node.optr == TSDB_RELATION_AND);
tExprNode *pFirst = NULL;
tExprNode *pSecond = NULL;
if (pLeft->_node.hasPK == 1) {
pFirst = pLeft;
pSecond = pRight;
} else {
pFirst = pRight;
pSecond = pLeft;
}
assert(false);
}
assert(pFirst != pSecond && pFirst != NULL && pSecond != NULL);
taosArrayDestroy(rLeft);
taosArrayDestroy(rRight);
return;
}
// we filter the result based on the skiplist index in the first place
tExprTreeTraverse(pFirst, pSkipList, result, param);
/*
* (weight == 1 && pExpr->nSQLBinaryOptr == TSDB_RELATION_AND) is handled here
*
* first, we filter results based on the skiplist index, which is the initial filter stage,
* then, we conduct the secondary filter operation based on the result from the initial filter stage.
*/
assert(pExpr->_node.optr == TSDB_RELATION_AND);
tExprNode *pFirst = NULL;
tExprNode *pSecond = NULL;
if (pLeft->_node.hasPK == 1) {
pFirst = pLeft;
pSecond = pRight;
} else {
pFirst = pRight;
pSecond = pLeft;
}
/*
* recursively perform the filter operation based on the initial results,
* So, we do not set the skip list index as a parameter
*/
tExprTreeTraverse(pSecond, NULL, result, param);
}
} else { // column project
assert(pLeft->nodeType == TSQL_NODE_COL && pRight->nodeType == TSQL_NODE_VALUE);
assert(pFirst != pSecond && pFirst != NULL && pSecond != NULL);
param->setupInfoFn(pExpr, param->pExtInfo);
if (pSkipList == NULL) {
tSQLListTraverseOnResult(pExpr, param->fp, result);
} else {
tQueryInfo *pQueryInfo = pExpr->_node.info;
if (pQueryInfo->colIndex == 0 && pQueryInfo->optr != TSDB_RELATION_LIKE) {
SQueryCond cond = {0};
/*int32_t ret = */ setQueryCond(pQueryInfo, &cond);
tQueryOnSkipList(pSkipList, &cond, pQueryInfo->q.nType, result);
} else {
/* Brutal force scan the whole skip list to find the appropriate result,
* since the filter is not applied to indexed column.
*/
assert(0);
// result->num = tSkipListIterateList(pSkipList, (tSkipListNode ***)&result->pRes, fp, queryColInfo);
}
}
}
// we filter the result based on the skiplist index in the first place
tExprTreeTraverse(pFirst, pSkipList, result, param);
/*
* recursively perform the filter operation based on the initial results,
* So, we do not set the skip list index as a parameter
*/
tExprTreeTraverse(pSecond, NULL, result, param);
}
void tSQLBinaryExprCalcTraverse(tExprNode *pExprs, int32_t numOfRows, char *pOutput, void *param, int32_t order,
char *(*getSourceDataBlock)(void *, char *, int32_t)) {
void tExprTreeCalcTraverse(tExprNode *pExprs, int32_t numOfRows, char *pOutput, void *param, int32_t order,
char *(*getSourceDataBlock)(void *, const char*, int32_t)) {
if (pExprs == NULL) {
return;
}
......@@ -893,13 +933,13 @@ void tSQLBinaryExprCalcTraverse(tExprNode *pExprs, int32_t numOfRows, char *pOut
/* the left output has result from the left child syntax tree */
char *pLeftOutput = (char*)malloc(sizeof(int64_t) * numOfRows);
if (pLeft->nodeType == TSQL_NODE_EXPR) {
tSQLBinaryExprCalcTraverse(pLeft, numOfRows, pLeftOutput, param, order, getSourceDataBlock);
tExprTreeCalcTraverse(pLeft, numOfRows, pLeftOutput, param, order, getSourceDataBlock);
}
/* the right output has result from the right child syntax tree */
char *pRightOutput = malloc(sizeof(int64_t) * numOfRows);
if (pRight->nodeType == TSQL_NODE_EXPR) {
tSQLBinaryExprCalcTraverse(pRight, numOfRows, pRightOutput, param, order, getSourceDataBlock);
tExprTreeCalcTraverse(pRight, numOfRows, pRightOutput, param, order, getSourceDataBlock);
}
if (pLeft->nodeType == TSQL_NODE_EXPR) {
......@@ -961,7 +1001,7 @@ void tSQLBinaryExprCalcTraverse(tExprNode *pExprs, int32_t numOfRows, char *pOut
free(pRightOutput);
}
void tSQLBinaryExprTrv(tExprNode *pExprs, int32_t *val, int16_t *ids) {
void tSQLBinaryExprTrv(tExprNode *pExprs, SArray* res) {
if (pExprs == NULL) {
return;
}
......@@ -971,17 +1011,15 @@ void tSQLBinaryExprTrv(tExprNode *pExprs, int32_t *val, int16_t *ids) {
// recursive traverse left child branch
if (pLeft->nodeType == TSQL_NODE_EXPR) {
tSQLBinaryExprTrv(pLeft, val, ids);
tSQLBinaryExprTrv(pLeft, res);
} else if (pLeft->nodeType == TSQL_NODE_COL) {
ids[*val] = pLeft->pSchema->colId;
(*val) += 1;
taosArrayPush(res, &pLeft->pSchema->colId);
}
if (pRight->nodeType == TSQL_NODE_EXPR) {
tSQLBinaryExprTrv(pRight, val, ids);
tSQLBinaryExprTrv(pRight, res);
} else if (pRight->nodeType == TSQL_NODE_COL) {
ids[*val] = pRight->pSchema->colId;
(*val) += 1;
taosArrayPush(res, &pRight->pSchema->colId);
}
}
......@@ -1032,49 +1070,143 @@ SBuffer exprTreeToBinary(tExprNode* pExprTree) {
return buf;
}
static void exprTreeFromBinaryImpl(tExprNode** pExprTree, SBuffer* pBuf) {
static tExprNode* exprTreeFromBinaryImpl(SBuffer* pBuf) {
tExprNode* pExpr = calloc(1, sizeof(tExprNode));
tbufReadToBuffer(pBuf, &pExpr->nodeType, sizeof(pExpr->nodeType));
pExpr->nodeType = tbufReadUint8(pBuf);
if (pExpr->nodeType == TSQL_NODE_VALUE) {
tVariant* pVal = calloc(1, sizeof(tVariant));
if (pVal == NULL) {
// TODO:
}
pExpr->pVal = pVal;
tbufReadToBuffer(pBuf, &pVal->nType, sizeof(pVal->nType));
pVal->nType = tbufReadUint32(pBuf);
if (pVal->nType == TSDB_DATA_TYPE_BINARY) {
tbufReadToBuffer(pBuf, &pVal->nLen, sizeof(pVal->nLen));
pVal->pz = calloc(1, pVal->nLen + 1);
tbufReadToBuffer(pBuf, pVal->pz, pVal->nLen);
} else {
tbufReadToBuffer(pBuf, &pVal->pz, sizeof(pVal->i64Key));
pVal->i64Key = tbufReadInt64(pBuf);
}
pExpr->pVal = pVal;
} else if (pExpr->nodeType == TSQL_NODE_COL) {
SSchema* pSchema = calloc(1, sizeof(SSchema));
tbufReadToBuffer(pBuf, &pSchema->colId, sizeof(pSchema->colId));
tbufReadToBuffer(pBuf, &pSchema->bytes, sizeof(pSchema->bytes));
tbufReadToBuffer(pBuf, &pSchema->type, sizeof(pSchema->type));
if (pSchema == NULL) {
// TODO:
}
pExpr->pSchema = pSchema;
pSchema->colId = tbufReadInt16(pBuf);
pSchema->bytes = tbufReadInt16(pBuf);
pSchema->type = tbufReadUint8(pBuf);
tbufReadToString(pBuf, pSchema->name, TSDB_COL_NAME_LEN);
pExpr->pSchema = pSchema;
} else if (pExpr->nodeType == TSQL_NODE_EXPR) {
tbufReadToBuffer(pBuf, &pExpr->_node.optr, sizeof(pExpr->_node.optr));
tbufReadToBuffer(pBuf, &pExpr->_node.hasPK, sizeof(pExpr->_node.hasPK));
exprTreeFromBinaryImpl(&pExpr->_node.pLeft, pBuf);
exprTreeFromBinaryImpl(&pExpr->_node.pRight, pBuf);
pExpr->_node.optr = tbufReadUint8(pBuf);
pExpr->_node.hasPK = tbufReadUint8(pBuf);
pExpr->_node.pLeft = exprTreeFromBinaryImpl(pBuf);
pExpr->_node.pRight = exprTreeFromBinaryImpl(pBuf);
assert(pExpr->_node.pLeft != NULL && pExpr->_node.pRight != NULL);
}
*pExprTree = pExpr;
return pExpr;
}
int32_t exprTreeFromBinary(const void* pBuf, size_t size, tExprNode** pExprNode) {
tExprNode* exprTreeFromBinary(const void* pBuf, size_t size) {
if (size == 0) {
return NULL;
}
SBuffer rbuf = {0};
/*int32_t code =*/ tbufBeginRead(&rbuf, pBuf, size);
exprTreeFromBinaryImpl(pExprNode, &rbuf);
return TSDB_CODE_SUCCESS;
tbufBeginRead(&rbuf, pBuf, size);
return exprTreeFromBinaryImpl(&rbuf);
}
tExprNode* exprTreeFromTableName(const char* tbnameCond) {
if (!tbnameCond) {
return NULL;
}
tExprNode* expr = calloc(1, sizeof(tExprNode));
if (expr == NULL) {
// TODO:
}
expr->nodeType = TSQL_NODE_EXPR;
tExprNode* left = calloc(1, sizeof(tExprNode));
if (left == NULL) {
// TODO:
}
expr->_node.pLeft = left;
left->nodeType = TSQL_NODE_COL;
SSchema* pSchema = calloc(1, sizeof(SSchema));
if (pSchema == NULL) {
// TODO:
}
left->pSchema = pSchema;
pSchema->type = TSDB_DATA_TYPE_BINARY;
pSchema->bytes = TSDB_TABLE_NAME_LEN;
strcpy(pSchema->name, TSQL_TBNAME_L);
pSchema->colId = -1;
tExprNode* right = calloc(1, sizeof(tExprNode));
if (right == NULL) {
// TODO
}
expr->_node.pRight = right;
if (strncmp(tbnameCond, QUERY_COND_REL_PREFIX_LIKE, QUERY_COND_REL_PREFIX_LIKE_LEN) == 0) {
right->nodeType = TSQL_NODE_VALUE;
expr->_node.optr = TSDB_RELATION_LIKE;
tVariant* pVal = calloc(1, sizeof(tVariant));
if (pVal == NULL) {
// TODO:
}
right->pVal = pVal;
pVal->nType = TSDB_DATA_TYPE_BINARY;
size_t len = strlen(tbnameCond + QUERY_COND_REL_PREFIX_LIKE_LEN) + 1;
pVal->pz = malloc(len);
if (pVal->pz == NULL) {
// TODO:
}
memcpy(pVal->pz, tbnameCond + QUERY_COND_REL_PREFIX_LIKE_LEN, len);
pVal->nLen = (int32_t)len;
} else if (strncmp(tbnameCond, QUERY_COND_REL_PREFIX_IN, QUERY_COND_REL_PREFIX_IN_LEN) == 0) {
right->nodeType = TSQL_NODE_VALUE;
expr->_node.optr = TSDB_RELATION_IN;
tVariant* pVal = calloc(1, sizeof(tVariant));
if (pVal == NULL) {
// TODO:
}
right->pVal = pVal;
pVal->nType = TSDB_DATA_TYPE_ARRAY;
pVal->arr = taosArrayInit(2, sizeof(char*));
const char* cond = tbnameCond + QUERY_COND_REL_PREFIX_IN_LEN;
for (const char *e = cond; *e != 0; e++) {
if (*e == TS_PATH_DELIMITER[0]) {
cond = e + 1;
} else if (*e == ',') {
size_t len = e - cond + 1;
char* p = malloc( len );
memcpy(p, cond, len);
p[len - 1] = 0;
cond += len;
taosArrayPush(pVal->arr, &p);
}
}
if (*cond != 0) {
char* p = strdup( cond );
taosArrayPush(pVal->arr, &p);
}
taosArraySortString(pVal->arr);
}
return expr;
}
\ No newline at end of file
......@@ -53,8 +53,8 @@
/* get the qinfo struct address from the query struct address */
#define GET_COLUMN_BYTES(query, colidx) \
((query)->colList[(query)->pSelectExpr[colidx].pBase.colInfo.colIndex].bytes)
#define GET_COLUMN_TYPE(query, colidx) ((query)->colList[(query)->pSelectExpr[colidx].pBase.colInfo.colIndex].type)
((query)->colList[(query)->pSelectExpr[colidx].base.colInfo.colIndex].bytes)
#define GET_COLUMN_TYPE(query, colidx) ((query)->colList[(query)->pSelectExpr[colidx].base.colInfo.colIndex].type)
typedef struct SPointInterpoSupporter {
int32_t numOfCols;
......@@ -257,7 +257,7 @@ int64_t getNumOfResult(SQueryRuntimeEnv *pRuntimeEnv) {
int64_t maxOutput = 0;
for (int32_t j = 0; j < pQuery->numOfOutput; ++j) {
int32_t functionId = pQuery->pSelectExpr[j].pBase.functionId;
int32_t functionId = pQuery->pSelectExpr[j].base.functionId;
/*
* ts, tag, tagprj function can not decide the output number of current query
......@@ -334,7 +334,7 @@ bool isSelectivityWithTagsQuery(SQuery *pQuery) {
int32_t numOfSelectivity = 0;
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
int32_t functId = pQuery->pSelectExpr[i].pBase.functionId;
int32_t functId = pQuery->pSelectExpr[i].base.functionId;
if (functId == TSDB_FUNC_TAG_DUMMY || functId == TSDB_FUNC_TS_DUMMY) {
hasTags = true;
continue;
......@@ -352,7 +352,7 @@ bool isSelectivityWithTagsQuery(SQuery *pQuery) {
return false;
}
bool isTSCompQuery(SQuery *pQuery) { return pQuery->pSelectExpr[0].pBase.functionId == TSDB_FUNC_TS_COMP; }
bool isTSCompQuery(SQuery *pQuery) { return pQuery->pSelectExpr[0].base.functionId == TSDB_FUNC_TS_COMP; }
static bool limitResults(SQInfo *pQInfo) {
SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
......@@ -370,7 +370,7 @@ static bool limitResults(SQInfo *pQInfo) {
static bool isTopBottomQuery(SQuery *pQuery) {
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
int32_t functionId = pQuery->pSelectExpr[i].pBase.functionId;
int32_t functionId = pQuery->pSelectExpr[i].base.functionId;
if (functionId == TSDB_FUNC_TS) {
continue;
}
......@@ -385,7 +385,7 @@ static bool isTopBottomQuery(SQuery *pQuery) {
static SDataStatis *getStatisInfo(SQuery *pQuery, SDataStatis *pStatis, SDataBlockInfo *pDataBlockInfo, int32_t index) {
// for a tag column, no corresponding field info
SColIndex *pColIndexEx = &pQuery->pSelectExpr[index].pBase.colInfo;
SColIndex *pColIndexEx = &pQuery->pSelectExpr[index].base.colInfo;
if (TSDB_COL_IS_TAG(pColIndexEx->flag)) {
return NULL;
}
......@@ -413,7 +413,7 @@ static SDataStatis *getStatisInfo(SQuery *pQuery, SDataStatis *pStatis, SDataBlo
*/
static bool hasNullValue(SQuery *pQuery, int32_t col, SDataBlockInfo *pDataBlockInfo, SDataStatis *pStatis,
SDataStatis **pColStatis) {
SColIndex *pColIndex = &pQuery->pSelectExpr[col].pBase.colInfo;
SColIndex *pColIndex = &pQuery->pSelectExpr[col].base.colInfo;
if (TSDB_COL_IS_TAG(pColIndex->flag)) {
return false;
}
......@@ -717,7 +717,7 @@ static void doBlockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SWindowStat
if (IS_MASTER_SCAN(pRuntimeEnv) || pStatus->closed) {
for (int32_t k = 0; k < pQuery->numOfOutput; ++k) {
int32_t functionId = pQuery->pSelectExpr[k].pBase.functionId;
int32_t functionId = pQuery->pSelectExpr[k].base.functionId;
pCtx[k].nStartQueryTimestamp = pWin->skey;
pCtx[k].size = forwardStep;
......@@ -743,7 +743,7 @@ static void doRowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SWindowStatus
for (int32_t k = 0; k < pQuery->numOfOutput; ++k) {
pCtx[k].nStartQueryTimestamp = pWin->skey;
int32_t functionId = pQuery->pSelectExpr[k].pBase.functionId;
int32_t functionId = pQuery->pSelectExpr[k].base.functionId;
if (functionNeedToExecute(pRuntimeEnv, &pCtx[k], functionId)) {
aAggs[functionId].xFunctionF(&pCtx[k], offset);
}
......@@ -817,13 +817,12 @@ static TSKEY reviseWindowEkey(SQuery *pQuery, STimeWindow *pWindow) {
char *getDataBlocks(SQueryRuntimeEnv *pRuntimeEnv, SArithmeticSupport *sas, int32_t col, int32_t size,
SArray *pDataBlock) {
SQuery * pQuery = pRuntimeEnv->pQuery;
SQLFunctionCtx *pCtx = pRuntimeEnv->pCtx;
char *dataBlock = NULL;
SQuery *pQuery = pRuntimeEnv->pQuery;
SQLFunctionCtx *pCtx = pRuntimeEnv->pCtx;
int32_t functionId = pQuery->pSelectExpr[col].pBase.functionId;
int32_t functionId = pQuery->pSelectExpr[col].base.functionId;
if (functionId == TSDB_FUNC_ARITHM) {
sas->pArithExpr = &pQuery->pSelectExpr[col];
......@@ -833,19 +832,33 @@ char *getDataBlocks(SQueryRuntimeEnv *pRuntimeEnv, SArithmeticSupport *sas, int3
} else {
pCtx->startOffset = pQuery->pos - (size - 1);
}
sas->offset = 0;
sas->colList = pQuery->colList;
sas->numOfCols = pQuery->numOfCols;
sas->data = calloc(pQuery->numOfCols, POINTER_BYTES);
// here the pQuery->colList and sas->colList are identical
for (int32_t i = 0; i < pQuery->numOfCols; ++i) {
SColumnInfo *pColMsg = &pQuery->colList[i];
assert(0);
// char * pData = doGetDataBlocks(pQuery, pRuntimeEnv->colDataBuffer, pQuery->colList[i].colIdxInBuf);
sas->elemSize[i] = pColMsg->bytes;
// sas->data[i] = pData + pCtx->startOffset * sas->elemSize[i]; // start from the offset
int32_t numOfCols = taosArrayGetSize(pDataBlock);
dataBlock = NULL;
for (int32_t k = 0; k < numOfCols; ++k) { //todo refactor
SColumnInfoData *p = taosArrayGet(pDataBlock, k);
if (pColMsg->colId == p->info.colId) {
dataBlock = p->pData;
break;
}
}
assert(dataBlock != NULL);
sas->data[i] = dataBlock + pCtx->startOffset * pQuery->colList[i].bytes; // start from the offset
}
sas->numOfCols = pQuery->numOfCols;
sas->offset = 0;
} else { // other type of query function
SColIndex *pCol = &pQuery->pSelectExpr[col].pBase.colInfo;
SColIndex *pCol = &pQuery->pSelectExpr[col].base.colInfo;
if (TSDB_COL_IS_TAG(pCol->flag) || pDataBlock == NULL) {
dataBlock = NULL;
} else {
......@@ -894,9 +907,9 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *
}
SArithmeticSupport *sasArray = calloc((size_t)pQuery->numOfOutput, sizeof(SArithmeticSupport));
for (int32_t k = 0; k < pQuery->numOfOutput; ++k) {
int32_t functionId = pQuery->pSelectExpr[k].pBase.functionId;
int32_t functionId = pQuery->pSelectExpr[k].base.functionId;
SDataStatis *tpField = NULL;
......@@ -953,13 +966,21 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *
* tag_prj function are changed to be TSDB_FUNC_TAG_DUMMY
*/
for (int32_t k = 0; k < pQuery->numOfOutput; ++k) {
int32_t functionId = pQuery->pSelectExpr[k].pBase.functionId;
int32_t functionId = pQuery->pSelectExpr[k].base.functionId;
if (functionNeedToExecute(pRuntimeEnv, &pCtx[k], functionId)) {
aAggs[functionId].xFunction(&pCtx[k]);
}
}
}
for(int32_t i = 0; i < pQuery->numOfOutput; ++i) {
if (pQuery->pSelectExpr[i].base.functionId != TSDB_FUNC_ARITHM) {
continue;
}
tfree(sasArray[i].data);
}
tfree(sasArray);
}
......@@ -990,9 +1011,7 @@ static int32_t setGroupResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, char *pDat
return TSDB_CODE_SUCCESS;
}
static UNUSED_FUNC char *getGroupbyColumnData(SQuery *pQuery, SData **data, int16_t *type, int16_t *bytes) {
char *groupbyColumnData = NULL;
static char *getGroupbyColumnData(SQuery *pQuery, int16_t *type, int16_t *bytes, SArray* pDataBlock) {
SSqlGroupbyExpr *pGroupbyExpr = pQuery->pGroupbyExpr;
for (int32_t k = 0; k < pGroupbyExpr->numOfGroupCols; ++k) {
......@@ -1015,12 +1034,22 @@ static UNUSED_FUNC char *getGroupbyColumnData(SQuery *pQuery, SData **data, int1
*type = pQuery->colList[colIndex].type;
*bytes = pQuery->colList[colIndex].bytes;
// groupbyColumnData = doGetDataBlocks(pQuery, data, pQuery->colList[colIndex].inf);
break;
/*
* the colIndex is acquired from the first meter of all qualified meters in this vnode during query prepare
* stage, the remain meter may not have the required column in cache actually. So, the validation of required
* column in cache with the corresponding meter schema is reinforced.
*/
int32_t numOfCols = taosArrayGetSize(pDataBlock);
for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData *p = taosArrayGet(pDataBlock, i);
if (pColIndex->colId == p->info.colId) {
return p->pData;
}
}
}
return groupbyColumnData;
return NULL;
}
static int32_t doTSJoinFilter(SQueryRuntimeEnv *pRuntimeEnv, int32_t offset) {
......@@ -1091,12 +1120,11 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS
char *groupbyColumnData = NULL;
if (groupbyStateValue) {
assert(0);
// groupbyColumnData = getGroupbyColumnData(pQuery, data, &type, &bytes);
groupbyColumnData = getGroupbyColumnData(pQuery, &type, &bytes, pDataBlock);
}
for (int32_t k = 0; k < pQuery->numOfOutput; ++k) {
int32_t functionId = pQuery->pSelectExpr[k].pBase.functionId;
int32_t functionId = pQuery->pSelectExpr[k].base.functionId;
SDataStatis *pColStatis = NULL;
......@@ -1205,7 +1233,7 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS
offset -= pCtx[0].startOffset;
for (int32_t k = 0; k < pQuery->numOfOutput; ++k) {
int32_t functionId = pQuery->pSelectExpr[k].pBase.functionId;
int32_t functionId = pQuery->pSelectExpr[k].base.functionId;
if (functionNeedToExecute(pRuntimeEnv, &pCtx[k], functionId)) {
aAggs[functionId].xFunctionF(&pCtx[k], offset);
}
......@@ -1222,6 +1250,16 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS
}
pQuery->lastKey = lastKey + step;
// todo refactor: extract method
for(int32_t i = 0; i < pQuery->numOfOutput; ++i) {
if (pQuery->pSelectExpr[i].base.functionId != TSDB_FUNC_ARITHM) {
continue;
}
tfree(sasArray[i].data);
}
free(sasArray);
}
......@@ -1336,7 +1374,7 @@ static void setCtxTagColumnInfo(SQuery *pQuery, SQLFunctionCtx *pCtx) {
SQLFunctionCtx **pTagCtx = calloc(pQuery->numOfOutput, POINTER_BYTES);
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
SSqlFuncMsg *pSqlFuncMsg = &pQuery->pSelectExpr[i].pBase;
SSqlFuncMsg *pSqlFuncMsg = &pQuery->pSelectExpr[i].base;
if (pSqlFuncMsg->functionId == TSDB_FUNC_TAG_DUMMY || pSqlFuncMsg->functionId == TSDB_FUNC_TS_DUMMY) {
tagLen += pCtx[i].outputBytes;
pTagCtx[num++] = &pCtx[i];
......@@ -1376,7 +1414,7 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int16_t order
pRuntimeEnv->offset[0] = 0;
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
SSqlFuncMsg *pSqlFuncMsg = &pQuery->pSelectExpr[i].pBase;
SSqlFuncMsg *pSqlFuncMsg = &pQuery->pSelectExpr[i].base;
SQLFunctionCtx *pCtx = &pRuntimeEnv->pCtx[i];
SColIndex* pIndex = &pSqlFuncMsg->colInfo;
......@@ -1418,7 +1456,7 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int16_t order
int32_t functionId = pCtx->functionId;
if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM || functionId == TSDB_FUNC_DIFF) {
int32_t f = pQuery->pSelectExpr[0].pBase.functionId;
int32_t f = pQuery->pSelectExpr[0].base.functionId;
assert(f == TSDB_FUNC_TS || f == TSDB_FUNC_TS_DUMMY);
pCtx->param[2].i64Key = order;
......@@ -1526,7 +1564,7 @@ bool isFixedOutputQuery(SQuery *pQuery) {
}
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
SSqlFuncMsg *pExprMsg = &pQuery->pSelectExpr[i].pBase;
SSqlFuncMsg *pExprMsg = &pQuery->pSelectExpr[i].base;
// ignore the ts_comp function
if (i == 0 && pExprMsg->functionId == TSDB_FUNC_PRJ && pExprMsg->numOfParams == 1 &&
......@@ -1548,7 +1586,7 @@ bool isFixedOutputQuery(SQuery *pQuery) {
bool isPointInterpoQuery(SQuery *pQuery) {
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
int32_t functionID = pQuery->pSelectExpr[i].pBase.functionId;
int32_t functionID = pQuery->pSelectExpr[i].base.functionId;
if (functionID == TSDB_FUNC_INTERP || functionID == TSDB_FUNC_LAST_ROW) {
return true;
}
......@@ -1560,7 +1598,7 @@ bool isPointInterpoQuery(SQuery *pQuery) {
// TODO REFACTOR:MERGE WITH CLIENT-SIDE FUNCTION
bool isSumAvgRateQuery(SQuery *pQuery) {
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
int32_t functionId = pQuery->pSelectExpr[i].pBase.functionId;
int32_t functionId = pQuery->pSelectExpr[i].base.functionId;
if (functionId == TSDB_FUNC_TS) {
continue;
}
......@@ -1576,7 +1614,7 @@ bool isSumAvgRateQuery(SQuery *pQuery) {
bool isFirstLastRowQuery(SQuery *pQuery) {
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
int32_t functionID = pQuery->pSelectExpr[i].pBase.functionId;
int32_t functionID = pQuery->pSelectExpr[i].base.functionId;
if (functionID == TSDB_FUNC_LAST_ROW) {
return true;
}
......@@ -1592,7 +1630,7 @@ bool notHasQueryTimeRange(SQuery *pQuery) {
static bool needReverseScan(SQuery *pQuery) {
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
int32_t functionId = pQuery->pSelectExpr[i].pBase.functionId;
int32_t functionId = pQuery->pSelectExpr[i].base.functionId;
if (functionId == TSDB_FUNC_TS || functionId == TSDB_FUNC_TS_DUMMY || functionId == TSDB_FUNC_TAG) {
continue;
}
......@@ -1765,7 +1803,7 @@ static void setScanLimitationByResultBuffer(SQuery *pQuery) {
} else {
bool hasMultioutput = false;
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
SSqlFuncMsg *pExprMsg = &pQuery->pSelectExpr[i].pBase;
SSqlFuncMsg *pExprMsg = &pQuery->pSelectExpr[i].base;
if (pExprMsg->functionId == TSDB_FUNC_TS || pExprMsg->functionId == TSDB_FUNC_TS_DUMMY) {
continue;
}
......@@ -1798,7 +1836,7 @@ bool vnodeParametersSafetyCheck(SQuery *pQuery) {
// the scan order is not matter
static bool onlyOneQueryType(SQuery *pQuery, int32_t functId, int32_t functIdDst) {
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
int32_t functionId = pQuery->pSelectExpr[i].pBase.functionId;
int32_t functionId = pQuery->pSelectExpr[i].base.functionId;
if (functionId == TSDB_FUNC_TS || functionId == TSDB_FUNC_TS_DUMMY || functionId == TSDB_FUNC_TAG ||
functionId == TSDB_FUNC_TAG_DUMMY) {
......@@ -2001,7 +2039,7 @@ void pointInterpSupporterSetData(SQInfo *pQInfo, SPointInterpoSupporter *pPointI
SInterpInfoDetail *pInterpDetail = pInterpInfo->pInterpDetail;
// for primary timestamp column, set the flag
if (pQuery->pSelectExpr[i].pBase.colInfo.colId == PRIMARYKEY_TIMESTAMP_COL_INDEX) {
if (pQuery->pSelectExpr[i].base.colInfo.colId == PRIMARYKEY_TIMESTAMP_COL_INDEX) {
pInterpDetail->primaryCol = 1;
}
......@@ -2024,11 +2062,11 @@ void pointInterpSupporterSetData(SQInfo *pQInfo, SPointInterpoSupporter *pPointI
SQLFunctionCtx *pCtx = &pRuntimeEnv->pCtx[i];
// tag column does not need the interp environment
if (pQuery->pSelectExpr[i].pBase.functionId == TSDB_FUNC_TAG) {
if (pQuery->pSelectExpr[i].base.functionId == TSDB_FUNC_TAG) {
continue;
}
int32_t colInBuf = 0; // pQuery->pSelectExpr[i].pBase.colInfo.colIdxInBuf;
int32_t colInBuf = 0; // pQuery->pSelectExpr[i].base.colInfo.colIdxInBuf;
SInterpInfo *pInterpInfo = (SInterpInfo *)pRuntimeEnv->pCtx[i].aOutputBuf;
pInterpInfo->pInterpDetail = calloc(1, sizeof(SInterpInfoDetail));
......@@ -2039,7 +2077,7 @@ void pointInterpSupporterSetData(SQInfo *pQInfo, SPointInterpoSupporter *pPointI
assert(0);
// for primary timestamp column, set the flag
if (pQuery->pSelectExpr[i].pBase.colInfo.colId == PRIMARYKEY_TIMESTAMP_COL_INDEX) {
if (pQuery->pSelectExpr[i].base.colInfo.colId == PRIMARYKEY_TIMESTAMP_COL_INDEX) {
pInterpDetail->primaryCol = 1;
} else {
doSetInterpVal(pCtx, prevKey, type, 1, pPointInterpSupport->pPrevPoint[colInBuf]);
......@@ -2138,7 +2176,7 @@ static int32_t getRowParamForMultiRowsOutput(SQuery *pQuery, bool isSTableQuery)
int32_t rowparam = 1;
if (isTopBottomQuery(pQuery) && (!isSTableQuery)) {
rowparam = pQuery->pSelectExpr[1].pBase.arg->argValue.i64;
rowparam = pQuery->pSelectExpr[1].base.arg->argValue.i64;
}
return rowparam;
......@@ -2204,7 +2242,7 @@ UNUSED_FUNC void setTimestampRange(SQueryRuntimeEnv *pRuntimeEnv, int64_t stime,
SQuery *pQuery = pRuntimeEnv->pQuery;
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
int32_t functionId = pQuery->pSelectExpr[i].pBase.functionId;
int32_t functionId = pQuery->pSelectExpr[i].base.functionId;
if (functionId == TSDB_FUNC_SPREAD) {
pRuntimeEnv->pCtx[i].param[1].dKey = stime;
......@@ -2263,7 +2301,7 @@ static bool needToLoadDataBlock(SQuery *pQuery, SDataStatis *pDataStatis, SQLFun
// todo disable this opt code block temporarily
// for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
// int32_t functId = pQuery->pSelectExpr[i].pBase.functionId;
// int32_t functId = pQuery->pSelectExpr[i].base.functionId;
// if (functId == TSDB_FUNC_TOP || functId == TSDB_FUNC_BOTTOM) {
// return top_bot_datablock_filter(&pCtx[i], functId, (char *)&pField[i].min, (char *)&pField[i].max);
// }
......@@ -2291,8 +2329,8 @@ SArray *loadDataBlockOnDemand(SQueryRuntimeEnv *pRuntimeEnv, SDataBlockInfo *pBl
r = BLK_DATA_ALL_NEEDED;
} else {
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
int32_t functionId = pQuery->pSelectExpr[i].pBase.functionId;
int32_t colId = pQuery->pSelectExpr[i].pBase.colInfo.colId;
int32_t functionId = pQuery->pSelectExpr[i].base.functionId;
int32_t colId = pQuery->pSelectExpr[i].base.colInfo.colId;
r |= aAggs[functionId].dataReqFunc(&pRuntimeEnv->pCtx[i], pQuery->window.skey, pQuery->window.ekey, colId);
}
......@@ -2526,14 +2564,14 @@ static void doSetTagValueInParam(void *tsdb, STableId id, int32_t tagColId, tVar
void setTagVal(SQueryRuntimeEnv *pRuntimeEnv, STableId id, void *tsdb) {
SQuery *pQuery = pRuntimeEnv->pQuery;
SSqlFuncMsg *pFuncMsg = &pQuery->pSelectExpr[0].pBase;
SSqlFuncMsg *pFuncMsg = &pQuery->pSelectExpr[0].base;
if (pQuery->numOfOutput == 1 && pFuncMsg->functionId == TSDB_FUNC_TS_COMP) {
assert(pFuncMsg->numOfParams == 1);
doSetTagValueInParam(tsdb, id, pFuncMsg->arg->argValue.i64, &pRuntimeEnv->pCtx[0].tag);
} else {
// set tag value, by which the results are aggregated.
for (int32_t idx = 0; idx < pQuery->numOfOutput; ++idx) {
SColIndex *pCol = &pQuery->pSelectExpr[idx].pBase.colInfo;
SColIndex *pCol = &pQuery->pSelectExpr[idx].base.colInfo;
// ts_comp column required the tag value for join filter
if (!TSDB_COL_IS_TAG(pCol->flag)) {
......@@ -2559,7 +2597,7 @@ static void doMerge(SQueryRuntimeEnv *pRuntimeEnv, int64_t timestamp, SWindowRes
SQLFunctionCtx *pCtx = pRuntimeEnv->pCtx;
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
int32_t functionId = pQuery->pSelectExpr[i].pBase.functionId;
int32_t functionId = pQuery->pSelectExpr[i].base.functionId;
if (!mergeFlag) {
pCtx[i].aOutputBuf = pCtx[i].aOutputBuf + pCtx[i].outputBytes;
pCtx[i].currentStage = FIRST_STAGE_MERGE;
......@@ -2583,7 +2621,7 @@ static void doMerge(SQueryRuntimeEnv *pRuntimeEnv, int64_t timestamp, SWindowRes
}
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
int32_t functionId = pQuery->pSelectExpr[i].pBase.functionId;
int32_t functionId = pQuery->pSelectExpr[i].base.functionId;
if (functionId == TSDB_FUNC_TAG_DUMMY) {
continue;
}
......@@ -2673,15 +2711,15 @@ void UNUSED_FUNC displayInterResult(SData **pdata, SQuery *pQuery, int32_t numOf
for (int32_t i = 0; i < numOfCols; ++i) {
switch (pQuery->pSelectExpr[i].type) {
case TSDB_DATA_TYPE_BINARY: {
int32_t colIndex = pQuery->pSelectExpr[i].pBase.colInfo.colIndex;
int32_t colIndex = pQuery->pSelectExpr[i].base.colInfo.colIndex;
int32_t type = 0;
if (TSDB_COL_IS_TAG(pQuery->pSelectExpr[i].pBase.colInfo.flag)) {
if (TSDB_COL_IS_TAG(pQuery->pSelectExpr[i].base.colInfo.flag)) {
type = pQuery->pSelectExpr[i].type;
} else {
type = pMeterObj->schema[colIndex].type;
}
printBinaryData(pQuery->pSelectExpr[i].pBase.functionId, pdata[i]->data + pQuery->pSelectExpr[i].bytes * j,
printBinaryData(pQuery->pSelectExpr[i].base.functionId, pdata[i]->data + pQuery->pSelectExpr[i].bytes * j,
type);
break;
}
......@@ -2836,7 +2874,7 @@ int64_t getNumOfResultWindowRes(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *pW
int64_t maxOutput = 0;
for (int32_t j = 0; j < pQuery->numOfOutput; ++j) {
int32_t functionId = pQuery->pSelectExpr[j].pBase.functionId;
int32_t functionId = pQuery->pSelectExpr[j].base.functionId;
/*
* ts, tag, tagprj function can not decide the output number of current query
......@@ -3052,7 +3090,7 @@ static void doDisableFunctsForSupplementaryScan(SQuery *pQuery, SWindowResInfo *
// open/close the specified query for each group result
for (int32_t j = 0; j < pQuery->numOfOutput; ++j) {
int32_t functId = pQuery->pSelectExpr[j].pBase.functionId;
int32_t functId = pQuery->pSelectExpr[j].base.functionId;
if (((functId == TSDB_FUNC_FIRST || functId == TSDB_FUNC_FIRST_DST) && order == TSDB_ORDER_ASC) ||
((functId == TSDB_FUNC_LAST || functId == TSDB_FUNC_LAST_DST) && order == TSDB_ORDER_DESC)) {
......@@ -3074,7 +3112,7 @@ void disableFuncInReverseScan(SQueryRuntimeEnv *pRuntimeEnv) {
doDisableFunctsForSupplementaryScan(pQuery, pWindowResInfo, order);
} else { // for simple result of table query,
for (int32_t j = 0; j < pQuery->numOfOutput; ++j) {
int32_t functId = pQuery->pSelectExpr[j].pBase.functionId;
int32_t functId = pQuery->pSelectExpr[j].base.functionId;
SQLFunctionCtx *pCtx = &pRuntimeEnv->pCtx[j];
......@@ -3144,7 +3182,7 @@ void resetCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv) {
pCtx->resultInfo = &pRuntimeEnv->resultInfo[i];
// set the timestamp output buffer for top/bottom/diff query
int32_t functionId = pQuery->pSelectExpr[i].pBase.functionId;
int32_t functionId = pQuery->pSelectExpr[i].base.functionId;
if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM || functionId == TSDB_FUNC_DIFF) {
pCtx->ptsOutputBuf = pRuntimeEnv->pCtx[0].aOutputBuf;
}
......@@ -3160,7 +3198,7 @@ void forwardCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, int64_t output) {
// reset the execution contexts
for (int32_t j = 0; j < pQuery->numOfOutput; ++j) {
int32_t functionId = pQuery->pSelectExpr[j].pBase.functionId;
int32_t functionId = pQuery->pSelectExpr[j].base.functionId;
assert(functionId != TSDB_FUNC_DIFF);
// set next output position
......@@ -3187,7 +3225,7 @@ void initCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv) {
SQuery *pQuery = pRuntimeEnv->pQuery;
for (int32_t j = 0; j < pQuery->numOfOutput; ++j) {
int32_t functionId = pQuery->pSelectExpr[j].pBase.functionId;
int32_t functionId = pQuery->pSelectExpr[j].base.functionId;
pRuntimeEnv->pCtx[j].currentStage = 0;
aAggs[functionId].init(&pRuntimeEnv->pCtx[j]);
......@@ -3213,7 +3251,7 @@ void skipResults(SQueryRuntimeEnv *pRuntimeEnv) {
pQuery->rec.rows -= numOfSkip;
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
int32_t functionId = pQuery->pSelectExpr[i].pBase.functionId;
int32_t functionId = pQuery->pSelectExpr[i].base.functionId;
int32_t bytes = pRuntimeEnv->pCtx[i].outputBytes;
memmove(pQuery->sdata[i]->data, pQuery->sdata[i]->data + bytes * numOfSkip, pQuery->rec.rows * bytes);
......@@ -3255,7 +3293,7 @@ bool needScanDataBlocksAgain(SQueryRuntimeEnv *pRuntimeEnv) {
setWindowResOutputBuf(pRuntimeEnv, pResult);
for (int32_t j = 0; j < pQuery->numOfOutput; ++j) {
int16_t functId = pQuery->pSelectExpr[j].pBase.functionId;
int16_t functId = pQuery->pSelectExpr[j].base.functionId;
if (functId == TSDB_FUNC_TS) {
continue;
}
......@@ -3268,7 +3306,7 @@ bool needScanDataBlocksAgain(SQueryRuntimeEnv *pRuntimeEnv) {
}
} else {
for (int32_t j = 0; j < pQuery->numOfOutput; ++j) {
int16_t functId = pQuery->pSelectExpr[j].pBase.functionId;
int16_t functId = pQuery->pSelectExpr[j].base.functionId;
if (functId == TSDB_FUNC_TS) {
continue;
}
......@@ -3436,7 +3474,7 @@ void finalizeQueryResult(SQueryRuntimeEnv *pRuntimeEnv) {
setWindowResOutputBuf(pRuntimeEnv, buf);
for (int32_t j = 0; j < pQuery->numOfOutput; ++j) {
aAggs[pQuery->pSelectExpr[j].pBase.functionId].xFinalize(&pRuntimeEnv->pCtx[j]);
aAggs[pQuery->pSelectExpr[j].base.functionId].xFinalize(&pRuntimeEnv->pCtx[j]);
}
/*
......@@ -3448,14 +3486,14 @@ void finalizeQueryResult(SQueryRuntimeEnv *pRuntimeEnv) {
} else {
for (int32_t j = 0; j < pQuery->numOfOutput; ++j) {
aAggs[pQuery->pSelectExpr[j].pBase.functionId].xFinalize(&pRuntimeEnv->pCtx[j]);
aAggs[pQuery->pSelectExpr[j].base.functionId].xFinalize(&pRuntimeEnv->pCtx[j]);
}
}
}
static bool hasMainOutput(SQuery *pQuery) {
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
int32_t functionId = pQuery->pSelectExpr[i].pBase.functionId;
int32_t functionId = pQuery->pSelectExpr[i].base.functionId;
if (functionId != TSDB_FUNC_TS && functionId != TSDB_FUNC_TAG && functionId != TSDB_FUNC_TAGPRJ) {
return true;
......@@ -3561,7 +3599,7 @@ static void setWindowResOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *
SQLFunctionCtx *pCtx = &pRuntimeEnv->pCtx[i];
pCtx->aOutputBuf = getPosInResultPage(pRuntimeEnv, i, pResult);
int32_t functionId = pQuery->pSelectExpr[i].pBase.functionId;
int32_t functionId = pQuery->pSelectExpr[i].base.functionId;
if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM || functionId == TSDB_FUNC_DIFF) {
pCtx->ptsOutputBuf = pRuntimeEnv->pCtx[0].aOutputBuf;
}
......@@ -3659,7 +3697,7 @@ void setIntervalQueryRange(STableQueryInfo *pTableQueryInfo, SQInfo *pQInfo, TSK
bool requireTimestamp(SQuery *pQuery) {
for (int32_t i = 0; i < pQuery->numOfOutput; i++) {
int32_t functionId = pQuery->pSelectExpr[i].pBase.functionId;
int32_t functionId = pQuery->pSelectExpr[i].base.functionId;
if ((aAggs[functionId].nStatus & TSDB_FUNCSTATE_NEED_TS) != 0) {
return true;
}
......@@ -3876,7 +3914,7 @@ static UNUSED_FUNC int32_t resultInterpolate(SQInfo *pQInfo, tFilePage **data, t
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
srcData[i] = pDataSrc[i]->data;
functions[i] = pQuery->pSelectExpr[i].pBase.functionId;
functions[i] = pQuery->pSelectExpr[i].base.functionId;
}
assert(0);
......@@ -4300,15 +4338,6 @@ static UNUSED_FUNC bool isGroupbyEachTable(SSqlGroupbyExpr *pGroupbyExpr, STable
return false;
}
static UNUSED_FUNC bool doCheckWithPrevQueryRange(SQuery *pQuery, TSKEY nextKey) {
if ((nextKey > pQuery->window.ekey && QUERY_IS_ASC_QUERY(pQuery)) ||
(nextKey < pQuery->window.ekey && !QUERY_IS_ASC_QUERY(pQuery))) {
return false;
}
return true;
}
static void enableExecutionForNextTable(SQueryRuntimeEnv *pRuntimeEnv) {
SQuery *pQuery = pRuntimeEnv->pQuery;
......@@ -5273,7 +5302,7 @@ static char *createTableIdList(SQueryTableMsg *pQueryMsg, char *pMsg, SArray **p
* @return
*/
static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList, SSqlFuncMsg ***pExpr,
char **tagCond, SColIndex **groupbyCols, SColumnInfo** tagCols) {
char **tagCond, char** tbnameCond, SColIndex **groupbyCols, SColumnInfo** tagCols) {
pQueryMsg->numOfTables = htonl(pQueryMsg->numOfTables);
pQueryMsg->window.skey = htobe64(pQueryMsg->window.skey);
......@@ -5286,6 +5315,7 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList,
pQueryMsg->order = htons(pQueryMsg->order);
pQueryMsg->orderColId = htons(pQueryMsg->orderColId);
pQueryMsg->queryType = htons(pQueryMsg->queryType);
pQueryMsg->tagNameRelType = htons(pQueryMsg->tagNameRelType);
pQueryMsg->numOfCols = htons(pQueryMsg->numOfCols);
pQueryMsg->numOfOutput = htons(pQueryMsg->numOfOutput);
......@@ -5330,9 +5360,9 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList,
if (pDestFilterInfo->filterstr) {
pDestFilterInfo->len = htobe64(pFilterInfo->len);
pDestFilterInfo->pz = (int64_t)calloc(1, pDestFilterInfo->len + 1);
memcpy((void *)pDestFilterInfo->pz, pMsg, pDestFilterInfo->len + 1);
pMsg += (pDestFilterInfo->len + 1);
pDestFilterInfo->pz = (int64_t) calloc(1, pDestFilterInfo->len);
memcpy((void *)pDestFilterInfo->pz, pMsg, pDestFilterInfo->len);
pMsg += (pDestFilterInfo->len);
} else {
pDestFilterInfo->lowerBndi = htobe64(pFilterInfo->lowerBndi);
pDestFilterInfo->upperBndi = htobe64(pFilterInfo->upperBndi);
......@@ -5343,8 +5373,6 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList,
}
}
bool hasArithmeticFunction = false;
*pExpr = calloc(pQueryMsg->numOfOutput, POINTER_BYTES);
SSqlFuncMsg *pExprMsg = (SSqlFuncMsg *)pMsg;
......@@ -5365,15 +5393,13 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList,
if (pExprMsg->arg[j].argType == TSDB_DATA_TYPE_BINARY) {
pExprMsg->arg[j].argValue.pz = pMsg;
pMsg += pExprMsg->arg[j].argBytes + 1; // one more for the string terminated char.
pMsg += pExprMsg->arg[j].argBytes; // one more for the string terminated char.
} else {
pExprMsg->arg[j].argValue.i64 = htobe64(pExprMsg->arg[j].argValue.i64);
}
}
if (pExprMsg->functionId == TSDB_FUNC_ARITHM) {
hasArithmeticFunction = true;
} else if (pExprMsg->functionId == TSDB_FUNC_TAG || pExprMsg->functionId == TSDB_FUNC_TAGPRJ ||
if (pExprMsg->functionId == TSDB_FUNC_TAG || pExprMsg->functionId == TSDB_FUNC_TAGPRJ ||
pExprMsg->functionId == TSDB_FUNC_TAG_DUMMY) {
if (pExprMsg->colInfo.flag != TSDB_COL_TAG) { // ignore the column index check for arithmetic expression.
return TSDB_CODE_INVALID_QUERY_MSG;
......@@ -5387,13 +5413,6 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList,
pExprMsg = (SSqlFuncMsg *)pMsg;
}
pQueryMsg->colNameLen = htonl(pQueryMsg->colNameLen);
if (hasArithmeticFunction) { // column name array
assert(pQueryMsg->colNameLen > 0);
pQueryMsg->colNameList = (int64_t)pMsg;
pMsg += pQueryMsg->colNameLen;
}
pMsg = createTableIdList(pQueryMsg, pMsg, pTableIdList);
if (pQueryMsg->numOfGroupCols > 0) { // group by tag columns
......@@ -5429,27 +5448,35 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList,
pMsg += sizeof(int64_t) * pQueryMsg->numOfOutput;
}
// the tag query condition expression string is located at the end of query msg
if (pQueryMsg->tagCondLen > 0) {
*tagCond = calloc(1, pQueryMsg->tagCondLen);
memcpy(*tagCond, pMsg, pQueryMsg->tagCondLen);
pMsg += pQueryMsg->tagCondLen;
}
if (pQueryMsg->numOfTags > 0) {
(*tagCols) = calloc(1, sizeof(SColumnInfo) * pQueryMsg->numOfTags);
for (int32_t i = 0; i < pQueryMsg->numOfTags; ++i) {
SColumnInfo* pTagCol = (SColumnInfo*) pMsg;
pTagCol->colId = htons(pTagCol->colId);
pTagCol->bytes = htons(pTagCol->bytes);
pTagCol->type = htons(pTagCol->type);
pTagCol->numOfFilters = 0;
(*tagCols)[i] = *pTagCol;
pMsg += sizeof(SColumnInfo);
}
}
// the tag query condition expression string is located at the end of query msg
if (pQueryMsg->tagCondLen > 0) {
*tagCond = calloc(1, pQueryMsg->tagCondLen);
memcpy(*tagCond, pMsg, pQueryMsg->tagCondLen);
pMsg += pQueryMsg->tagCondLen;
}
if (*pMsg != 0) {
size_t len = strlen(pMsg) + 1;
*tbnameCond = malloc(len);
strcpy(*tbnameCond, pMsg);
pMsg += len;
}
qTrace("qmsg:%p query on %d table(s), qrange:%" PRId64 "-%" PRId64
", numOfGroupbyTagCols:%d, ts order:%d, "
"outputCols:%d, numOfCols:%d, interval:%d" PRId64 ", fillType:%d, comptsLen:%d, limit:%" PRId64
......@@ -5461,54 +5488,16 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList,
return 0;
}
static int32_t buildAirthmeticExprFromMsg(SArithExprInfo *pExpr, SQueryTableMsg *pQueryMsg) {
// SExprInfo *pBinaryExprInfo = &pExpr->binExprInfo;
// SColumnInfo * pColMsg = pQueryMsg->colList;
#if 0
tExprNode* pBinExpr = NULL;
SSchema* pSchema = toSchema(pQueryMsg, pColMsg, pQueryMsg->numOfCols);
qTrace("qmsg:%p create binary expr from string:%s", pQueryMsg, pExpr->pBase.arg[0].argValue.pz);
tSQLBinaryExprFromString(&pBinExpr, pSchema, pQueryMsg->numOfCols, pExpr->pBase.arg[0].argValue.pz,
pExpr->pBase.arg[0].argBytes);
static int32_t buildAirthmeticExprFromMsg(SArithExprInfo *pArithExprInfo, SQueryTableMsg *pQueryMsg) {
qTrace("qmsg:%p create arithmetic expr from binary string", pQueryMsg, pArithExprInfo->base.arg[0].argValue.pz);
if (pBinExpr == NULL) {
qError("qmsg:%p failed to create arithmetic expression string from:%s", pQueryMsg, pExpr->pBase.arg[0].argValue.pz);
tExprNode* pExprNode = exprTreeFromBinary(pArithExprInfo->base.arg[0].argValue.pz, pArithExprInfo->base.arg[0].argBytes);
if (pExprNode == NULL) {
qError("qmsg:%p failed to create arithmetic expression string from:%s", pQueryMsg, pArithExprInfo->base.arg[0].argValue.pz);
return TSDB_CODE_APP_ERROR;
}
pBinaryExprInfo->pBinExpr = pBinExpr;
int32_t num = 0;
int16_t ids[TSDB_MAX_COLUMNS] = {0};
tSQLBinaryExprTrv(pBinExpr, &num, ids);
qsort(ids, num, sizeof(int16_t), id_compar);
int32_t i = 0, j = 0;
while (i < num && j < num) {
if (ids[i] == ids[j]) {
j++;
} else {
ids[++i] = ids[j++];
}
}
assert(i <= num);
// there may be duplicated referenced columns.
num = i + 1;
pBinaryExprInfo->pReqColumns = malloc(sizeof(SColIndex) * num);
for (int32_t k = 0; k < num; ++k) {
SColIndex* pColIndex = &pBinaryExprInfo->pReqColumns[k];
pColIndex->colId = ids[k];
}
pBinaryExprInfo->numOfCols = num;
free(pSchema);
#endif
pArithExprInfo->pExpr = pExprNode;
return TSDB_CODE_SUCCESS;
}
......@@ -5526,14 +5515,14 @@ static int32_t createSqlFunctionExprFromMsg(SQueryTableMsg *pQueryMsg, SArithExp
int16_t tagLen = 0;
for (int32_t i = 0; i < pQueryMsg->numOfOutput; ++i) {
pExprs[i].pBase = *pExprMsg[i];
pExprs[i].base = *pExprMsg[i];
pExprs[i].bytes = 0;
int16_t type = 0;
int16_t bytes = 0;
// parse the arithmetic expression
if (pExprs[i].pBase.functionId == TSDB_FUNC_ARITHM) {
if (pExprs[i].base.functionId == TSDB_FUNC_ARITHM) {
code = buildAirthmeticExprFromMsg(&pExprs[i], pQueryMsg);
if (code != TSDB_CODE_SUCCESS) {
......@@ -5543,26 +5532,26 @@ static int32_t createSqlFunctionExprFromMsg(SQueryTableMsg *pQueryMsg, SArithExp
type = TSDB_DATA_TYPE_DOUBLE;
bytes = tDataTypeDesc[type].nSize;
} else if (pExprs[i].pBase.colInfo.colId == TSDB_TBNAME_COLUMN_INDEX) { // parse the normal column
} else if (pExprs[i].base.colInfo.colId == TSDB_TBNAME_COLUMN_INDEX) { // parse the normal column
type = TSDB_DATA_TYPE_BINARY;
bytes = TSDB_TABLE_NAME_LEN;
} else{
int32_t j = getColumnIndexInSource(pQueryMsg, &pExprs[i].pBase, pTagCols);
int32_t j = getColumnIndexInSource(pQueryMsg, &pExprs[i].base, pTagCols);
assert(j < pQueryMsg->numOfCols);
SColumnInfo* pCol = (TSDB_COL_IS_TAG(pExprs[i].pBase.colInfo.flag))? &pTagCols[j]:&pQueryMsg->colList[j];
SColumnInfo* pCol = (TSDB_COL_IS_TAG(pExprs[i].base.colInfo.flag))? &pTagCols[j]:&pQueryMsg->colList[j];
type = pCol->type;
bytes = pCol->bytes;
}
int32_t param = pExprs[i].pBase.arg[0].argValue.i64;
if (getResultDataInfo(type, bytes, pExprs[i].pBase.functionId, param, &pExprs[i].type, &pExprs[i].bytes,
int32_t param = pExprs[i].base.arg[0].argValue.i64;
if (getResultDataInfo(type, bytes, pExprs[i].base.functionId, param, &pExprs[i].type, &pExprs[i].bytes,
&pExprs[i].interResBytes, 0, isSuperTable) != TSDB_CODE_SUCCESS) {
tfree(pExprs);
return TSDB_CODE_INVALID_QUERY_MSG;
}
if (pExprs[i].pBase.functionId == TSDB_FUNC_TAG_DUMMY || pExprs[i].pBase.functionId == TSDB_FUNC_TS_DUMMY) {
if (pExprs[i].base.functionId == TSDB_FUNC_TAG_DUMMY || pExprs[i].base.functionId == TSDB_FUNC_TS_DUMMY) {
tagLen += pExprs[i].bytes;
}
assert(isValidDataType(pExprs[i].type, pExprs[i].bytes));
......@@ -5572,17 +5561,17 @@ static int32_t createSqlFunctionExprFromMsg(SQueryTableMsg *pQueryMsg, SArithExp
// TODO refactor
for (int32_t i = 0; i < pQueryMsg->numOfOutput; ++i) {
pExprs[i].pBase = *pExprMsg[i];
int16_t functId = pExprs[i].pBase.functionId;
pExprs[i].base = *pExprMsg[i];
int16_t functId = pExprs[i].base.functionId;
if (functId == TSDB_FUNC_TOP || functId == TSDB_FUNC_BOTTOM) {
int32_t j = getColumnIndexInSource(pQueryMsg, &pExprs[i].pBase, pTagCols);
int32_t j = getColumnIndexInSource(pQueryMsg, &pExprs[i].base, pTagCols);
assert(j < pQueryMsg->numOfCols);
SColumnInfo *pCol = &pQueryMsg->colList[j];
int32_t ret =
getResultDataInfo(pCol->type, pCol->bytes, functId, pExprs[i].pBase.arg[0].argValue.i64,
getResultDataInfo(pCol->type, pCol->bytes, functId, pExprs[i].base.arg[0].argValue.i64,
&pExprs[i].type, &pExprs[i].bytes, &pExprs[i].interResBytes, tagLen, isSuperTable);
assert(ret == TSDB_CODE_SUCCESS);
}
......@@ -5707,7 +5696,7 @@ static void doUpdateExprColumnIndex(SQuery *pQuery) {
assert(pQuery->pSelectExpr != NULL && pQuery != NULL);
for (int32_t k = 0; k < pQuery->numOfOutput; ++k) {
SSqlFuncMsg *pSqlExprMsg = &pQuery->pSelectExpr[k].pBase;
SSqlFuncMsg *pSqlExprMsg = &pQuery->pSelectExpr[k].base;
if (pSqlExprMsg->functionId == TSDB_FUNC_ARITHM || pSqlExprMsg->colInfo.flag == TSDB_COL_TAG) {
continue;
}
......@@ -5758,6 +5747,7 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou
pQuery->interpoType = pQueryMsg->interpoType;
pQuery->numOfTags = pQueryMsg->numOfTags;
// todo do not allocate ??
pQuery->colList = calloc(numOfCols, sizeof(SSingleColumnFilterInfo));
if (pQuery->colList == NULL) {
goto _cleanup;
......@@ -5935,12 +5925,12 @@ static void freeQInfo(SQInfo *pQInfo) {
if (pQuery->pSelectExpr != NULL) {
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
SExprInfo *pBinExprInfo = &pQuery->pSelectExpr[i].binExprInfo;
// SExprInfo *pBinExprInfo = &pQuery->pSelectExpr[i].arithExprInfo;
if (pBinExprInfo->numOfCols > 0) {
tfree(pBinExprInfo->pReqColumns);
tExprTreeDestroy(&pBinExprInfo->pBinExpr, NULL);
}
// if (pBinExprInfo->numOfCols > 0) {
// tfree(pBinExprInfo->pReqColumns);
// tExprTreeDestroy(&pBinExprInfo->pBinExpr, NULL);
// }
}
tfree(pQuery->pSelectExpr);
......@@ -6047,13 +6037,13 @@ int32_t qCreateQueryInfo(void *tsdb, SQueryTableMsg *pQueryMsg, qinfo_t *pQInfo)
int32_t code = TSDB_CODE_SUCCESS;
char * tagCond = NULL;
char * tagCond = NULL, *tbnameCond = NULL;
SArray * pTableIdList = NULL;
SSqlFuncMsg **pExprMsg = NULL;
SColIndex * pGroupColIndex = NULL;
SColumnInfo* pTagColumnInfo = NULL;
if ((code = convertQueryMsg(pQueryMsg, &pTableIdList, &pExprMsg, &tagCond, &pGroupColIndex, &pTagColumnInfo)) !=
if ((code = convertQueryMsg(pQueryMsg, &pTableIdList, &pExprMsg, &tagCond, &tbnameCond, &pGroupColIndex, &pTagColumnInfo)) !=
TSDB_CODE_SUCCESS) {
return code;
}
......@@ -6088,9 +6078,16 @@ int32_t qCreateQueryInfo(void *tsdb, SQueryTableMsg *pQueryMsg, qinfo_t *pQInfo)
STableId *id = taosArrayGet(pTableIdList, 0);
id->uid = -1; // todo fix me
/*int32_t ret =*/tsdbQueryByTagsCond(tsdb, id->uid, tagCond, pQueryMsg->tagCondLen, &groupInfo, pGroupColIndex,
pQueryMsg->numOfGroupCols);
// group by normal column, do not pass the group by condition to tsdb to group table into different group
int32_t numOfGroupByCols = pQueryMsg->numOfGroupCols;
if (pQueryMsg->numOfGroupCols == 1 && !TSDB_COL_IS_TAG(pGroupColIndex->flag)) {
numOfGroupByCols = 0;
}
// todo handle the error
/*int32_t ret =*/tsdbQueryByTagsCond(tsdb, id->uid, tagCond, pQueryMsg->tagCondLen, pQueryMsg->tagNameRelType, tbnameCond, &groupInfo, pGroupColIndex,
numOfGroupByCols);
if (groupInfo.numOfTables == 0) { // no qualified tables no need to do query
code = TSDB_CODE_SUCCESS;
goto _query_over;
......@@ -6112,6 +6109,8 @@ int32_t qCreateQueryInfo(void *tsdb, SQueryTableMsg *pQueryMsg, qinfo_t *pQInfo)
code = initQInfo(pQueryMsg, tsdb, *pQInfo, isSTableQuery);
_query_over:
tfree(tagCond);
tfree(tbnameCond);
taosArrayDestroy(pTableIdList);
// if failed to add ref for all meters in this query, abort current query
......
......@@ -72,7 +72,7 @@ void tVariantCreateFromString(tVariant *pVar, char *pz, uint32_t len, uint32_t t
* @param len
* @param type
*/
void tVariantCreateFromBinary(tVariant *pVar, char *pz, uint32_t len, uint32_t type) {
void tVariantCreateFromBinary(tVariant *pVar, const char *pz, size_t len, uint32_t type) {
switch (type) {
case TSDB_DATA_TYPE_BOOL:
case TSDB_DATA_TYPE_TINYINT: {
......@@ -109,10 +109,10 @@ void tVariantCreateFromBinary(tVariant *pVar, char *pz, uint32_t len, uint32_t t
break;
}
case TSDB_DATA_TYPE_BINARY: {
pVar->pz = strndup(pz, len);
pVar->nLen = strdequote(pVar->pz);
case TSDB_DATA_TYPE_BINARY: { // todo refactor, extract a method
pVar->pz = calloc(len, sizeof(char));
memcpy(pVar->pz, pz, len);
pVar->nLen = len;
break;
}
......@@ -130,6 +130,17 @@ void tVariantDestroy(tVariant *pVar) {
tfree(pVar->pz);
pVar->nLen = 0;
}
// NOTE: this is only for string array
if (pVar->nType == TSDB_DATA_TYPE_ARRAY) {
size_t num = taosArrayGetSize(pVar->arr);
for(size_t i = 0; i < num; i++) {
void* p = taosArrayGetP(pVar->arr, i);
free(p);
}
taosArrayDestroy(pVar->arr);
pVar->arr = NULL;
}
}
void tVariantAssign(tVariant *pDst, const tVariant *pSrc) {
......@@ -145,6 +156,18 @@ void tVariantAssign(tVariant *pDst, const tVariant *pSrc) {
pDst->pz = calloc(1, len);
memcpy(pDst->pz, pSrc->pz, len);
return;
}
// this is only for string array
if (pSrc->nType == TSDB_DATA_TYPE_ARRAY) {
size_t num = taosArrayGetSize(pSrc->arr);
pDst->arr = taosArrayInit(num, sizeof(char*));
for(size_t i = 0; i < num; i++) {
char* p = (char*)taosArrayGetP(pSrc->arr, i);
char* n = strdup(p);
taosArrayPush(pDst->arr, &n);
}
}
}
......
......@@ -556,8 +556,7 @@ void exprSerializeTest1() {
ASSERT_TRUE(size > 0);
char* b = tbufGetData(&buf, false);
tExprNode* p2 = NULL;
exprTreeFromBinary(b, size, &p2);
tExprNode* p2 = exprTreeFromBinary(b, size);
ASSERT_EQ(p1->nodeType, p2->nodeType);
ASSERT_EQ(p2->_node.optr, p1->_node.optr);
......@@ -593,8 +592,7 @@ void exprSerializeTest2() {
ASSERT_TRUE(size > 0);
char* b = tbufGetData(&buf, false);
tExprNode* p2 = NULL;
exprTreeFromBinary(b, size, &p2);
tExprNode* p2 = exprTreeFromBinary(b, size);
ASSERT_EQ(p1->nodeType, p2->nodeType);
ASSERT_EQ(p2->_node.optr, p1->_node.optr);
......
......@@ -58,7 +58,7 @@ TEST(testCase, patternMatchTest) {
EXPECT_EQ(ret, TSDB_PATTERN_NOWILDCARDMATCH);
str = "abcdefgabcdeju";
ret = patternMatch("abc%f_", str, 1, &info);
ret = patternMatch("abc%f_", str, 1, &info); // pattern string is longe than the size
EXPECT_EQ(ret, TSDB_PATTERN_NOMATCH);
str = "abcdefgabcdeju";
......@@ -72,4 +72,8 @@ TEST(testCase, patternMatchTest) {
str = "abcdefgabcdeju";
ret = patternMatch("a__", str, 2, &info);
EXPECT_EQ(ret, TSDB_PATTERN_NOMATCH);
str = "carzero";
ret = patternMatch("%o", str, strlen(str), &info);
EXPECT_EQ(ret, TSDB_PATTERN_MATCH);
}
......@@ -74,9 +74,12 @@ typedef struct STable {
void * pIndex; // For TSDB_SUPER_TABLE, it is the skiplist index
void * eventHandler; // TODO
void * streamHandler; // TODO
TSKEY lastKey; // lastkey inserted in this table, initialized as 0, TODO: make a structure
struct STable *next; // TODO: remove the next
} STable;
#define TSDB_GET_TABLE_LAST_KEY(pTable) ((pTable)->lastKey)
void * tsdbEncodeTable(STable *pTable, int *contLen);
STable *tsdbDecodeTable(void *cont, int contLen);
void tsdbFreeEncode(void *cont);
......
......@@ -163,6 +163,34 @@ int32_t tsdbDropRepo(TsdbRepoT *repo) {
return 0;
}
static int tsdbRestoreInfo(STsdbRepo *pRepo) {
STsdbMeta * pMeta = pRepo->tsdbMeta;
STsdbFileH *pFileH = pRepo->tsdbFileH;
SFileGroup *pFGroup = NULL;
SFileGroupIter iter;
SRWHelper rhelper = {0};
if (tsdbInitReadHelper(&rhelper, pRepo) < 0) goto _err;
tsdbInitFileGroupIter(pFileH, &iter, TSDB_ORDER_ASC);
while ((pFGroup = tsdbGetFileGroupNext(&iter)) != NULL) {
if (tsdbSetAndOpenHelperFile(&rhelper, pFGroup) < 0) goto _err;
for (int i = 0; i < pRepo->config.maxTables; i++) {
STable * pTable = pMeta->tables[i];
SCompIdx *pIdx = &rhelper.pCompIdx[i];
if (pIdx->offset > 0 && pTable->lastKey < pIdx->maxKey) pTable->lastKey = pIdx->maxKey;
}
}
tsdbDestroyHelper(&rhelper);
return 0;
_err:
tsdbDestroyHelper(&rhelper);
return -1;
}
/**
* Open an existing TSDB storage repository
* @param tsdbDir the existing TSDB root directory
......@@ -210,6 +238,16 @@ TsdbRepoT *tsdbOpenRepo(char *tsdbDir, STsdbAppH *pAppH) {
return NULL;
}
// Restore key from file
if (tsdbRestoreInfo(pRepo) < 0) {
tsdbFreeCache(pRepo->tsdbCache);
tsdbFreeMeta(pRepo->tsdbMeta);
tsdbCloseFileH(pRepo->tsdbFileH);
free(pRepo->rootDir);
free(pRepo);
return NULL;
}
pRepo->state = TSDB_REPO_STATE_ACTIVE;
return (TsdbRepoT *)pRepo;
......@@ -755,6 +793,7 @@ static int32_t tdInsertRowToTable(STsdbRepo *pRepo, SDataRow row, STable *pTable
tSkipListPut(pTable->mem->pData, pNode);
if (key > pTable->mem->keyLast) pTable->mem->keyLast = key;
if (key < pTable->mem->keyFirst) pTable->mem->keyFirst = key;
if (key > pTable->lastKey) pTable->lastKey = key;
pTable->mem->numOfPoints = tSkipListGetSize(pTable->mem->pData);
......
......@@ -311,6 +311,7 @@ int32_t tsdbCreateTableImpl(STsdbMeta *pMeta, STableCfg *pCfg) {
table->tableId = pCfg->tableId;
table->name = strdup(pCfg->name);
table->lastKey = 0;
if (IS_CREATE_STABLE(pCfg)) { // TSDB_CHILD_TABLE
table->type = TSDB_CHILD_TABLE;
table->superUid = pCfg->superUid;
......
......@@ -1213,7 +1213,9 @@ void filterPrepare(void* expr, void* param) {
pInfo->compare = getComparFunc(pSchema->type, pCond->nType, pInfo->optr);
tVariantAssign(&pInfo->q, pCond);
tVariantTypeSetType(&pInfo->q, pInfo->sch.type);
if (pInfo->optr != TSDB_RELATION_IN) {
tVariantTypeSetType(&pInfo->q, pInfo->sch.type);
}
}
int32_t doCompare(const char* f1, const char* f2, int32_t type, size_t size) {
......@@ -1327,12 +1329,9 @@ SArray* createTableGroup(SArray* pTableList, STSchema* pTagSchema, SColIndex* pC
}
if (numOfOrderCols == 0 || size == 1) { // no group by tags clause or only one table
size_t num = taosArrayGetSize(pTableList);
SArray* sa = taosArrayInit(num, sizeof(SPair));
for(int32_t i = 0; i < num; ++i) {
SArray* sa = taosArrayInit(size, sizeof(SPair));
for(int32_t i = 0; i < size; ++i) {
STable* pTable = taosArrayGetP(pTableList, i);
SPair p = {.first = pTable};
taosArrayPush(sa, &p);
}
......@@ -1358,16 +1357,26 @@ bool tSkipListNodeFilterCallback(const void* pNode, void* param) {
STable* pTable = *(STable**)(SL_GET_NODE_DATA((SSkipListNode*)pNode));
char* val = dataRowTuple(pTable->tagVal); // todo not only the first column
char* val = NULL;
int8_t type = pInfo->sch.type;
if (pInfo->colIndex == TSDB_TBNAME_COLUMN_INDEX) {
val = pTable->name;
type = TSDB_DATA_TYPE_BINARY;
} else {
val = dataRowTuple(pTable->tagVal); // todo not only the first column
}
int32_t ret = 0;
if (pInfo->q.nType == TSDB_DATA_TYPE_BINARY || pInfo->q.nType == TSDB_DATA_TYPE_NCHAR) {
ret = pInfo->compare(val, pInfo->q.pz);
if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) {
if (pInfo->optr == TSDB_RELATION_IN) {
ret = pInfo->compare(val, pInfo->q.arr);
} else {
ret = pInfo->compare(val, pInfo->q.pz);
}
} else {
tVariant t = {0};
tVariantCreateFromBinary(&t, val, (uint32_t)pInfo->sch.bytes, type);
ret = pInfo->compare(&t.i64Key, &pInfo->q.i64Key);
}
......@@ -1393,6 +1402,9 @@ bool tSkipListNodeFilterCallback(const void* pNode, void* param) {
case TSDB_RELATION_LIKE: {
return ret == 0;
}
case TSDB_RELATION_IN: {
return ret == 1;
}
default:
assert(false);
......@@ -1400,6 +1412,7 @@ bool tSkipListNodeFilterCallback(const void* pNode, void* param) {
return true;
}
static int32_t doQueryTableList(STable* pSTable, SArray* pRes, tExprNode* pExpr) {
// query according to the binary expression
STSchema* pSchema = pSTable->tagSchema;
......@@ -1422,12 +1435,23 @@ static int32_t doQueryTableList(STable* pSTable, SArray* pRes, tExprNode* pExpr)
tExprTreeDestroy(&pExpr, destroyHelper);
convertQueryResult(pRes, pTableList);
taosArrayDestroy(pTableList);
free(schema);
return TSDB_CODE_SUCCESS;
}
int32_t tsdbQueryByTagsCond(TsdbRepoT* tsdb, int64_t uid, const char* pTagCond, size_t len, STableGroupInfo* pGroupInfo,
SColIndex* pColIndex, int32_t numOfCols) {
int32_t tsdbQueryByTagsCond(
TsdbRepoT *tsdb,
int64_t uid,
const char *pTagCond,
size_t len,
int16_t tagNameRelType,
const char* tbnameCond,
STableGroupInfo *pGroupInfo,
SColIndex *pColIndex,
int32_t numOfCols
) {
STable* pSTable = tsdbGetTableByUid(tsdbGetMeta(tsdb), uid);
if (pSTable == NULL) {
uError("failed to get stable, uid:%" PRIu64, uid);
......@@ -1437,31 +1461,35 @@ int32_t tsdbQueryByTagsCond(TsdbRepoT* tsdb, int64_t uid, const char* pTagCond,
SArray* res = taosArrayInit(8, POINTER_BYTES);
STSchema* pTagSchema = tsdbGetTableTagSchema(tsdbGetMeta(tsdb), pSTable);
if (pTagCond == NULL || len == 0) { // no tags condition, all tables created according to this stable are involved
// no tags and tbname condition, all child tables of this stable are involved
if (tbnameCond == NULL && (pTagCond == NULL || len == 0)) {
int32_t ret = getAllTableIdList(tsdb, uid, res);
if (ret != TSDB_CODE_SUCCESS) {
taosArrayDestroy(res);
return ret;
if (ret == TSDB_CODE_SUCCESS) {
pGroupInfo->numOfTables = taosArrayGetSize(res);
pGroupInfo->pGroupList = createTableGroup(res, pTagSchema, pColIndex, numOfCols);
}
pGroupInfo->numOfTables = taosArrayGetSize(res);
pGroupInfo->pGroupList = createTableGroup(res, pTagSchema, pColIndex, numOfCols);
taosArrayDestroy(res);
return ret;
}
tExprNode* pExprNode = NULL;
int32_t ret = TSDB_CODE_SUCCESS;
// failed to build expression, no result, return immediately
if ((ret = exprTreeFromBinary(pTagCond, len, &pExprNode) != TSDB_CODE_SUCCESS) || (pExprNode == NULL)) {
uError("stable:%" PRIu64 ", failed to deserialize expression tree, error exists", uid);
taosArrayDestroy(res);
return ret;
tExprNode* expr = exprTreeFromTableName(tbnameCond);
tExprNode* tagExpr = exprTreeFromBinary(pTagCond, len);
if (tagExpr != NULL) {
if (expr == NULL) {
expr = tagExpr;
} else {
tExprNode* tbnameExpr = expr;
expr = calloc(1, sizeof(tExprNode));
expr->nodeType = TSQL_NODE_EXPR;
expr->_node.optr = tagNameRelType;
expr->_node.pLeft = tagExpr;
expr->_node.pRight = tbnameExpr;
}
}
doQueryTableList(pSTable, res, pExprNode);
doQueryTableList(pSTable, res, expr);
pGroupInfo->numOfTables = taosArrayGetSize(res);
pGroupInfo->pGroupList = createTableGroup(res, pTagSchema, pColIndex, numOfCols);
......
......@@ -53,7 +53,7 @@ void* taosArrayPush(SArray* pArray, void* pData);
*
* @param pArray
*/
void taosArrayPop(SArray* pArray);
void* taosArrayPop(SArray* pArray);
/**
* get the data from array
......@@ -112,6 +112,34 @@ SArray* taosArrayClone(SArray* pSrc);
*/
void taosArrayDestroy(SArray* pArray);
/**
* sort the array
* @param pArray
* @param compar
*/
void taosArraySort(SArray* pArray, int (*compar)(const void*, const void*));
/**
* sort string array
* @param pArray
*/
void taosArraySortString(SArray* pArray);
/**
* search the array
* @param pArray
* @param compar
* @param key
*/
void* taosArraySearch(const SArray* pArray, int (*compar)(const void*, const void*), const void* key);
/**
* search the array
* @param pArray
* @param key
*/
char* taosArraySearchString(const SArray* pArray, const char* key);
#ifdef __cplusplus
}
#endif
......
......@@ -120,7 +120,7 @@ void tbufWriteString(SBuffer* buf, const char* str);
TBUFFER_DEFINE_FUNCTION(bool, Bool)
TBUFFER_DEFINE_FUNCTION(char, Char)
TBUFFER_DEFINE_FUNCTION(int8_t, Int8)
TBUFFER_DEFINE_FUNCTION(uint8_t, Unt8)
TBUFFER_DEFINE_FUNCTION(uint8_t, Uint8)
TBUFFER_DEFINE_FUNCTION(int16_t, Int16)
TBUFFER_DEFINE_FUNCTION(uint16_t, Uint16)
TBUFFER_DEFINE_FUNCTION(int32_t, Int32)
......
......@@ -27,7 +27,7 @@ void* taosArrayInit(size_t size, size_t elemSize) {
return NULL;
}
pArray->pData = calloc(size, elemSize * size);
pArray->pData = calloc(size, elemSize);
if (pArray->pData == NULL) {
free(pArray);
return NULL;
......@@ -76,12 +76,14 @@ void* taosArrayPush(SArray* pArray, void* pData) {
return dst;
}
void taosArrayPop(SArray* pArray) {
if (pArray == NULL || pArray->size == 0) {
return;
}
void* taosArrayPop(SArray* pArray) {
assert( pArray != NULL );
if (pArray->size == 0) {
return NULL;
}
pArray->size -= 1;
return TARRAY_GET_ELEM(pArray, pArray->size);
}
void* taosArrayGet(const SArray* pArray, size_t index) {
......@@ -183,3 +185,40 @@ void taosArrayDestroy(SArray* pArray) {
free(pArray->pData);
free(pArray);
}
void taosArraySort(SArray* pArray, int (*compar)(const void*, const void*)) {
assert(pArray != NULL);
assert(compar != NULL);
qsort(pArray->pData, pArray->size, pArray->elemSize, compar);
}
void* taosArraySearch(const SArray* pArray, int (*compar)(const void*, const void*), const void* key) {
assert(pArray != NULL);
assert(compar != NULL);
assert(key != NULL);
return bsearch(key, pArray->pData, pArray->size, pArray->elemSize, compar);
}
static int taosArrayCompareString(const void* a, const void* b) {
const char* x = *(const char**)a;
const char* y = *(const char**)b;
return strcmp(x, y);
}
void taosArraySortString(SArray* pArray) {
assert(pArray != NULL);
qsort(pArray->pData, pArray->size, pArray->elemSize, taosArrayCompareString);
}
char* taosArraySearchString(const SArray* pArray, const char* key) {
assert(pArray != NULL);
assert(key != NULL);
void* p = bsearch(&key, pArray->pData, pArray->size, pArray->elemSize, taosArrayCompareString);
if (p == NULL) {
return NULL;
}
return *(char**)p;
}
\ No newline at end of file
#include "taosdef.h"
#include "tcompare.h"
#include <tarray.h>
#include "tutil.h"
int32_t compareInt32Val(const void *pLeft, const void *pRight) {
int32_t ret = GET_INT32_VAL(pLeft) - GET_INT32_VAL(pRight);
if (ret == 0) {
return 0;
} else {
return ret > 0 ? 1 : -1;
}
int32_t left = GET_INT32_VAL(pLeft), right = GET_INT32_VAL(pRight);
if (left > right) return 1;
if (left < right) return -1;
return 0;
}
int32_t compareInt64Val(const void *pLeft, const void *pRight) {
int64_t ret = GET_INT64_VAL(pLeft) - GET_INT64_VAL(pRight);
if (ret == 0) {
return 0;
} else {
return ret > 0 ? 1 : -1;
}
int64_t left = GET_INT64_VAL(pLeft), right = GET_INT64_VAL(pRight);
if (left > right) return 1;
if (left < right) return -1;
return 0;
}
int32_t compareInt16Val(const void *pLeft, const void *pRight) {
int32_t ret = GET_INT16_VAL(pLeft) - GET_INT16_VAL(pRight);
if (ret == 0) {
return 0;
} else {
return ret > 0 ? 1 : -1;
}
int16_t left = GET_INT16_VAL(pLeft), right = GET_INT16_VAL(pRight);
if (left > right) return 1;
if (left < right) return -1;
return 0;
}
int32_t compareInt8Val(const void *pLeft, const void *pRight) {
int32_t ret = GET_INT8_VAL(pLeft) - GET_INT8_VAL(pRight);
if (ret == 0) {
return 0;
} else {
return ret > 0 ? 1 : -1;
}
int8_t left = GET_INT8_VAL(pLeft), right = GET_INT8_VAL(pRight);
if (left > right) return 1;
if (left < right) return -1;
return 0;
}
int32_t compareIntDoubleVal(const void *pLeft, const void *pRight) {
......@@ -69,12 +61,7 @@ int32_t compareDoubleVal(const void *pLeft, const void *pRight) {
}
int32_t compareStrVal(const void *pLeft, const void *pRight) {
int32_t ret = strcmp(pLeft, pRight);
if (ret == 0) {
return 0;
} else {
return ret > 0 ? 1 : -1;
}
return (int32_t)strcmp(pLeft, pRight);
}
int32_t compareWStrVal(const void *pLeft, const void *pRight) {
......@@ -228,6 +215,11 @@ static UNUSED_FUNC int32_t compareStrPatternComp(const void* pLeft, const void*
return (ret == TSDB_PATTERN_MATCH) ? 0 : 1;
}
static int32_t compareFindStrInArray(const void* pLeft, const void* pRight) {
const SArray* arr = (const SArray*)pRight;
return taosArraySearchString(arr, pLeft) == NULL ? 0 : 1;
}
static UNUSED_FUNC int32_t compareWStrPatternComp(const void* pLeft, const void* pRight) {
SPatternCompareInfo pInfo = {'%', '_'};
......@@ -250,7 +242,6 @@ __compar_fn_t getComparFunc(int32_t type, int32_t filterDataType, int32_t optr)
case TSDB_DATA_TYPE_BIGINT:
case TSDB_DATA_TYPE_TIMESTAMP: {
// assert(type == filterDataType);
if (filterDataType == TSDB_DATA_TYPE_BIGINT || filterDataType == TSDB_DATA_TYPE_TIMESTAMP) {
comparFn = compareInt64Val;
} else if (filterDataType >= TSDB_DATA_TYPE_FLOAT && filterDataType <= TSDB_DATA_TYPE_DOUBLE) {
......@@ -259,6 +250,7 @@ __compar_fn_t getComparFunc(int32_t type, int32_t filterDataType, int32_t optr)
break;
}
case TSDB_DATA_TYPE_BOOL: {
if (filterDataType >= TSDB_DATA_TYPE_BOOL && filterDataType <= TSDB_DATA_TYPE_BIGINT) {
comparFn = compareInt32Val;
......@@ -267,6 +259,7 @@ __compar_fn_t getComparFunc(int32_t type, int32_t filterDataType, int32_t optr)
}
break;
}
case TSDB_DATA_TYPE_FLOAT:
case TSDB_DATA_TYPE_DOUBLE: {
if (filterDataType >= TSDB_DATA_TYPE_BOOL && filterDataType <= TSDB_DATA_TYPE_BIGINT) {
......@@ -276,12 +269,18 @@ __compar_fn_t getComparFunc(int32_t type, int32_t filterDataType, int32_t optr)
}
break;
}
case TSDB_DATA_TYPE_BINARY: {
assert(filterDataType == TSDB_DATA_TYPE_BINARY);
if (optr == TSDB_RELATION_LIKE) { /* wildcard query using like operator */
assert(filterDataType == TSDB_DATA_TYPE_BINARY);
comparFn = compareStrPatternComp;
} else if (optr == TSDB_RELATION_IN) {
assert(filterDataType == TSDB_DATA_TYPE_ARRAY);
comparFn = compareFindStrInArray;
} else { /* normal relational comparFn */
assert(filterDataType == TSDB_DATA_TYPE_BINARY);
comparFn = compareStrVal;
}
......
......@@ -75,7 +75,7 @@ int main(int argc, char *argv[]) {
doQuery(taos, "create database if not exists test");
doQuery(taos, "use test");
doQuery(taos, "insert into tm99 values('2020-01-01 1:1:1', 99);");
doQuery(taos, "select count(*),k,sum(k) from m1 group by k");
// doQuery(taos, "create table if not exists tm0 (ts timestamp, k int);");
// doQuery(taos, "insert into tm0 values('2020-1-1 1:1:1', 1);");
// doQuery(taos, "insert into tm0 values('2020-1-1 1:1:2', 2);");
......@@ -86,7 +86,7 @@ int main(int argc, char *argv[]) {
// doQuery(taos, "insert into tm0 values('2020-1-1 1:1:7', 7);");
// doQuery(taos, "insert into tm0 values('2020-1-1 1:1:8', 8);");
// doQuery(taos, "insert into tm0 values('2020-1-1 1:1:9', 9);");
doQuery(taos, "select sum(k),count(*) from m1 group by a");
// doQuery(taos, "select sum(k),count(*) from m1 group by a");
taos_close(taos);
return 0;
......
......@@ -12,7 +12,6 @@
# -*- coding: utf-8 -*-
import sys
import taos
from util.log import *
from util.cases import *
from util.sql import *
......
......@@ -13,9 +13,6 @@
import sys
import datetime
import taos
from util.log import *
from util.cases import *
from util.sql import *
......
......@@ -12,7 +12,6 @@
# -*- coding: utf-8 -*-
import sys
import taos
import datetime
from util.log import *
......
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import sys
import taos
from util.log import *
from util.cases import *
from util.sql import *
class TDTestCase:
def init(self, conn):
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor())
def run(self):
tdSql.prepare()
tdSql.execute('create table cars (ts timestamp, speed int) tags(id int)')
tdSql.execute("create table carzero using cars tags(0)")
tdSql.execute("create table carone using cars tags(1)")
tdSql.execute("create table cartwo using cars tags(2)")
tdSql.execute("insert into carzero values(now, 100) carone values(now, 110)")
tdSql.query("select * from cars where tbname in ('carzero', 'carone')")
tdSql.checkRows(2)
tdSql.query("select * from cars where tbname in ('carzero', 'cartwo')")
tdSql.checkRows(1)
tdSql.query("select * from cars where id=1 or tbname in ('carzero', 'cartwo')")
tdSql.checkRows(2)
tdSql.query("select * from cars where id=1 and tbname in ('carzero', 'cartwo')")
tdSql.checkRows(0)
tdSql.query("select * from cars where id=0 and tbname in ('carzero', 'cartwo')")
tdSql.checkRows(1)
"""
tdSql.query("select * from cars where tbname like 'car%'")
tdSql.checkRows(2)
tdSql.cursor.execute("use db")
tdSql.query("select * from cars where tbname like '%o'")
tdSql.checkRows(1)
tdSql.query("select * from cars where id=1 and tbname like 'car%')
tdSql.checkRows(1)
tdSql.query("select * from cars where id = 1 and tbname like '%o')
tdSql.checkRows(0)
tdSql.query("select * from cars where id = 1 or tbname like '%o')
tdSql.checkRows(2)
"""
def stop(self):
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase())
......@@ -71,7 +71,7 @@ class TDCases:
case.run()
except Exception as e:
tdLog.notice(repr(e))
tdLog.notice("%s failed: %s" % (__file__, fileName))
tdLog.exit("%s failed: %s" % (__file__, fileName))
case.stop()
runNum += 1
continue
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册