未验证 提交 c358606b 编写于 作者: M Minglei Jin 提交者: GitHub

Merge branch '2.4' into fix/TS-927-24

...@@ -170,6 +170,16 @@ void tscAddIntoStreamList(SSqlStream *pStream) { ...@@ -170,6 +170,16 @@ void tscAddIntoStreamList(SSqlStream *pStream) {
STscObj * pObj = pStream->pSql->pTscObj; STscObj * pObj = pStream->pSql->pTscObj;
pthread_mutex_lock(&pObj->mutex); pthread_mutex_lock(&pObj->mutex);
//check if newly added stream node is present
//in the streamList to prevent loop in the list
SSqlStream *iter = pObj->streamList;
while (iter) {
if (pStream == iter) {
pthread_mutex_unlock(&pObj->mutex);
return;
}
iter = iter->next;
}
pStream->next = pObj->streamList; pStream->next = pObj->streamList;
if (pObj->streamList) pObj->streamList->prev = pStream; if (pObj->streamList) pObj->streamList->prev = pStream;
......
...@@ -2852,6 +2852,13 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col ...@@ -2852,6 +2852,13 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
char val[8] = {0}; char val[8] = {0};
int64_t tickPerSec = 0; int64_t tickPerSec = 0;
char *exprToken = tcalloc(pParamElem[1].pNode->exprToken.n + 1, sizeof(char));
memcpy(exprToken, pParamElem[1].pNode->exprToken.z, pParamElem[1].pNode->exprToken.n);
if (pParamElem[1].pNode->exprToken.type == TK_NOW || strstr(exprToken, "now")) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg2);
}
tfree(exprToken);
if ((TSDB_DATA_TYPE_NULL == pParamElem[1].pNode->value.nType) || tVariantDump(&pParamElem[1].pNode->value, (char*) &tickPerSec, TSDB_DATA_TYPE_BIGINT, true) < 0) { if ((TSDB_DATA_TYPE_NULL == pParamElem[1].pNode->value.nType) || tVariantDump(&pParamElem[1].pNode->value, (char*) &tickPerSec, TSDB_DATA_TYPE_BIGINT, true) < 0) {
return TSDB_CODE_TSC_INVALID_OPERATION; return TSDB_CODE_TSC_INVALID_OPERATION;
} }
...@@ -5229,7 +5236,7 @@ static int32_t handleExprInQueryCond(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, tSql ...@@ -5229,7 +5236,7 @@ static int32_t handleExprInQueryCond(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, tSql
} }
} }
if (pRight != NULL && (pRight->tokenId == TK_ID || pRight->tokenId == TK_ARROW)) { // join on tag columns for stable query if (joinQuery && pRight != NULL && (pRight->tokenId == TK_ID || pRight->tokenId == TK_ARROW)) { // join on tag columns for stable query
if (!validateJoinExprNode(pCmd, pQueryInfo, *pExpr, &index)) { if (!validateJoinExprNode(pCmd, pQueryInfo, *pExpr, &index)) {
return TSDB_CODE_TSC_INVALID_OPERATION; return TSDB_CODE_TSC_INVALID_OPERATION;
} }
...@@ -6090,7 +6097,7 @@ int32_t getTimeRange(STimeWindow* win, tSqlExpr* pRight, int32_t optr, int16_t t ...@@ -6090,7 +6097,7 @@ int32_t getTimeRange(STimeWindow* win, tSqlExpr* pRight, int32_t optr, int16_t t
// todo error !!!! // todo error !!!!
int32_t tsRewriteFieldNameIfNecessary(SSqlCmd* pCmd, SQueryInfo* pQueryInfo) { int32_t tsRewriteFieldNameIfNecessary(SSqlCmd* pCmd, SQueryInfo* pQueryInfo) {
const char rep[] = {'(', ')', '*', ',', '.', '/', '\\', '+', '-', '%', ' '}; const char rep[] = {'(', ')', '*', ',', '.', '/', '\\', '+', '-', '%', ' ', '`'};
for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) { for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) {
char* fieldName = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i)->name; char* fieldName = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i)->name;
......
...@@ -1479,6 +1479,18 @@ void handleDownstreamOperator(SSqlObj** pSqlObjList, int32_t numOfUpstream, SQue ...@@ -1479,6 +1479,18 @@ void handleDownstreamOperator(SSqlObj** pSqlObjList, int32_t numOfUpstream, SQue
break; break;
} }
} }
// set input data order to param[1]
if(pex->base.functionId == TSDB_FUNC_FIRST || pex->base.functionId == TSDB_FUNC_FIRST_DST ||
pex->base.functionId == TSDB_FUNC_LAST || pex->base.functionId == TSDB_FUNC_LAST_DST) {
// set input order
SQueryInfo* pInputQI = pSqlObjList[0]->cmd.pQueryInfo;
if(pInputQI) {
pex->base.numOfParams = 3;
pex->base.param[2].nType = TSDB_DATA_TYPE_INT;
pex->base.param[2].i64 = pInputQI->order.order;
}
}
} }
tscDebug("0x%"PRIx64" create QInfo 0x%"PRIx64" to execute the main query while all nest queries are ready", pSql->self, pSql->self); tscDebug("0x%"PRIx64" create QInfo 0x%"PRIx64" to execute the main query while all nest queries are ready", pSql->self, pSql->self);
...@@ -4303,6 +4315,11 @@ void executeQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo) { ...@@ -4303,6 +4315,11 @@ void executeQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo) {
// create sub query to handle the sub query. // create sub query to handle the sub query.
SQueryInfo* pq = tscGetQueryInfo(&psub->cmd); SQueryInfo* pq = tscGetQueryInfo(&psub->cmd);
STableMetaInfo* pSubMeta = tscGetMetaInfo(pq, 0);
if (UTIL_TABLE_IS_SUPER_TABLE(pSubMeta) &&
pq->command == TSDB_SQL_RETRIEVE_EMPTY_RESULT) {
psub->cmd.command = TSDB_SQL_RETRIEVE_EMPTY_RESULT;
}
executeQuery(psub, pq); executeQuery(psub, pq);
} }
......
...@@ -173,6 +173,7 @@ typedef void *TsdbQueryHandleT; // Use void to hide implementation details ...@@ -173,6 +173,7 @@ typedef void *TsdbQueryHandleT; // Use void to hide implementation details
typedef struct STsdbQueryCond { typedef struct STsdbQueryCond {
STimeWindow twindow; STimeWindow twindow;
int32_t order; // desc|asc order to iterate the data block int32_t order; // desc|asc order to iterate the data block
int64_t offset; // skip offset put down to tsdb
int32_t numOfCols; int32_t numOfCols;
SColumnInfo *colList; SColumnInfo *colList;
bool loadExternalRows; // load external rows or not bool loadExternalRows; // load external rows or not
...@@ -391,6 +392,9 @@ void tsdbResetQueryHandleForNewTable(TsdbQueryHandleT queryHandle, STsdbQueryCon ...@@ -391,6 +392,9 @@ void tsdbResetQueryHandleForNewTable(TsdbQueryHandleT queryHandle, STsdbQueryCon
int32_t tsdbGetFileBlocksDistInfo(TsdbQueryHandleT* queryHandle, STableBlockDist* pTableBlockInfo); int32_t tsdbGetFileBlocksDistInfo(TsdbQueryHandleT* queryHandle, STableBlockDist* pTableBlockInfo);
// obtain queryHandle attribute
int64_t tsdbSkipOffset(TsdbQueryHandleT queryHandle);
/** /**
* get the statistics of repo usage * get the statistics of repo usage
* @param repo. point to the tsdbrepo * @param repo. point to the tsdbrepo
......
...@@ -131,7 +131,7 @@ static void *shellCheckThreadFp(void *arg) { ...@@ -131,7 +131,7 @@ static void *shellCheckThreadFp(void *arg) {
char *tbname = tbNames[t]; char *tbname = tbNames[t];
if (tbname == NULL) break; if (tbname == NULL) break;
snprintf(sql, SHELL_SQL_LEN, "select last_row(_c0) from %s;", tbname); snprintf(sql, SHELL_SQL_LEN, "select count(*) from %s;", tbname);
TAOS_RES *pSql = taos_query(pThread->taos, sql); TAOS_RES *pSql = taos_query(pThread->taos, sql);
int32_t code = taos_errno(pSql); int32_t code = taos_errno(pSql);
......
...@@ -43,7 +43,7 @@ void mnodeCheckUnCreatedVgroup(SDnodeObj *pDnode, SVnodeLoad *pVloads, int32_ ...@@ -43,7 +43,7 @@ void mnodeCheckUnCreatedVgroup(SDnodeObj *pDnode, SVnodeLoad *pVloads, int32_
int32_t mnodeCreateVgroup(struct SMnodeMsg *pMsg); int32_t mnodeCreateVgroup(struct SMnodeMsg *pMsg);
void mnodeDropVgroup(SVgObj *pVgroup, void *ahandle); void mnodeDropVgroup(SVgObj *pVgroup, void *ahandle);
void mnodeAlterVgroup(SVgObj *pVgroup, void *ahandle); void mnodeAlterVgroup(SVgObj *pVgroup, void *ahandle);
int32_t mnodeGetAvailableVgroup(struct SMnodeMsg *pMsg, SVgObj **pVgroup, int32_t *sid); int32_t mnodeGetAvailableVgroup(struct SMnodeMsg *pMsg, SVgObj **pVgroup, int32_t *sid, int32_t vgId);
int32_t mnodeAddTableIntoVgroup(SVgObj *pVgroup, SCTableObj *pTable, bool needCheck); int32_t mnodeAddTableIntoVgroup(SVgObj *pVgroup, SCTableObj *pTable, bool needCheck);
void mnodeRemoveTableFromVgroup(SVgObj *pVgroup, SCTableObj *pTable); void mnodeRemoveTableFromVgroup(SVgObj *pVgroup, SCTableObj *pTable);
......
...@@ -48,6 +48,12 @@ ...@@ -48,6 +48,12 @@
#define CREATE_CTABLE_RETRY_TIMES 10 #define CREATE_CTABLE_RETRY_TIMES 10
#define CREATE_CTABLE_RETRY_SEC 14 #define CREATE_CTABLE_RETRY_SEC 14
// informal
#define META_SYNC_TABLE_NAME "_taos_meta_sync_table_name_taos_"
#define META_SYNC_TABLE_NAME_LEN 32
static int32_t tsMetaSyncOption = 0;
// informal
int64_t tsCTableRid = -1; int64_t tsCTableRid = -1;
static void * tsChildTableSdb; static void * tsChildTableSdb;
int64_t tsSTableRid = -1; int64_t tsSTableRid = -1;
...@@ -1726,6 +1732,9 @@ int32_t mnodeRetrieveShowSuperTables(SShowObj *pShow, char *data, int32_t rows, ...@@ -1726,6 +1732,9 @@ int32_t mnodeRetrieveShowSuperTables(SShowObj *pShow, char *data, int32_t rows,
cols++; cols++;
numOfRows++; numOfRows++;
mDebug("stable: %s, uid: %" PRIu64, prefix, pTable->uid);
mnodeDecTableRef(pTable); mnodeDecTableRef(pTable);
} }
...@@ -2227,9 +2236,19 @@ static int32_t mnodeProcessCreateChildTableMsg(SMnodeMsg *pMsg) { ...@@ -2227,9 +2236,19 @@ static int32_t mnodeProcessCreateChildTableMsg(SMnodeMsg *pMsg) {
if (pMsg->pTable == NULL) { if (pMsg->pTable == NULL) {
SVgObj *pVgroup = NULL; SVgObj *pVgroup = NULL;
int32_t tid = 0; int32_t tid = 0;
code = mnodeGetAvailableVgroup(pMsg, &pVgroup, &tid); int32_t vgId = 0;
if (tsMetaSyncOption) {
char *pTbName = strchr(pCreate->tableName, '.');
if (pTbName && (pTbName = strchr(pTbName + 1, '.'))) {
if (0 == strncmp(META_SYNC_TABLE_NAME, ++pTbName, META_SYNC_TABLE_NAME_LEN)) {
vgId = atoi(pTbName + META_SYNC_TABLE_NAME_LEN);
}
}
}
code = mnodeGetAvailableVgroup(pMsg, &pVgroup, &tid, vgId);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
mDebug("msg:%p, app:%p table:%s, failed to get available vgroup, reason:%s", pMsg, pMsg->rpcMsg.ahandle, mError("msg:%p, app:%p table:%s, failed to get available vgroup, reason:%s", pMsg, pMsg->rpcMsg.ahandle,
pCreate->tableName, tstrerror(code)); pCreate->tableName, tstrerror(code));
return code; return code;
} }
......
...@@ -428,10 +428,47 @@ static int32_t mnodeAllocVgroupIdPool(SVgObj *pInputVgroup) { ...@@ -428,10 +428,47 @@ static int32_t mnodeAllocVgroupIdPool(SVgObj *pInputVgroup) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t mnodeGetAvailableVgroup(SMnodeMsg *pMsg, SVgObj **ppVgroup, int32_t *pSid) { int32_t mnodeGetAvailableVgroup(SMnodeMsg *pMsg, SVgObj **ppVgroup, int32_t *pSid, int32_t vgId) {
SDbObj *pDb = pMsg->pDb; SDbObj *pDb = pMsg->pDb;
pthread_mutex_lock(&pDb->mutex); pthread_mutex_lock(&pDb->mutex);
if (vgId > 0) {
for (int32_t v = 0; v < pDb->numOfVgroups; ++v) {
SVgObj *pVgroup = pDb->vgList[v];
if (pVgroup == NULL) {
mError("db:%s, vgroup: %d is null", pDb->name, v);
pthread_mutex_unlock(&pDb->mutex);
return TSDB_CODE_MND_APP_ERROR;
}
if (pVgroup->vgId != (uint32_t)vgId) { // find the target vgId
continue;
}
int32_t sid = taosAllocateId(pVgroup->idPool);
if (sid <= 0) {
int curMaxId = taosIdPoolMaxSize(pVgroup->idPool);
if ((taosUpdateIdPool(pVgroup->idPool, curMaxId + 1) < 0) || ((sid = taosAllocateId(pVgroup->idPool)) <= 0)) {
mError("msg:%p, app:%p db:%s, no enough sid in vgId:%d", pMsg, pMsg->rpcMsg.ahandle, pDb->name,
pVgroup->vgId);
pthread_mutex_unlock(&pDb->mutex);
return TSDB_CODE_MND_APP_ERROR;
}
}
mDebug("vgId:%d, alloc tid:%d", pVgroup->vgId, sid);
*pSid = sid;
*ppVgroup = pVgroup;
pDb->vgListIndex = v;
pthread_mutex_unlock(&pDb->mutex);
return TSDB_CODE_SUCCESS;
}
pthread_mutex_unlock(&pDb->mutex);
mError("db:%s, vgroup: %d not exist", pDb->name, vgId);
return TSDB_CODE_MND_APP_ERROR;
}
for (int32_t v = 0; v < pDb->numOfVgroups; ++v) { for (int32_t v = 0; v < pDb->numOfVgroups; ++v) {
int vgIndex = (v + pDb->vgListIndex) % pDb->numOfVgroups; int vgIndex = (v + pDb->vgListIndex) % pDb->numOfVgroups;
SVgObj *pVgroup = pDb->vgList[vgIndex]; SVgObj *pVgroup = pDb->vgList[vgIndex];
......
...@@ -458,14 +458,18 @@ static void monBuildMonitorSql(char *sql, int32_t cmd) { ...@@ -458,14 +458,18 @@ static void monBuildMonitorSql(char *sql, int32_t cmd) {
", expire_time int, timeseries_used int, timeseries_total int)", ", expire_time int, timeseries_used int, timeseries_total int)",
tsMonitorDbName); tsMonitorDbName);
} else if (cmd == MON_CMD_CREATE_MT_RESTFUL) { } else if (cmd == MON_CMD_CREATE_MT_RESTFUL) {
int usedLen = 0, len = 0;
int pos = snprintf(sql, SQL_LENGTH, int pos = snprintf(sql, SQL_LENGTH,
"create table if not exists %s.restful_info(ts timestamp", tsMonitorDbName); "create table if not exists %s.restful_info(ts timestamp", tsMonitorDbName);
usedLen += pos;
for (int i = 0; i < tListLen(monHttpStatusTable); ++i) { for (int i = 0; i < tListLen(monHttpStatusTable); ++i) {
pos += snprintf(sql + pos, SQL_LENGTH, ", `%s(%d)` int", len = snprintf(sql + pos, SQL_LENGTH - usedLen, ", %s_%d int",
monHttpStatusTable[i].name, monHttpStatusTable[i].name,
monHttpStatusTable[i].code); monHttpStatusTable[i].code);
usedLen += len;
pos += len;
} }
snprintf(sql + pos, SQL_LENGTH, snprintf(sql + pos, SQL_LENGTH - usedLen,
") tags (dnode_id int, dnode_ep binary(%d))", ") tags (dnode_id int, dnode_ep binary(%d))",
TSDB_EP_LEN); TSDB_EP_LEN);
} else if (cmd == MON_CMD_CREATE_TB_RESTFUL) { } else if (cmd == MON_CMD_CREATE_TB_RESTFUL) {
......
Subproject commit 273b5219f8bcc604e43beebc6f1f95abed85170a Subproject commit 47fb0b3e627ddadf1ca983c1d75b9a4e44cd98fd
...@@ -237,6 +237,7 @@ typedef struct SQueryAttr { ...@@ -237,6 +237,7 @@ typedef struct SQueryAttr {
bool createFilterOperator; // if filter operator is needed bool createFilterOperator; // if filter operator is needed
bool multigroupResult; // multigroup result can exist in one SSDataBlock bool multigroupResult; // multigroup result can exist in one SSDataBlock
bool needSort; // need sort rowRes bool needSort; // need sort rowRes
bool skipOffset; // can skip offset if true
int32_t interBufSize; // intermediate buffer sizse int32_t interBufSize; // intermediate buffer sizse
int32_t havingNum; // having expr number int32_t havingNum; // having expr number
......
...@@ -1620,14 +1620,25 @@ static bool first_last_function_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo* ...@@ -1620,14 +1620,25 @@ static bool first_last_function_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo*
// todo opt for null block // todo opt for null block
static void first_function(SQLFunctionCtx *pCtx) { static void first_function(SQLFunctionCtx *pCtx) {
if (pCtx->order == TSDB_ORDER_DESC) { SResultRowCellInfo* pResInfo = GET_RES_INFO(pCtx);
return;
}
int32_t notNullElems = 0; int32_t notNullElems = 0;
int32_t step = 1;
int32_t i = 0;
bool inputAsc = true;
// input data come from sub query, input data order equal to sub query order
if(pCtx->numOfParams == 3) {
if(pCtx->param[2].nType == TSDB_DATA_TYPE_INT && pCtx->param[2].i64 == TSDB_ORDER_DESC) {
step = -1;
i = pCtx->size - 1;
inputAsc = false;
}
} else if (pCtx->order == TSDB_ORDER_DESC) {
return ;
}
// handle the null value if(pCtx->order == TSDB_ORDER_ASC && inputAsc) {
for (int32_t i = 0; i < pCtx->size; ++i) { for (int32_t m = 0; m < pCtx->size; ++m, i+=step) {
char *data = GET_INPUT_DATA(pCtx, i); char *data = GET_INPUT_DATA(pCtx, i);
if (pCtx->hasNull && isNull(data, pCtx->inputType)) { if (pCtx->hasNull && isNull(data, pCtx->inputType)) {
continue; continue;
...@@ -1646,7 +1657,28 @@ static void first_function(SQLFunctionCtx *pCtx) { ...@@ -1646,7 +1657,28 @@ static void first_function(SQLFunctionCtx *pCtx) {
notNullElems++; notNullElems++;
break; break;
} }
} else { // desc order
for (int32_t m = 0; m < pCtx->size; ++m, i+=step) {
char *data = GET_INPUT_DATA(pCtx, i);
if (pCtx->hasNull && isNull(data, pCtx->inputType) && (!pCtx->requireNull)) {
continue;
}
TSKEY ts = pCtx->ptsList ? GET_TS_DATA(pCtx, i) : 0;
char* buf = GET_ROWCELL_INTERBUF(pResInfo);
if (pResInfo->hasResult != DATA_SET_FLAG || (*(TSKEY*)buf) > ts) {
pResInfo->hasResult = DATA_SET_FLAG;
memcpy(pCtx->pOutput, data, pCtx->inputBytes);
*(TSKEY*)buf = ts;
DO_UPDATE_TAG_COLUMNS(pCtx, ts);
}
notNullElems++;
break;
}
}
SET_VAL(pCtx, notNullElems, 1); SET_VAL(pCtx, notNullElems, 1);
} }
...@@ -1730,16 +1762,23 @@ static void first_dist_func_merge(SQLFunctionCtx *pCtx) { ...@@ -1730,16 +1762,23 @@ static void first_dist_func_merge(SQLFunctionCtx *pCtx) {
* least one data in this block that is not null.(TODO opt for this case) * least one data in this block that is not null.(TODO opt for this case)
*/ */
static void last_function(SQLFunctionCtx *pCtx) { static void last_function(SQLFunctionCtx *pCtx) {
if (pCtx->order != pCtx->param[0].i64) { SResultRowCellInfo* pResInfo = GET_RES_INFO(pCtx);
int32_t notNullElems = 0;
int32_t step = -1;
int32_t i = pCtx->size - 1;
// input data come from sub query, input data order equal to sub query order
if(pCtx->numOfParams == 3) {
if(pCtx->param[2].nType == TSDB_DATA_TYPE_INT && pCtx->param[2].i64 == TSDB_ORDER_DESC) {
step = 1;
i = 0;
}
} else if (pCtx->order != pCtx->param[0].i64) {
return; return;
} }
SResultRowCellInfo* pResInfo = GET_RES_INFO(pCtx);
int32_t notNullElems = 0;
if (pCtx->order == TSDB_ORDER_DESC) { if (pCtx->order == TSDB_ORDER_DESC) {
for (int32_t m = pCtx->size - 1; m >= 0; --m, i += step) {
for (int32_t i = pCtx->size - 1; i >= 0; --i) {
char *data = GET_INPUT_DATA(pCtx, i); char *data = GET_INPUT_DATA(pCtx, i);
if (pCtx->hasNull && isNull(data, pCtx->inputType) && (!pCtx->requireNull)) { if (pCtx->hasNull && isNull(data, pCtx->inputType) && (!pCtx->requireNull)) {
continue; continue;
...@@ -1756,7 +1795,7 @@ static void last_function(SQLFunctionCtx *pCtx) { ...@@ -1756,7 +1795,7 @@ static void last_function(SQLFunctionCtx *pCtx) {
break; break;
} }
} else { // ascending order } else { // ascending order
for (int32_t i = pCtx->size - 1; i >= 0; --i) { for (int32_t m = pCtx->size - 1; m >= 0; --m, i += step) {
char *data = GET_INPUT_DATA(pCtx, i); char *data = GET_INPUT_DATA(pCtx, i);
if (pCtx->hasNull && isNull(data, pCtx->inputType) && (!pCtx->requireNull)) { if (pCtx->hasNull && isNull(data, pCtx->inputType) && (!pCtx->requireNull)) {
continue; continue;
......
...@@ -4905,6 +4905,11 @@ STsdbQueryCond createTsdbQueryCond(SQueryAttr* pQueryAttr, STimeWindow* win) { ...@@ -4905,6 +4905,11 @@ STsdbQueryCond createTsdbQueryCond(SQueryAttr* pQueryAttr, STimeWindow* win) {
.loadExternalRows = false, .loadExternalRows = false,
}; };
// set offset with
if(pQueryAttr->skipOffset) {
cond.offset = pQueryAttr->limit.offset;
}
TIME_WINDOW_COPY(cond.twindow, *win); TIME_WINDOW_COPY(cond.twindow, *win);
return cond; return cond;
} }
...@@ -5607,6 +5612,18 @@ static int32_t getTableScanOrder(STableScanInfo* pTableScanInfo) { ...@@ -5607,6 +5612,18 @@ static int32_t getTableScanOrder(STableScanInfo* pTableScanInfo) {
return pTableScanInfo->order; return pTableScanInfo->order;
} }
// check all SQLFunctionCtx is completed
static bool allCtxCompleted(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx) {
// only one false, return false
for(int32_t i = 0; i < pOperator->numOfOutput; i++) {
if(pCtx[i].resultInfo == NULL)
return false;
if(!pCtx[i].resultInfo->complete)
return false;
}
return true;
}
// this is a blocking operator // this is a blocking operator
static SSDataBlock* doAggregate(void* param, bool* newgroup) { static SSDataBlock* doAggregate(void* param, bool* newgroup) {
SOperatorInfo* pOperator = (SOperatorInfo*) param; SOperatorInfo* pOperator = (SOperatorInfo*) param;
...@@ -5645,6 +5662,9 @@ static SSDataBlock* doAggregate(void* param, bool* newgroup) { ...@@ -5645,6 +5662,9 @@ static SSDataBlock* doAggregate(void* param, bool* newgroup) {
// the pDataBlock are always the same one, no need to call this again // the pDataBlock are always the same one, no need to call this again
setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order); setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order);
doAggregateImpl(pOperator, pQueryAttr->window.skey, pInfo->pCtx, pBlock); doAggregateImpl(pOperator, pQueryAttr->window.skey, pInfo->pCtx, pBlock);
// if all pCtx is completed, then query should be over
if(allCtxCompleted(pOperator, pInfo->pCtx))
break;
} }
doSetOperatorCompleted(pOperator); doSetOperatorCompleted(pOperator);
...@@ -5858,19 +5878,38 @@ static SSDataBlock* doLimit(void* param, bool* newgroup) { ...@@ -5858,19 +5878,38 @@ static SSDataBlock* doLimit(void* param, bool* newgroup) {
return NULL; return NULL;
} }
bool move = false;
int32_t skip = 0;
int32_t remain = 0;
int64_t srows = tsdbSkipOffset(pRuntimeEnv->pQueryHandle);
if (pRuntimeEnv->currentOffset == 0) { if (pRuntimeEnv->currentOffset == 0) {
break; break;
}
else if(srows > 0) {
if(pRuntimeEnv->currentOffset - srows >= pBlock->info.rows) {
pRuntimeEnv->currentOffset -= pBlock->info.rows;
} else {
move = true;
skip = (int32_t)(pRuntimeEnv->currentOffset - srows);
remain = (int32_t)(pBlock->info.rows - skip);
}
} else if (pRuntimeEnv->currentOffset >= pBlock->info.rows) { } else if (pRuntimeEnv->currentOffset >= pBlock->info.rows) {
pRuntimeEnv->currentOffset -= pBlock->info.rows; pRuntimeEnv->currentOffset -= pBlock->info.rows;
} else { } else {
int32_t remain = (int32_t)(pBlock->info.rows - pRuntimeEnv->currentOffset); move = true;
pBlock->info.rows = remain; skip = (int32_t)pRuntimeEnv->currentOffset;
remain = (int32_t)(pBlock->info.rows - pRuntimeEnv->currentOffset);
}
// need move
if(move) {
pBlock->info.rows = remain;
for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) { for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) {
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i); SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
int16_t bytes = pColInfoData->info.bytes; int16_t bytes = pColInfoData->info.bytes;
memmove(pColInfoData->pData, pColInfoData->pData + bytes * pRuntimeEnv->currentOffset, remain * bytes); memmove(pColInfoData->pData, pColInfoData->pData + skip * bytes, remain * bytes);
} }
pRuntimeEnv->currentOffset = 0; pRuntimeEnv->currentOffset = 0;
......
...@@ -39,6 +39,9 @@ ...@@ -39,6 +39,9 @@
.tid = (_checkInfo)->tableId.tid, \ .tid = (_checkInfo)->tableId.tid, \
.uid = (_checkInfo)->tableId.uid}) .uid = (_checkInfo)->tableId.uid})
// limit offset start optimization for rows read over this value
#define OFFSET_SKIP_THRESHOLD 5000
enum { enum {
TSDB_QUERY_TYPE_ALL = 1, TSDB_QUERY_TYPE_ALL = 1,
TSDB_QUERY_TYPE_LAST = 2, TSDB_QUERY_TYPE_LAST = 2,
...@@ -117,6 +120,9 @@ typedef struct STsdbQueryHandle { ...@@ -117,6 +120,9 @@ typedef struct STsdbQueryHandle {
STsdbRepo* pTsdb; STsdbRepo* pTsdb;
SQueryFilePos cur; // current position SQueryFilePos cur; // current position
int16_t order; int16_t order;
int64_t offset; // limit offset
int64_t srows; // skip offset rows
int64_t frows; // forbid skip offset rows
STimeWindow window; // the primary query time window that applies to all queries STimeWindow window; // the primary query time window that applies to all queries
SDataStatis* statis; // query level statistics, only one table block statistics info exists at any time SDataStatis* statis; // query level statistics, only one table block statistics info exists at any time
int32_t numOfBlocks; int32_t numOfBlocks;
...@@ -155,6 +161,11 @@ typedef struct STableGroupSupporter { ...@@ -155,6 +161,11 @@ typedef struct STableGroupSupporter {
STSchema* pTagSchema; STSchema* pTagSchema;
} STableGroupSupporter; } STableGroupSupporter;
typedef struct SRange {
int32_t from;
int32_t to;
} SRange;
static STimeWindow updateLastrowForEachGroup(STableGroupInfo *groupList); static STimeWindow updateLastrowForEachGroup(STableGroupInfo *groupList);
static int32_t checkForCachedLastRow(STsdbQueryHandle* pQueryHandle, STableGroupInfo *groupList); static int32_t checkForCachedLastRow(STsdbQueryHandle* pQueryHandle, STableGroupInfo *groupList);
static int32_t checkForCachedLast(STsdbQueryHandle* pQueryHandle); static int32_t checkForCachedLast(STsdbQueryHandle* pQueryHandle);
...@@ -413,6 +424,9 @@ static STsdbQueryHandle* tsdbQueryTablesImpl(STsdbRepo* tsdb, STsdbQueryCond* pC ...@@ -413,6 +424,9 @@ static STsdbQueryHandle* tsdbQueryTablesImpl(STsdbRepo* tsdb, STsdbQueryCond* pC
} }
pQueryHandle->order = pCond->order; pQueryHandle->order = pCond->order;
pQueryHandle->offset = pCond->offset;
pQueryHandle->srows = 0;
pQueryHandle->frows = 0;
pQueryHandle->pTsdb = tsdb; pQueryHandle->pTsdb = tsdb;
pQueryHandle->type = TSDB_QUERY_TYPE_ALL; pQueryHandle->type = TSDB_QUERY_TYPE_ALL;
pQueryHandle->cur.fid = INT32_MIN; pQueryHandle->cur.fid = INT32_MIN;
...@@ -529,6 +543,9 @@ void tsdbResetQueryHandle(TsdbQueryHandleT queryHandle, STsdbQueryCond *pCond) { ...@@ -529,6 +543,9 @@ void tsdbResetQueryHandle(TsdbQueryHandleT queryHandle, STsdbQueryCond *pCond) {
} }
pQueryHandle->order = pCond->order; pQueryHandle->order = pCond->order;
pQueryHandle->offset = pCond->offset;
pQueryHandle->srows = 0;
pQueryHandle->frows = 0;
pQueryHandle->window = pCond->twindow; pQueryHandle->window = pCond->twindow;
pQueryHandle->type = TSDB_QUERY_TYPE_ALL; pQueryHandle->type = TSDB_QUERY_TYPE_ALL;
pQueryHandle->cur.fid = -1; pQueryHandle->cur.fid = -1;
...@@ -1073,63 +1090,302 @@ static int32_t binarySearchForBlock(SBlock* pBlock, int32_t numOfBlocks, TSKEY s ...@@ -1073,63 +1090,302 @@ static int32_t binarySearchForBlock(SBlock* pBlock, int32_t numOfBlocks, TSKEY s
return midSlot; return midSlot;
} }
static int32_t loadBlockInfo(STsdbQueryHandle * pQueryHandle, int32_t index, int32_t* numOfBlocks) { // array :1 2 3 5 7 -2 (8 9) skip 4 and 6
int32_t code = 0; int32_t memMoveByArray(SBlock *blocks, SArray *pArray) {
// pArray is NULL or size is zero , no need block to move
if(pArray == NULL)
return 0;
size_t count = taosArrayGetSize(pArray);
if(count == 0)
return 0;
STableCheckInfo* pCheckInfo = taosArrayGet(pQueryHandle->pTableCheckInfo, index); // memmove
pCheckInfo->numOfBlocks = 0; int32_t num = 0;
SRange* ranges = (SRange*)TARRAY_GET_START(pArray);
for(size_t i = 0; i < count; i++) {
int32_t step = ranges[i].to - ranges[i].from + 1;
memmove(blocks + num, blocks + ranges[i].from, sizeof(SBlock) * step);
num += step;
}
if (tsdbSetReadTable(&pQueryHandle->rhelper, pCheckInfo->pTableObj) != TSDB_CODE_SUCCESS) { return num;
code = terrno; }
return code;
// if block data in memory return false else true
bool blockNoItemInMem(STsdbQueryHandle* q, SBlock* pBlock) {
if(q->pMemRef == NULL) {
return false;
} }
SBlockIdx* compIndex = pQueryHandle->rhelper.pBlkIdx; // mem
if(q->pMemRef->snapshot.mem) {
SMemTable* mem = q->pMemRef->snapshot.mem;
if(timeIntersect(mem->keyFirst, mem->keyLast, pBlock->keyFirst, pBlock->keyLast))
return false;
}
// imem
if(q->pMemRef->snapshot.imem) {
SMemTable* imem = q->pMemRef->snapshot.imem;
if(timeIntersect(imem->keyFirst, imem->keyLast, pBlock->keyFirst, pBlock->keyLast))
return false;
}
// no data block in this file, try next file return true;
if (compIndex == NULL || compIndex->uid != pCheckInfo->tableId.uid) { }
return 0; // no data blocks in the file belongs to pCheckInfo->pTable
#define MAYBE_IN_MEMORY_ROWS 4000 // approximately the capacity of one block
// skip blocks . return value is skip blocks number, skip rows reduce from *pOffset
static int32_t offsetSkipBlock(STsdbQueryHandle* q, SBlockInfo* pBlockInfo, int64_t skey, int64_t ekey,
int32_t sblock, int32_t eblock, SArray** ppArray, bool order) {
int32_t num = 0;
SBlock* blocks = pBlockInfo->blocks;
SArray* pArray = NULL;
SRange range;
range.from = -1;
//
// ASC
//
if(order) {
for(int32_t i = sblock; i < eblock; i++) {
bool skip = false;
SBlock* pBlock = &blocks[i];
if(i == sblock && skey > pBlock->keyFirst) {
q->frows += pBlock->numOfRows; // some rows time < s
} else {
// check can skip
if(q->srows + q->frows + pBlock->numOfRows + MAYBE_IN_MEMORY_ROWS < q->offset) { // approximately calculate
if(blockNoItemInMem(q, pBlock)) {
// can skip
q->srows += pBlock->numOfRows;
skip = true;
} else {
q->frows += pBlock->numOfRows; // maybe have some row in memroy
}
} else {
// the remainder be put to pArray
if(pArray == NULL)
pArray = taosArrayInit(1, sizeof(SRange));
if(range.from == -1) {
range.from = i;
} else {
if(range.to + 1 != i) {
// add the previous
taosArrayPush(pArray, &range);
range.from = i;
}
}
range.to = eblock - 1;
taosArrayPush(pArray, &range);
range.from = -1;
break;
}
} }
assert(compIndex->len > 0); if(skip) {
num ++;
} else {
// can't skip, append block index to pArray
if(pArray == NULL)
pArray = taosArrayInit(10, sizeof(SRange));
if(range.from == -1) {
range.from = i;
} else {
if(range.to + 1 != i) {
// add the previous
taosArrayPush(pArray, &range);
range.from = i;
}
}
range.to = i;
}
}
// end append
if(range.from != -1) {
if(pArray == NULL)
pArray = taosArrayInit(1, sizeof(SRange));
taosArrayPush(pArray, &range);
}
if (tsdbLoadBlockInfo(&(pQueryHandle->rhelper), (void**)(&pCheckInfo->pCompInfo), // ASC return
(uint32_t*)(&pCheckInfo->compSize)) < 0) { *ppArray = pArray;
return terrno; return num;
} }
SBlockInfo* pCompInfo = pCheckInfo->pCompInfo;
TSKEY s = TSKEY_INITIAL_VAL, e = TSKEY_INITIAL_VAL; // DES
for(int32_t i = eblock - 1; i >= sblock; i--) {
bool skip = false;
SBlock* pBlock = &blocks[i];
if(i == eblock - 1 && ekey < pBlock->keyLast) {
q->frows += pBlock->numOfRows; // some rows time > e
} else {
// check can skip
if(q->srows + q->frows + pBlock->numOfRows + MAYBE_IN_MEMORY_ROWS < q->offset) { // approximately calculate
if(blockNoItemInMem(q, pBlock)) {
// can skip
q->srows += pBlock->numOfRows;
skip = true;
} else {
q->frows += pBlock->numOfRows; // maybe have some row in memroy
}
} else {
// the remainder be put to pArray
if(pArray == NULL)
pArray = taosArrayInit(1, sizeof(SRange));
if(range.from == -1) {
range.from = i;
} else {
if(range.to - 1 != i) {
// add the previous
taosArrayPush(pArray, &range);
range.from = i;
}
}
range.to = 0;
taosArrayPush(pArray, &range);
range.from = -1;
break;
}
}
if (ASCENDING_TRAVERSE(pQueryHandle->order)) { if(skip) {
num ++;
} else {
// can't skip, append block index to pArray
if(pArray == NULL)
pArray = taosArrayInit(10, sizeof(SRange));
if(range.from == -1) {
range.from = i;
} else {
if(range.to + 1 != i) {
// add the previous
taosArrayPush(pArray, &range);
range.from = i;
}
}
range.to = i;
}
}
// end append
if(range.from != -1) {
if(pArray == NULL)
pArray = taosArrayInit(1, sizeof(SRange));
taosArrayPush(pArray, &range);
}
if(pArray == NULL)
return num;
// reverse array
size_t count = taosArrayGetSize(pArray);
SRange* ranges = TARRAY_GET_START(pArray);
SArray* pArray1 = taosArrayInit(count, sizeof(SRange));
size_t i = count - 1;
while(i >= 0) {
range.from = ranges[i].to;
range.to = ranges[i].from;
taosArrayPush(pArray1, &range);
if(i == 0)
break;
i --;
}
*ppArray = pArray1;
taosArrayDestroy(&pArray);
return num;
}
// shrink blocks by condition of query
static void shrinkBlocksByQuery(STsdbQueryHandle *pQueryHandle, STableCheckInfo *pCheckInfo) {
SBlockInfo *pCompInfo = pCheckInfo->pCompInfo;
SBlockIdx *compIndex = pQueryHandle->rhelper.pBlkIdx;
bool order = ASCENDING_TRAVERSE(pQueryHandle->order);
if (order) {
assert(pCheckInfo->lastKey <= pQueryHandle->window.ekey && pQueryHandle->window.skey <= pQueryHandle->window.ekey); assert(pCheckInfo->lastKey <= pQueryHandle->window.ekey && pQueryHandle->window.skey <= pQueryHandle->window.ekey);
} else { } else {
assert(pCheckInfo->lastKey >= pQueryHandle->window.ekey && pQueryHandle->window.skey >= pQueryHandle->window.ekey); assert(pCheckInfo->lastKey >= pQueryHandle->window.ekey && pQueryHandle->window.skey >= pQueryHandle->window.ekey);
} }
TSKEY s = TSKEY_INITIAL_VAL, e = TSKEY_INITIAL_VAL;
s = MIN(pCheckInfo->lastKey, pQueryHandle->window.ekey); s = MIN(pCheckInfo->lastKey, pQueryHandle->window.ekey);
e = MAX(pCheckInfo->lastKey, pQueryHandle->window.ekey); e = MAX(pCheckInfo->lastKey, pQueryHandle->window.ekey);
// discard the unqualified data block based on the query time window // discard the unqualified data block based on the query time window
int32_t start = binarySearchForBlock(pCompInfo->blocks, compIndex->numOfBlocks, s, TSDB_ORDER_ASC); int32_t start = binarySearchForBlock(pCompInfo->blocks, compIndex->numOfBlocks, s, TSDB_ORDER_ASC);
int32_t end = start;
if (s > pCompInfo->blocks[start].keyLast) { if (s > pCompInfo->blocks[start].keyLast) {
return 0; return ;
} }
// todo speedup the procedure of located end block int32_t end = start;
// locate e index of blocks -> end
while (end < (int32_t)compIndex->numOfBlocks && (pCompInfo->blocks[end].keyFirst <= e)) { while (end < (int32_t)compIndex->numOfBlocks && (pCompInfo->blocks[end].keyFirst <= e)) {
end += 1; end += 1;
} }
pCheckInfo->numOfBlocks = (end - start); // calc offset can skip blocks number
int32_t nSkip = 0;
SArray *pArray = NULL;
if(pQueryHandle->offset > 0) {
nSkip = offsetSkipBlock(pQueryHandle, pCompInfo, s, e, start, end, &pArray, order);
}
if (start > 0) { if(nSkip > 0) { // have offset and can skip
pCheckInfo->numOfBlocks = memMoveByArray(pCompInfo->blocks, pArray);
} else { // no offset
pCheckInfo->numOfBlocks = end - start;
if(start > 0)
memmove(pCompInfo->blocks, &pCompInfo->blocks[start], pCheckInfo->numOfBlocks * sizeof(SBlock)); memmove(pCompInfo->blocks, &pCompInfo->blocks[start], pCheckInfo->numOfBlocks * sizeof(SBlock));
} }
if(pArray)
taosArrayDestroy(&pArray);
}
// load one table (tsd_index point to) need load blocks info and put into pCheckInfo->pCompInfo->blocks
static int32_t loadBlockInfo(STsdbQueryHandle * pQueryHandle, int32_t tsd_index, int32_t* numOfBlocks) {
//
// ONE PART. Load all blocks info from one table of tsd_index
//
int32_t code = 0;
STableCheckInfo* pCheckInfo = taosArrayGet(pQueryHandle->pTableCheckInfo, tsd_index);
pCheckInfo->numOfBlocks = 0;
if (tsdbSetReadTable(&pQueryHandle->rhelper, pCheckInfo->pTableObj) != TSDB_CODE_SUCCESS) {
code = terrno;
return code;
}
SBlockIdx* compIndex = pQueryHandle->rhelper.pBlkIdx;
// no data block in this file, try next file
if (compIndex == NULL || compIndex->uid != pCheckInfo->tableId.uid) {
return 0; // no data blocks in the file belongs to pCheckInfo->pTable
}
if (pCheckInfo->compSize < (int32_t)compIndex->len) {
assert(compIndex->len > 0);
char* t = realloc(pCheckInfo->pCompInfo, compIndex->len);
if (t == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
code = TSDB_CODE_TDB_OUT_OF_MEMORY;
return code;
}
pCheckInfo->pCompInfo = (SBlockInfo*)t;
pCheckInfo->compSize = compIndex->len;
}
if (tsdbLoadBlockInfo(&(pQueryHandle->rhelper), (void**)(&pCheckInfo->pCompInfo),
(uint32_t*)(&pCheckInfo->compSize)) < 0) {
return terrno;
}
//
// TWO PART. shrink no need blocks from all blocks by condition of query
//
shrinkBlocksByQuery(pQueryHandle, pCheckInfo);
(*numOfBlocks) += pCheckInfo->numOfBlocks; (*numOfBlocks) += pCheckInfo->numOfBlocks;
return 0; return 0;
} }
...@@ -4312,4 +4568,11 @@ end: ...@@ -4312,4 +4568,11 @@ end:
return string; return string;
} }
// obtain queryHandle attribute
int64_t tsdbSkipOffset(TsdbQueryHandleT queryHandle) {
STsdbQueryHandle* pQueryHandle = (STsdbQueryHandle*)queryHandle;
if (pQueryHandle) {
return pQueryHandle->srows;
}
return 0;
}
...@@ -58,6 +58,13 @@ static FORCE_INLINE void taosEncryptPass(uint8_t *inBuf, size_t inLen, char *tar ...@@ -58,6 +58,13 @@ static FORCE_INLINE void taosEncryptPass(uint8_t *inBuf, size_t inLen, char *tar
memcpy(target, context.digest, TSDB_KEY_LEN); memcpy(target, context.digest, TSDB_KEY_LEN);
} }
//
// TSKEY util
//
// if time area(s1,e1) intersect with time area(s2,e2) then return true else return false
bool timeIntersect(TSKEY s1, TSKEY e1, TSKEY s2, TSKEY e2);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -549,3 +549,16 @@ FORCE_INLINE double taos_align_get_double(const char* pBuf) { ...@@ -549,3 +549,16 @@ FORCE_INLINE double taos_align_get_double(const char* pBuf) {
memcpy(&dv, pBuf, sizeof(dv)); // in ARM, return *((const double*)(pBuf)) may cause problem memcpy(&dv, pBuf, sizeof(dv)); // in ARM, return *((const double*)(pBuf)) may cause problem
return dv; return dv;
} }
//
// TSKEY util
//
// if time area(s1,e1) intersect with time area(s2,e2) then return true else return false
bool timeIntersect(TSKEY s1, TSKEY e1, TSKEY s2, TSKEY e2) {
// s1,e1 and s2,e2 have 7 scenarios, 5 is intersection, 2 is no intersection, so we pick up 2.
if(e2 < s1 || s2 > e1)
return false;
else
return true;
}
\ No newline at end of file
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, db_test.stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import sys
import taos
from util.log import tdLog
from util.cases import tdCases
from util.sql import tdSql
import json
class TDTestCase:
def caseDescription(self):
'''
case1: [TD-12435] fix ` identifier in table column name if using create table as subquery
'''
return
def init(self, conn, logSql):
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor(), logSql)
def run(self):
tdSql.prepare()
print("============== STEP 1 ===== prepare data & validate json string")
tdSql.execute("create table if not exists st(ts timestamp, dataInt int)")
tdSql.execute("create table st_from_sub as select avg(`dataInt`) from st interval(1m)")
tdSql.query("describe st_from_sub")
tdSql.checkData(1, 0, 'avg__dataInt__')
def stop(self):
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase())
\ No newline at end of file
###################################################################
# Copyright (c) 2021 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import sys
from util.log import *
from util.cases import *
from util.sql import *
class TDTestCase:
def caseDescription(self):
'''
case1<shenglian zhou>: [TS-2016]fix select * from (select * from empty_stable)
'''
return
def init(self, conn, logSql):
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor(), logSql)
self._conn = conn
def run(self):
print("running {}".format(__file__))
tdSql.execute("drop database if exists td12229")
tdSql.execute("create database if not exists td12229")
tdSql.execute('use td12229')
tdSql.execute('create stable st(ts timestamp , value int ) tags (ind int)')
tdSql.execute('insert into tb1 using st tags(1) values(now ,1)')
tdSql.execute('insert into tb1 using st tags(1) values(now+1s ,2)')
tdSql.execute('insert into tb1 using st tags(1) values(now+2s ,3)')
tdSql.execute('create stable ste(ts timestamp , value int ) tags (ind int)')
tdSql.query('select * from st')
tdSql.checkRows(3)
tdSql.query('select * from (select * from ste)')
tdSql.checkRows(0)
tdSql.query('select * from st union all select * from ste')
tdSql.checkRows(3)
tdSql.query('select * from ste union all select * from st')
tdSql.checkRows(3)
tdSql.query('select elapsed(ts) from ste group by tbname union all select elapsed(ts) from st group by tbname;')
tdSql.checkRows(1)
tdSql.query('select elapsed(ts) from st group by tbname union all select elapsed(ts) from ste group by tbname;')
tdSql.checkRows(1)
tdSql.execute('drop database td12229')
def stop(self):
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase())
python3 ./test.py -f 0-others/json_tag.py python3 ./test.py -f 0-others/json_tag.py
python3 ./test.py -f 0-others/TD-12435.py
\ No newline at end of file
python3 ./test.py -f 2-query/ts_hidden_column.py python3 ./test.py -f 2-query/ts_hidden_column.py
python3 ./test.py -f 2-query/union-order.py python3 ./test.py -f 2-query/union-order.py
python3 ./test.py -f 2-query/session_two_stage.py python3 ./test.py -f 2-query/session_two_stage.py
python3 ./test.py -f 2-query/ts_2016.py
...@@ -143,7 +143,7 @@ python3 ./test.py -f stream/stream1.py ...@@ -143,7 +143,7 @@ python3 ./test.py -f stream/stream1.py
python3 ./test.py -f stream/stream2.py python3 ./test.py -f stream/stream2.py
#python3 ./test.py -f stream/parser.py #python3 ./test.py -f stream/parser.py
python3 ./test.py -f stream/history.py python3 ./test.py -f stream/history.py
python3 ./test.py -f stream/sys.py #python3 ./test.py -f stream/sys.py
python3 ./test.py -f stream/table_1.py python3 ./test.py -f stream/table_1.py
python3 ./test.py -f stream/table_n.py python3 ./test.py -f stream/table_n.py
python3 ./test.py -f stream/showStreamExecTimeisNull.py python3 ./test.py -f stream/showStreamExecTimeisNull.py
......
...@@ -229,7 +229,8 @@ python3 test.py -f tools/taosdemoAllTest/taosdemoTestInsertAllType.py ...@@ -229,7 +229,8 @@ python3 test.py -f tools/taosdemoAllTest/taosdemoTestInsertAllType.py
python3 test.py -f tools/taosdemoAllTest/taosdemoTestInsertShell.py python3 test.py -f tools/taosdemoAllTest/taosdemoTestInsertShell.py
#query #query
python3 test.py -f query/distinctOneColTb.py python3 ./test.py -f query/queryBase.py
python3 ./test.py -f query/distinctOneColTb.py
python3 ./test.py -f query/filter.py python3 ./test.py -f query/filter.py
python3 ./test.py -f query/filterCombo.py python3 ./test.py -f query/filterCombo.py
python3 ./test.py -f query/queryNormal.py python3 ./test.py -f query/queryNormal.py
...@@ -286,6 +287,7 @@ python3 ./test.py -f query/queryCnameDisplay.py ...@@ -286,6 +287,7 @@ python3 ./test.py -f query/queryCnameDisplay.py
python3 test.py -f query/nestedQuery/queryWithSpread.py python3 test.py -f query/nestedQuery/queryWithSpread.py
python3 ./test.py -f query/bug6586.py python3 ./test.py -f query/bug6586.py
# python3 ./test.py -f query/bug5903.py # python3 ./test.py -f query/bug5903.py
python3 ./test.py -f query/queryLimit.py
#stream #stream
python3 ./test.py -f stream/metric_1.py python3 ./test.py -f stream/metric_1.py
......
...@@ -140,6 +140,9 @@ class TDTestCase: ...@@ -140,6 +140,9 @@ class TDTestCase:
tdSql.error("select derivative(col, 1s, 1) from tb2") tdSql.error("select derivative(col, 1s, 1) from tb2")
tdSql.error("select derivative(col, 10s, 0) from tb2") tdSql.error("select derivative(col, 10s, 0) from tb2")
tdSql.error("select derivative(col, 999ms, 0) from tb2") tdSql.error("select derivative(col, 999ms, 0) from tb2")
tdSql.error("select derivative(col, now, 0) from tb2") #TD-11983 now not allowed in second param
tdSql.error("select derivative(col, now+3d-8h+6m, 0) from tb2") #TD-11983 now not allowed in second param
tdSql.error("select derivative(col, 3d-8h+now+6m, 0) from tb2") #TD-11983 now not allowed in second param
tdSql.error("select derivative(col, 10s, 1) from stb") tdSql.error("select derivative(col, 10s, 1) from stb")
tdSql.error("select derivative(col, 10s, 1) from stb group by col") tdSql.error("select derivative(col, 10s, 1) from stb group by col")
...@@ -150,6 +153,9 @@ class TDTestCase: ...@@ -150,6 +153,9 @@ class TDTestCase:
tdSql.error("select derivative(col, 10y, 0) from stb group by tbname") #TD-10399, DB error: syntax error near '10y, 0) from stb group by tbname;' tdSql.error("select derivative(col, 10y, 0) from stb group by tbname") #TD-10399, DB error: syntax error near '10y, 0) from stb group by tbname;'
tdSql.error("select derivative(col, -106752d, 0) from stb group by tbname") #TD-10398 overflow tips tdSql.error("select derivative(col, -106752d, 0) from stb group by tbname") #TD-10398 overflow tips
tdSql.error("select derivative(col, 106751991168d, 0) from stb group by tbname") #TD-10398 overflow tips tdSql.error("select derivative(col, 106751991168d, 0) from stb group by tbname") #TD-10398 overflow tips
tdSql.error("select derivative(col, now, 1) from stb") #TD-11983 now not allowed in second param
tdSql.error("select derivative(col, now+3d-8h+6m, 1) from stb") #TD-11983 now not allowed in second param
tdSql.error("select derivative(col, 3d-8h+now+6m, 1) from stb") #TD-11983 now not allowed in second param
def run(self): def run(self):
tdSql.prepare() tdSql.prepare()
......
...@@ -345,7 +345,9 @@ class ElapsedCase: ...@@ -345,7 +345,9 @@ class ElapsedCase:
tdSql.error("select elapsed(*) from t1 where ts > '2021-11-22 00:00:00' and ts < '2021-11-23 00:00:00'") tdSql.error("select elapsed(*) from t1 where ts > '2021-11-22 00:00:00' and ts < '2021-11-23 00:00:00'")
tdSql.error("select elapsed(ts, '1s') from t1 where ts > '2021-11-22 00:00:00' and ts < '2021-11-23 00:00:00'") tdSql.error("select elapsed(ts, '1s') from t1 where ts > '2021-11-22 00:00:00' and ts < '2021-11-23 00:00:00'")
tdSql.error("select elapsed(ts, i) from t1 where ts > '2021-11-22 00:00:00' and ts < '2021-11-23 00:00:00'") tdSql.error("select elapsed(ts, i) from t1 where ts > '2021-11-22 00:00:00' and ts < '2021-11-23 00:00:00'")
#tdSql.error("select elapsed(ts, now) from t1 where ts > '2021-11-22 00:00:00' and ts < '2021-11-23 00:00:00'") tdSql.error("select elapsed(ts, now) from t1 where ts > '2021-11-22 00:00:00' and ts < '2021-11-23 00:00:00'")
tdSql.error("select elapsed(ts, now-7d+2h-3m+2s) from t1 where ts > '2021-11-22 00:00:00' and ts < '2021-11-23 00:00:00'")
tdSql.error("select elapsed(ts, 7d+2h+now+3m+2s) from t1 where ts > '2021-11-22 00:00:00' and ts < '2021-11-23 00:00:00'")
tdSql.error("select elapsed(ts, ts) from t1 where ts > '2021-11-22 00:00:00' and ts < '2021-11-23 00:00:00'") tdSql.error("select elapsed(ts, ts) from t1 where ts > '2021-11-22 00:00:00' and ts < '2021-11-23 00:00:00'")
tdSql.error("select elapsed(ts + 1) from t1 where ts > '2021-11-22 00:00:00' and ts < '2021-11-23 00:00:00'") tdSql.error("select elapsed(ts + 1) from t1 where ts > '2021-11-22 00:00:00' and ts < '2021-11-23 00:00:00'")
tdSql.error("select elapsed(ts, 1b) from t1 where ts > '2021-11-22 00:00:00' and ts < '2021-11-23 00:00:00'") tdSql.error("select elapsed(ts, 1b) from t1 where ts > '2021-11-22 00:00:00' and ts < '2021-11-23 00:00:00'")
......
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
#
# query base function test case
#
import sys
from numpy.lib.function_base import insert
import taos
from util.log import *
from util.cases import *
from util.sql import *
import numpy as np
# constant define
WAITS = 5 # wait seconds
class TDTestCase:
#
# --------------- main frame -------------------
#
def caseDescription(self):
'''
Query moudle base api or keyword test case:
case1: api first() last()
case2: none
'''
return
# init
def init(self, conn, logSql):
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor())
tdSql.prepare()
self.create_tables();
self.ts = 1500000000000
# run case
def run(self):
# insert data
self.insert_data("t1", self.ts, 1*10000, 30000, 0);
self.insert_data("t2", self.ts, 2*10000, 30000, 100000);
self.insert_data("t3", self.ts, 3*10000, 30000, 200000);
# test base case
self.case_first()
tdLog.debug(" QUERYBASE first() api ............ [OK]")
# test advance case
self.case_last()
tdLog.debug(" QUERYBASE last() api ............ [OK]")
# stop
def stop(self):
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
#
# --------------- case -------------------
#
# create table
def create_tables(self):
# super table
tdSql.execute("create table st(ts timestamp, i1 int) tags(area int)");
# child table
tdSql.execute("create table t1 using st tags(1)");
tdSql.execute("create table t2 using st tags(2)");
tdSql.execute("create table t3 using st tags(3)");
return
# insert data1
def insert_data(self, tbname, ts_start, count, batch_num, base):
pre_insert = "insert into %s values"%tbname
sql = pre_insert
tdLog.debug("doing insert table %s rows=%d ..."%(tbname, count))
for i in range(count):
sql += " (%d,%d)"%(ts_start + i*1000, base + i)
if i >0 and i%batch_num == 0:
tdSql.execute(sql)
sql = pre_insert
# end sql
if sql != pre_insert:
tdSql.execute(sql)
tdLog.debug("INSERT TABLE DATA ............ [OK]")
return
# first case base
def case_first(self):
#
# last base function
#
# base t1 table
sql = "select first(*) from t1 where ts>='2017-07-14 12:40:00' order by ts asc;"
tdSql.waitedQuery(sql, 1, WAITS)
tdSql.checkData(0, 1, 7200)
sql = "select first(*) from t1 where ts>='2017-07-14 12:40:00' order by ts desc;" # desc
tdSql.waitedQuery(sql, 1, WAITS)
tdSql.checkData(0, 1, 7200)
# super table st
sql = "select first(*) from st where ts>='2017-07-14 11:40:00' and ts<='2017-07-14 12:40:00' and tbname in('t1') order by ts;"
tdSql.waitedQuery(sql, 1, WAITS)
tdSql.checkData(0, 1, 3600)
sql = "select first(*) from st where ts>='2017-07-14 11:40:00' and ts<='2017-07-14 12:40:00' and tbname in('t1') order by ts desc;" # desc
tdSql.waitedQuery(sql, 1, WAITS)
tdSql.checkData(0, 1, 3600)
# sub query
sql = "select first(*) from ( select sum(i1) from st where ts>='2017-07-14 11:40:00' and ts<'2017-07-14 12:40:00' interval(10m) order by ts asc );"
tdSql.waitedQuery(sql, 1, WAITS)
tdSql.checkData(0, 1, 187019100)
sql = "select first(*) from ( select sum(i1) from st where ts>='2017-07-14 11:40:00' and ts<'2017-07-14 12:40:00' interval(10m) order by ts desc );" # desc
tdSql.waitedQuery(sql, 1, WAITS)
tdSql.checkData(0, 1, 187019100)
return
# last case
def case_last(self):
#
# last base test
#
# base t1 table
sql = "select last(*) from t1 where ts<='2017-07-14 12:40:00' order by ts asc;"
tdSql.waitedQuery(sql, 1, WAITS)
tdSql.checkData(0, 1, 7200)
sql = "select last(*) from t1 where ts<='2017-07-14 12:40:00' order by ts desc;" # desc
tdSql.waitedQuery(sql, 1, WAITS)
tdSql.checkData(0, 1, 7200)
# super table st
sql = "select last(*) from st where ts>='2017-07-14 11:40:00' and ts<='2017-07-14 12:40:00' and tbname in('t1') order by ts;"
tdSql.waitedQuery(sql, 1, WAITS)
tdSql.checkData(0, 1, 7200)
sql = "select last(*) from st where ts>='2017-07-14 11:40:00' and ts<='2017-07-14 12:40:00' and tbname in('t1') order by ts desc;" # desc
tdSql.waitedQuery(sql, 1, WAITS)
tdSql.checkData(0, 1, 7200)
# sub query
sql = "select last(*) from ( select sum(i1) from st where ts>='2017-07-14 11:40:00' and ts<'2017-07-14 12:40:00' interval(10m) order by ts asc );"
tdSql.waitedQuery(sql, 1, WAITS)
tdSql.checkData(0, 1, 192419100)
sql = "select last(*) from ( select sum(i1) from st where ts>='2017-07-14 11:40:00' and ts<'2017-07-14 12:40:00' interval(10m) order by ts desc );" # desc
tdSql.waitedQuery(sql, 1, WAITS)
tdSql.checkData(0, 1, 192419100)
#
# add case with filename
#
tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase())
\ No newline at end of file
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import sys
from numpy.lib.function_base import insert
import taos
from util.log import *
from util.cases import *
from util.sql import *
import numpy as np
# constant define
WAITS = 5 # wait seconds
class TDTestCase:
#
# --------------- main frame -------------------
#
def caseDescription(self):
'''
limit and offset keyword function test cases;
case1: limit offset base function test
case2: limit offset advance test
'''
return
# init
def init(self, conn, logSql):
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor())
tdSql.prepare()
self.create_tables();
self.ts = 1500000000000
# run case
def run(self):
# insert data
self.insert_data("t1", self.ts, 300*10000, 30000);
# test base case
self.test_case1()
tdLog.debug(" LIMIT test_case1 ............ [OK]")
# test advance case
self.test_case2()
tdLog.debug(" LIMIT test_case2 ............ [OK]")
# stop
def stop(self):
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
#
# --------------- case -------------------
#
# create table
def create_tables(self):
# super table
tdSql.execute("create table st(ts timestamp, i1 int) tags(area int)");
# child table
tdSql.execute("create table t1 using st tags(1)");
tdSql.execute("create table t2 using st tags(2)");
tdSql.execute("create table t3 using st tags(3)");
return
# insert data1
def insert_data(self, tbname, ts_start, count, batch_num):
pre_insert = "insert into %s values"%tbname
sql = pre_insert
tdLog.debug("doing insert table %s rows=%d ..."%(tbname, count))
for i in range(count):
sql += " (%d,%d)"%(ts_start + i*1000, i)
if i >0 and i%batch_num == 0:
tdSql.execute(sql)
sql = pre_insert
# end sql
if sql != pre_insert:
tdSql.execute(sql)
tdLog.debug("INSERT TABLE DATA ............ [OK]")
return
# test case1 base
def test_case1(self):
#
# limit base function
#
# base no where
sql = "select * from t1 limit 10"
tdSql.waitedQuery(sql, 10, WAITS)
tdSql.checkData(0, 1, 0)
tdSql.checkData(9, 1, 9)
sql = "select * from t1 order by ts desc limit 10" # desc
tdSql.waitedQuery(sql, 10, WAITS)
tdSql.checkData(0, 1, 2999999)
tdSql.checkData(9, 1, 2999990)
# have where
sql = "select * from t1 where ts>='2017-07-14 10:40:01' and ts<'2017-07-14 10:40:06' limit 10"
tdSql.waitedQuery(sql, 5, WAITS)
tdSql.checkData(0, 1, 1)
tdSql.checkData(4, 1, 5)
sql = "select * from t1 where ts>='2017-08-18 03:59:52' and ts<'2017-08-18 03:59:57' order by ts desc limit 10" # desc
tdSql.waitedQuery(sql, 5, WAITS)
tdSql.checkData(0, 1, 2999996)
tdSql.checkData(4, 1, 2999992)
#
# offset base function
#
# no where
sql = "select * from t1 limit 10 offset 5"
tdSql.waitedQuery(sql, 10, WAITS)
tdSql.checkData(0, 1, 5)
tdSql.checkData(9, 1, 14)
sql = "select * from t1 order by ts desc limit 10 offset 5" # desc
tdSql.waitedQuery(sql, 10, WAITS)
tdSql.checkData(0, 1, 2999994)
tdSql.checkData(9, 1, 2999985)
# have where only ts
sql = "select * from t1 where ts>='2017-07-14 10:40:10' and ts<'2017-07-14 10:40:20' limit 10 offset 5"
tdSql.waitedQuery(sql, 5, WAITS)
tdSql.checkData(0, 1, 15)
tdSql.checkData(4, 1, 19)
sql = "select * from t1 where ts>='2017-08-18 03:59:52' and ts<'2017-08-18 03:59:57' order by ts desc limit 10 offset 4" # desc
tdSql.waitedQuery(sql, 1, WAITS)
tdSql.checkData(0, 1, 2999992)
# have where with other column condition
sql = "select * from t1 where i1>=1 and i1<11 limit 10 offset 5"
tdSql.waitedQuery(sql, 5, WAITS)
tdSql.checkData(0, 1, 6)
tdSql.checkData(4, 1, 10)
sql = "select * from t1 where i1>=300000 and i1<=500000 order by ts desc limit 10 offset 100000" # desc
tdSql.waitedQuery(sql, 10, WAITS)
tdSql.checkData(0, 1, 400000)
tdSql.checkData(9, 1, 399991)
# have where with ts and other column condition
sql = "select * from t1 where ts>='2017-07-14 10:40:10' and ts<'2017-07-14 10:40:50' and i1>=20 and i1<=25 limit 10 offset 5"
tdSql.waitedQuery(sql, 1, WAITS)
tdSql.checkData(0, 1, 25)
return
# test advance
def test_case2(self):
#
# OFFSET merge file data with memory data
#
# offset
sql = "select * from t1 limit 10 offset 72000"
tdSql.waitedQuery(sql, 10, WAITS)
tdSql.checkData(0, 1, 72000)
# each insert one row into NO.0 NO.2 NO.7 blocks
sql = "insert into t1 values (%d, 0) (%d, 2) (%d, 7)"%(self.ts+1, self.ts + 2*3300*1000+1, self.ts + 7*3300*1000+1)
tdSql.execute(sql)
# query result
sql = "select * from t1 limit 10 offset 72000"
tdSql.waitedQuery(sql, 10, WAITS)
tdSql.checkData(0, 1, 72000 - 3)
# have where
sql = "select * from t1 where ts>='2017-07-14 10:40:10' and ts<'2017-07-22 18:40:10' limit 10 offset 72000"
tdSql.waitedQuery(sql, 10, WAITS)
tdSql.checkData(0, 1, 72000 - 3 + 10 + 1)
# have where desc
sql = "select * from t1 where ts<'2017-07-14 20:40:00' order by ts desc limit 15 offset 36000"
tdSql.waitedQuery(sql, 3, WAITS)
tdSql.checkData(0, 1, 1)
#
# add case with filename
#
tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase())
\ No newline at end of file
...@@ -26,6 +26,8 @@ class TDTestCase: ...@@ -26,6 +26,8 @@ class TDTestCase:
''' '''
tdCom.cleanTb() tdCom.cleanTb()
table_name = tdCom.getLongName(8, "letters_mixed") table_name = tdCom.getLongName(8, "letters_mixed")
while table_name.islower():
table_name = tdCom.getLongName(8, "letters_mixed")
table_name_sub = f'{table_name}_sub' table_name_sub = f'{table_name}_sub'
tb_name_lower = table_name_sub.lower() tb_name_lower = table_name_sub.lower()
tb_name_upper = table_name_sub.upper() tb_name_upper = table_name_sub.upper()
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册