未验证 提交 ea5533ef 编写于 作者: M Minglei Jin 提交者: GitHub

Merge branch 'master' into fix/TS-927

...@@ -67,6 +67,7 @@ def pre_test(){ ...@@ -67,6 +67,7 @@ def pre_test(){
} }
sh''' sh'''
cd ${WKC} cd ${WKC}
git remote prune origin
git pull >/dev/null git pull >/dev/null
git fetch origin +refs/pull/${CHANGE_ID}/merge git fetch origin +refs/pull/${CHANGE_ID}/merge
git checkout -qf FETCH_HEAD git checkout -qf FETCH_HEAD
...@@ -140,6 +141,7 @@ def pre_test_noinstall(){ ...@@ -140,6 +141,7 @@ def pre_test_noinstall(){
} }
sh''' sh'''
cd ${WKC} cd ${WKC}
git remote prune origin
git pull >/dev/null git pull >/dev/null
git fetch origin +refs/pull/${CHANGE_ID}/merge git fetch origin +refs/pull/${CHANGE_ID}/merge
git checkout -qf FETCH_HEAD git checkout -qf FETCH_HEAD
...@@ -210,6 +212,7 @@ def pre_test_ningsi(){ ...@@ -210,6 +212,7 @@ def pre_test_ningsi(){
} }
sh''' sh'''
cd ${WKC} cd ${WKC}
git remote prune origin
git pull >/dev/null git pull >/dev/null
git fetch origin +refs/pull/${CHANGE_ID}/merge git fetch origin +refs/pull/${CHANGE_ID}/merge
git checkout -qf FETCH_HEAD git checkout -qf FETCH_HEAD
...@@ -284,6 +287,7 @@ def pre_test_win(){ ...@@ -284,6 +287,7 @@ def pre_test_win(){
} }
bat''' bat'''
cd C:\\workspace\\TDinternal\\community cd C:\\workspace\\TDinternal\\community
git remote prune origin
git pull git pull
git fetch origin +refs/pull/%CHANGE_ID%/merge git fetch origin +refs/pull/%CHANGE_ID%/merge
git checkout -qf FETCH_HEAD git checkout -qf FETCH_HEAD
......
...@@ -29,6 +29,14 @@ extern "C" { ...@@ -29,6 +29,14 @@ extern "C" {
#include "tsched.h" #include "tsched.h"
#include "tsclient.h" #include "tsclient.h"
#define LABEL_SQL "sql:"
#define LABEL_TO " to:"
#define LABEL_SPLIT " split:"
#define LABEL_SQL_LEN (sizeof(LABEL_SQL) - 1)
#define LABEL_TO_LEN (sizeof(LABEL_TO) - 1)
#define LABEL_SPLIT_LEN (sizeof(LABEL_SPLIT) - 1)
#define UTIL_TABLE_IS_SUPER_TABLE(metaInfo) \ #define UTIL_TABLE_IS_SUPER_TABLE(metaInfo) \
(((metaInfo)->pTableMeta != NULL) && ((metaInfo)->pTableMeta->tableType == TSDB_SUPER_TABLE)) (((metaInfo)->pTableMeta != NULL) && ((metaInfo)->pTableMeta->tableType == TSDB_SUPER_TABLE))
......
...@@ -406,6 +406,10 @@ typedef struct SSqlStream { ...@@ -406,6 +406,10 @@ typedef struct SSqlStream {
int16_t precision; int16_t precision;
int64_t num; // number of computing count int64_t num; // number of computing count
int32_t dstCols; // dstTable has number of columns
char* to;
char* split;
/* /*
* keep the number of current result in computing, * keep the number of current result in computing,
* the value will be set to 0 before set timer for next computing * the value will be set to 0 before set timer for next computing
...@@ -482,6 +486,8 @@ TAOS *taos_connect_a(char *ip, char *user, char *pass, char *db, uint16_t port, ...@@ -482,6 +486,8 @@ TAOS *taos_connect_a(char *ip, char *user, char *pass, char *db, uint16_t port,
void *param, TAOS **taos); void *param, TAOS **taos);
TAOS_RES* taos_query_h(TAOS* taos, const char *sqlstr, int64_t* res); TAOS_RES* taos_query_h(TAOS* taos, const char *sqlstr, int64_t* res);
TAOS_RES * taos_query_ra(TAOS *taos, const char *sqlstr, __async_cb_func_t fp, void *param); TAOS_RES * taos_query_ra(TAOS *taos, const char *sqlstr, __async_cb_func_t fp, void *param);
// get taos connection unused session number
int32_t taos_unused_session(TAOS* taos);
void waitForQueryRsp(void *param, TAOS_RES *tres, int code); void waitForQueryRsp(void *param, TAOS_RES *tres, int code);
......
...@@ -50,3 +50,4 @@ taos_stmt_bind_param_batch ...@@ -50,3 +50,4 @@ taos_stmt_bind_param_batch
taos_stmt_bind_single_param_batch taos_stmt_bind_single_param_batch
taos_is_null taos_is_null
taos_insert_lines taos_insert_lines
taos_print_row_ex
\ No newline at end of file
...@@ -951,9 +951,16 @@ int32_t tscValidateSqlInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) { ...@@ -951,9 +951,16 @@ int32_t tscValidateSqlInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) {
return code; return code;
} }
// set the command/global limit parameters from the first subclause to the sqlcmd object // set the command/global limit parameters from the first not empty subclause to the sqlcmd object
pCmd->active = pCmd->pQueryInfo; SQueryInfo* queryInfo = pCmd->pQueryInfo;
pCmd->command = pCmd->pQueryInfo->command; int16_t command = queryInfo->command;
while (command == TSDB_SQL_RETRIEVE_EMPTY_RESULT && queryInfo->sibling != NULL) {
queryInfo = queryInfo->sibling;
command = queryInfo->command;
}
pCmd->active = queryInfo;
pCmd->command = command;
STableMetaInfo* pTableMetaInfo1 = tscGetMetaInfo(pCmd->active, 0); STableMetaInfo* pTableMetaInfo1 = tscGetMetaInfo(pCmd->active, 0);
if (pTableMetaInfo1->pTableMeta != NULL) { if (pTableMetaInfo1->pTableMeta != NULL) {
...@@ -1350,6 +1357,7 @@ int32_t tscSetTableFullName(SName* pName, SStrToken* pTableName, SSqlObj* pSql) ...@@ -1350,6 +1357,7 @@ int32_t tscSetTableFullName(SName* pName, SStrToken* pTableName, SSqlObj* pSql)
const char* msg3 = "no acctId"; const char* msg3 = "no acctId";
const char* msg4 = "db name too long"; const char* msg4 = "db name too long";
const char* msg5 = "table name too long"; const char* msg5 = "table name too long";
const char* msg6 = "table name empty";
SSqlCmd* pCmd = &pSql->cmd; SSqlCmd* pCmd = &pSql->cmd;
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
...@@ -1396,6 +1404,8 @@ int32_t tscSetTableFullName(SName* pName, SStrToken* pTableName, SSqlObj* pSql) ...@@ -1396,6 +1404,8 @@ int32_t tscSetTableFullName(SName* pName, SStrToken* pTableName, SSqlObj* pSql)
if (pTableName->n >= TSDB_TABLE_NAME_LEN) { if (pTableName->n >= TSDB_TABLE_NAME_LEN) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg1); return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg1);
} else if(pTableName->n == 0) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg6);
} }
char name[TSDB_TABLE_FNAME_LEN] = {0}; char name[TSDB_TABLE_FNAME_LEN] = {0};
...@@ -7803,6 +7813,18 @@ int32_t doCheckForStream(SSqlObj* pSql, SSqlInfo* pInfo) { ...@@ -7803,6 +7813,18 @@ int32_t doCheckForStream(SSqlObj* pSql, SSqlInfo* pInfo) {
if (tscValidateName(pName) != TSDB_CODE_SUCCESS) { if (tscValidateName(pName) != TSDB_CODE_SUCCESS) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg1); return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg1);
} }
// check to valid and create to name
if(pInfo->pCreateTableInfo->to.n > 0) {
if (tscValidateName(&pInfo->pCreateTableInfo->to) != TSDB_CODE_SUCCESS) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg1);
}
int32_t code = tscSetTableFullName(&pInfo->pCreateTableInfo->toSName, &pInfo->pCreateTableInfo->to, pSql);
if(code != TSDB_CODE_SUCCESS) {
return code;
}
}
SRelationInfo* pFromInfo = pInfo->pCreateTableInfo->pSelect->from; SRelationInfo* pFromInfo = pInfo->pCreateTableInfo->pSelect->from;
if (pFromInfo == NULL || taosArrayGetSize(pFromInfo->list) == 0) { if (pFromInfo == NULL || taosArrayGetSize(pFromInfo->list) == 0) {
......
...@@ -1630,11 +1630,49 @@ int tscEstimateCreateTableMsgLength(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -1630,11 +1630,49 @@ int tscEstimateCreateTableMsgLength(SSqlObj *pSql, SSqlInfo *pInfo) {
if (pCreateTableInfo->pSelect != NULL) { if (pCreateTableInfo->pSelect != NULL) {
size += (pCreateTableInfo->pSelect->sqlstr.n + 1); size += (pCreateTableInfo->pSelect->sqlstr.n + 1);
// add size = create super table with same columns and 1 tags
if(pCreateTableInfo->to.n > 0) {
size += sizeof(SCreateTableMsg);
size += sizeof(SSchema) * (pCmd->numOfCols + 1);
size += pCreateTableInfo->to.n + 4; // to:
if(pCreateTableInfo->split.n > 0)
size += pCreateTableInfo->split.n + 7; // split:
}
} }
return size + TSDB_EXTRA_PAYLOAD_SIZE; return size + TSDB_EXTRA_PAYLOAD_SIZE;
} }
char* fillCreateSTableMsg(SCreateTableMsg* pCreateMsg, SCreateTableSql* pTableSql, SSqlCmd* pCmd, SSqlInfo *pInfo) {
// SET
SSchema* pSchema = (SSchema *)pCreateMsg->schema;
SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd);
pCreateMsg->igExists = 0;
pCreateMsg->sqlLen = 0;
// FullName like acctID.dbName.tableName
tNameExtractFullName(&pInfo->pCreateTableInfo->toSName, pCreateMsg->tableName);
// copy columns
pCreateMsg->numOfColumns = htons(pCmd->numOfCols);
for (int i = 0; i < pCmd->numOfCols; ++i) {
TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i);
pSchema->type = pField->type;
strcpy(pSchema->name, pField->name);
pSchema->bytes = htons(pField->bytes);
pSchema++;
}
// append one tag
pCreateMsg->numOfTags = htons(1); // only one tag immutable
pSchema->type = TSDB_DATA_TYPE_INT;
pSchema->bytes = htons(INT_BYTES);
strcpy(pSchema->name, "tag1");
pSchema ++;
return (char *)pSchema;
}
int tscBuildCreateTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) { int tscBuildCreateTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
int msgLen = 0; int msgLen = 0;
int size = 0; int size = 0;
...@@ -1682,39 +1720,71 @@ int tscBuildCreateTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -1682,39 +1720,71 @@ int tscBuildCreateTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pCreate->len = htonl(len); pCreate->len = htonl(len);
} }
} else { // create (super) table } else { // create (super) table
// FIRST MSG
SCreateTableMsg* pCreate = pCreateMsg;
pCreateTableMsg->numOfTables = htonl(1); // only one table will be created pCreateTableMsg->numOfTables = htonl(1); // only one table will be created
int32_t code = tNameExtractFullName(&pTableMetaInfo->name, pCreate->tableName);
int32_t code = tNameExtractFullName(&pTableMetaInfo->name, pCreateMsg->tableName); bool to = pInfo->pCreateTableInfo->to.n > 0;
assert(code == 0); assert(code == 0);
SCreateTableSql *pCreateTable = pInfo->pCreateTableInfo; SCreateTableSql *pCreateTable = pInfo->pCreateTableInfo;
pCreateMsg->igExists = pCreateTable->existCheck ? 1 : 0; pCreate->igExists = pCreateTable->existCheck ? 1 : 0;
pCreateMsg->numOfColumns = htons(pCmd->numOfCols); pCreate->numOfColumns = htons(pCmd->numOfCols);
pCreateMsg->numOfTags = htons(pCmd->count); pCreate->numOfTags = htons(pCmd->count);
pCreate->sqlLen = 0;
pCreateMsg->sqlLen = 0; pSchema = (SSchema *)pCreate->schema;
pMsg = (char *)pCreateMsg->schema; //copy schema
pSchema = (SSchema *)pCreateMsg->schema;
for (int i = 0; i < pCmd->numOfCols + pCmd->count; ++i) { for (int i = 0; i < pCmd->numOfCols + pCmd->count; ++i) {
TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i); TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i);
pSchema->type = pField->type; pSchema->type = pField->type;
strcpy(pSchema->name, pField->name); strcpy(pSchema->name, pField->name);
pSchema->bytes = htons(pField->bytes); pSchema->bytes = htons(pField->bytes);
pSchema++; pSchema++;
} }
//copy stream sql if have
pMsg = (char *)pSchema; pMsg = (char *)pSchema;
if (type == TSQL_CREATE_STREAM) { // check if it is a stream sql if (type == TSQL_CREATE_STREAM) { // check if it is a stream sql
SSqlNode *pQuerySql = pInfo->pCreateTableInfo->pSelect; SSqlNode *pQuerySql = pInfo->pCreateTableInfo->pSelect;
int16_t len = 0;
if(to) {
//sql:
strcpy(pMsg, LABEL_SQL);
len += LABEL_SQL_LEN;
len += tStrNCpy(pMsg + len, &pQuerySql->sqlstr);
//to
strcpy(pMsg + len, LABEL_TO);
len += LABEL_TO_LEN;
len += tStrNCpy(pMsg + len, &pInfo->pCreateTableInfo->to);
//split if
if(pInfo->pCreateTableInfo->split.n > 0) {
strcpy(pMsg + len, LABEL_SPLIT);
len += LABEL_SPLIT_LEN;
len += tStrNCpy(pMsg + len, &pInfo->pCreateTableInfo->split);
}
// append string end flag
pMsg[len++] = 0;
pMsg += len;
pCreate->sqlLen = htons(len);
} else {
len = pQuerySql->sqlstr.n;
strncpy(pMsg, pQuerySql->sqlstr.z, len);
pMsg[len++] = 0; // string end
pMsg += len;
pCreate->sqlLen = htons(len);
}
}
// calc first msg length
int32_t len = (int32_t)(pMsg - (char*)pCreate);
pCreate->len = htonl(len);
strncpy(pMsg, pQuerySql->sqlstr.z, pQuerySql->sqlstr.n + 1); // filling second msg if to have value
pCreateMsg->sqlLen = htons(pQuerySql->sqlstr.n + 1); if(to) {
pMsg += pQuerySql->sqlstr.n + 1; pCreate = (SCreateTableMsg *)pMsg;
pMsg = fillCreateSTableMsg(pCreate, pCreateTable, pCmd, pInfo);
len = (int32_t)(pMsg - (char*)pCreate);
pCreate->len = htonl(len);
pCreateTableMsg->numOfTables = htonl(2);
} }
} }
......
...@@ -28,6 +28,7 @@ ...@@ -28,6 +28,7 @@
#include "tutil.h" #include "tutil.h"
#include "ttimer.h" #include "ttimer.h"
#include "tscProfile.h" #include "tscProfile.h"
#include "tidpool.h"
static bool validImpl(const char* str, size_t maxsize) { static bool validImpl(const char* str, size_t maxsize) {
if (str == NULL) { if (str == NULL) {
...@@ -306,6 +307,25 @@ void taos_close(TAOS *taos) { ...@@ -306,6 +307,25 @@ void taos_close(TAOS *taos) {
taosRemoveRef(tscRefId, pObj->rid); taosRemoveRef(tscRefId, pObj->rid);
} }
// get taos connection unused session number
int32_t taos_unused_session(TAOS* taos) {
// param valid check
STscObj *pObj = (STscObj *)taos;
if (pObj == NULL || pObj->signature != pObj) {
tscError("pObj:%p is NULL or freed", pObj);
terrno = TSDB_CODE_TSC_DISCONNECTED;
return 0;
}
if(pObj->pRpcObj == NULL ) {
tscError("pObj:%p pRpcObj is NULL.", pObj);
terrno = TSDB_CODE_TSC_DISCONNECTED;
return 0;
}
// get number
return rpcUnusedSession(pObj->pRpcObj->pDnodeConn, false);
}
void waitForQueryRsp(void *param, TAOS_RES *tres, int code) { void waitForQueryRsp(void *param, TAOS_RES *tres, int code) {
assert(tres != NULL); assert(tres != NULL);
...@@ -772,11 +792,15 @@ bool taos_is_null(TAOS_RES *res, int32_t row, int32_t col) { ...@@ -772,11 +792,15 @@ bool taos_is_null(TAOS_RES *res, int32_t row, int32_t col) {
} }
int taos_print_row(char *str, TAOS_ROW row, TAOS_FIELD *fields, int num_fields) { int taos_print_row(char *str, TAOS_ROW row, TAOS_FIELD *fields, int num_fields) {
return taos_print_row_ex(str, row, fields, num_fields, ' ', false);
}
int taos_print_row_ex(char *str, TAOS_ROW row, TAOS_FIELD *fields, int num_fields, char split, bool addQuota) {
int len = 0; int len = 0;
for (int i = 0; i < num_fields; ++i) { for (int i = 0; i < num_fields; ++i) {
if (i > 0) { if (i > 0) {
str[len++] = ' '; str[len++] = split;
} }
if (row[i] == NULL) { if (row[i] == NULL) {
...@@ -837,9 +861,23 @@ int taos_print_row(char *str, TAOS_ROW row, TAOS_FIELD *fields, int num_fields) ...@@ -837,9 +861,23 @@ int taos_print_row(char *str, TAOS_ROW row, TAOS_FIELD *fields, int num_fields)
} else { } else {
assert(charLen <= fields[i].bytes * TSDB_NCHAR_SIZE && charLen >= 0); assert(charLen <= fields[i].bytes * TSDB_NCHAR_SIZE && charLen >= 0);
} }
// add pre quotaion if require
if(addQuota) {
*(str + len) = '\'';
len += 1;
}
// copy content
memcpy(str + len, row[i], charLen); memcpy(str + len, row[i], charLen);
len += charLen; len += charLen;
// add end quotaion if require
if(addQuota) {
*(str + len)= '\'';
len += 1;
}
} break; } break;
case TSDB_DATA_TYPE_TIMESTAMP: case TSDB_DATA_TYPE_TIMESTAMP:
...@@ -856,6 +894,89 @@ int taos_print_row(char *str, TAOS_ROW row, TAOS_FIELD *fields, int num_fields) ...@@ -856,6 +894,89 @@ int taos_print_row(char *str, TAOS_ROW row, TAOS_FIELD *fields, int num_fields)
return len; return len;
} }
// print field value to str
int taos_print_field(char *str, void* value, TAOS_FIELD *field) {
// check valid
if (str == NULL || value == NULL || field == NULL) {
return 0;
}
// get value
int len = 0;
switch (field->type) {
//
// fixed length
//
case TSDB_DATA_TYPE_TINYINT:
len = sprintf(str, "%d", *((int8_t *)value));
break;
case TSDB_DATA_TYPE_UTINYINT:
len = sprintf(str, "%u", *((uint8_t *)value));
break;
case TSDB_DATA_TYPE_SMALLINT:
len = sprintf(str, "%d", *((int16_t *)value));
break;
case TSDB_DATA_TYPE_USMALLINT:
len = sprintf(str, "%u", *((uint16_t *)value));
break;
case TSDB_DATA_TYPE_INT:
len = sprintf(str, "%d", *((int32_t *)value));
break;
case TSDB_DATA_TYPE_UINT:
len = sprintf(str, "%u", *((uint32_t *)value));
break;
case TSDB_DATA_TYPE_BIGINT:
len = sprintf(str, "%" PRId64, *((int64_t *)value));
break;
case TSDB_DATA_TYPE_UBIGINT:
len = sprintf(str, "%" PRIu64, *((uint64_t *)value));
break;
case TSDB_DATA_TYPE_FLOAT: {
float fv = 0;
fv = GET_FLOAT_VAL(value);
len = sprintf(str, "%f", fv);
} break;
case TSDB_DATA_TYPE_DOUBLE: {
double dv = 0;
dv = GET_DOUBLE_VAL(value);
len = sprintf(str, "%lf", dv);
} break;
case TSDB_DATA_TYPE_TIMESTAMP:
len = sprintf(str, "%" PRId64, *((int64_t *)value));
break;
case TSDB_DATA_TYPE_BOOL:
len = sprintf(str, "%d", *((int8_t *)value));
//
// variant length
//
case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_NCHAR: {
len = varDataLen((char*)value - VARSTR_HEADER_SIZE);
if (field->type == TSDB_DATA_TYPE_BINARY) {
assert(len <= field->bytes && len >= 0);
} else {
assert(len <= field->bytes * TSDB_NCHAR_SIZE && len >= 0);
}
memcpy(str, value, len);
} break;
default:
break;
}
return len;
}
static void asyncCallback(void *param, TAOS_RES *tres, int code) { static void asyncCallback(void *param, TAOS_RES *tres, int code) {
assert(param != NULL); assert(param != NULL);
SSqlObj *pSql = ((SSqlObj *)param); SSqlObj *pSql = ((SSqlObj *)param);
......
此差异已折叠。
...@@ -1340,6 +1340,18 @@ void handleDownstreamOperator(SSqlObj** pSqlObjList, int32_t numOfUpstream, SQue ...@@ -1340,6 +1340,18 @@ void handleDownstreamOperator(SSqlObj** pSqlObjList, int32_t numOfUpstream, SQue
break; break;
} }
} }
// set input data order to param[1]
if(pex->base.functionId == TSDB_FUNC_FIRST || pex->base.functionId == TSDB_FUNC_FIRST_DST ||
pex->base.functionId == TSDB_FUNC_LAST || pex->base.functionId == TSDB_FUNC_LAST_DST) {
// set input order
SQueryInfo* pInputQI = pSqlObjList[0]->cmd.pQueryInfo;
if(pInputQI) {
pex->base.numOfParams = 3;
pex->base.param[2].nType = TSDB_DATA_TYPE_INT;
pex->base.param[2].i64 = pInputQI->order.order;
}
}
} }
tscDebug("0x%"PRIx64" create QInfo 0x%"PRIx64" to execute the main query while all nest queries are ready", pSql->self, pSql->self); tscDebug("0x%"PRIx64" create QInfo 0x%"PRIx64" to execute the main query while all nest queries are ready", pSql->self, pSql->self);
...@@ -3998,6 +4010,11 @@ void executeQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo) { ...@@ -3998,6 +4010,11 @@ void executeQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo) {
// create sub query to handle the sub query. // create sub query to handle the sub query.
SQueryInfo* pq = tscGetQueryInfo(&psub->cmd); SQueryInfo* pq = tscGetQueryInfo(&psub->cmd);
STableMetaInfo* pSubMeta = tscGetMetaInfo(pq, 0);
if (UTIL_TABLE_IS_SUPER_TABLE(pSubMeta) &&
pq->command == TSDB_SQL_RETRIEVE_EMPTY_RESULT) {
psub->cmd.command = TSDB_SQL_RETRIEVE_EMPTY_RESULT;
}
executeQuery(psub, pq); executeQuery(psub, pq);
} }
......
...@@ -78,7 +78,7 @@ extern int32_t tsMaxNumOfOrderedResults; ...@@ -78,7 +78,7 @@ extern int32_t tsMaxNumOfOrderedResults;
extern int32_t tsMinSlidingTime; extern int32_t tsMinSlidingTime;
extern int32_t tsMinIntervalTime; extern int32_t tsMinIntervalTime;
extern int32_t tsMaxStreamComputDelay; extern int32_t tsMaxStreamComputDelay;
extern int32_t tsStreamCompStartDelay; extern int32_t tsFirstLaunchDelay;
extern int32_t tsRetryStreamCompDelay; extern int32_t tsRetryStreamCompDelay;
extern float tsStreamComputDelayRatio; // the delayed computing ration of the whole time window extern float tsStreamComputDelayRatio; // the delayed computing ration of the whole time window
extern int32_t tsProjectExecInterval; extern int32_t tsProjectExecInterval;
......
...@@ -98,11 +98,11 @@ int32_t tsMinIntervalTime = 1; ...@@ -98,11 +98,11 @@ int32_t tsMinIntervalTime = 1;
// 20sec, the maximum value of stream computing delay, changed accordingly // 20sec, the maximum value of stream computing delay, changed accordingly
int32_t tsMaxStreamComputDelay = 20000; int32_t tsMaxStreamComputDelay = 20000;
// 10sec, the first stream computing delay time after system launched successfully, changed accordingly // 10sec, the stream first launched to execute delay time after system launched successfully, changed accordingly
int32_t tsStreamCompStartDelay = 10000; int32_t tsFirstLaunchDelay = 10000;
// the stream computing delay time after executing failed, change accordingly // the stream computing delay time after executing failed, change accordingly
int32_t tsRetryStreamCompDelay = 10 * 1000; int32_t tsRetryStreamCompDelay = 30 * 60 * 1000;
// The delayed computing ration. 10% of the whole computing time window by default. // The delayed computing ration. 10% of the whole computing time window by default.
float tsStreamComputDelayRatio = 0.1f; float tsStreamComputDelayRatio = 0.1f;
...@@ -755,7 +755,7 @@ static void doInitGlobalConfig(void) { ...@@ -755,7 +755,7 @@ static void doInitGlobalConfig(void) {
taosInitConfigOption(cfg); taosInitConfigOption(cfg);
cfg.option = "maxFirstStreamCompDelay"; cfg.option = "maxFirstStreamCompDelay";
cfg.ptr = &tsStreamCompStartDelay; cfg.ptr = &tsFirstLaunchDelay; // stream first launch delay time
cfg.valType = TAOS_CFG_VTYPE_INT32; cfg.valType = TAOS_CFG_VTYPE_INT32;
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW; cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW;
cfg.minValue = 1000; cfg.minValue = 1000;
......
...@@ -423,8 +423,8 @@ static void cqProcessCreateTimer(void *param, void *tmrId) { ...@@ -423,8 +423,8 @@ static void cqProcessCreateTimer(void *param, void *tmrId) {
} }
// inner implement in tscStream.c // inner implement in tscStream.c
TAOS_STREAM *taos_open_stream_withname(TAOS *taos, const char* desName, const char *sqlstr, void (*fp)(void *param, TAOS_RES *, TAOS_ROW row), TAOS_STREAM *taos_open_stream_withname(TAOS *taos, const char* desName, int32_t dstCols, const char *sqlstr, void (*fp)(void *param, TAOS_RES *, TAOS_ROW row),
int64_t tsc_stime, void *param, void (*callback)(void *), void* cqhandle); int64_t stime, void *param, void (*callback)(void *), void* cqhandle);
static void cqCreateStream(SCqContext *pContext, SCqObj *pObj) { static void cqCreateStream(SCqContext *pContext, SCqObj *pObj) {
pObj->pContext = pContext; pObj->pContext = pContext;
...@@ -436,9 +436,11 @@ static void cqCreateStream(SCqContext *pContext, SCqObj *pObj) { ...@@ -436,9 +436,11 @@ static void cqCreateStream(SCqContext *pContext, SCqObj *pObj) {
} }
pObj->tmrId = 0; pObj->tmrId = 0;
int32_t dstCols = -1;
if(pObj->pSchema)
dstCols = pObj->pSchema->numOfCols;
if (pObj->pStream == NULL) { if (pObj->pStream == NULL) {
pObj->pStream = taos_open_stream_withname(pContext->dbConn, pObj->dstTable, pObj->sqlStr, cqProcessStreamRes, \ pObj->pStream = taos_open_stream_withname(pContext->dbConn, pObj->dstTable, dstCols, pObj->sqlStr, cqProcessStreamRes, \
INT64_MIN, (void *)pObj->rid, NULL, pContext); INT64_MIN, (void *)pObj->rid, NULL, pContext);
// TODO the pObj->pStream may be released if error happens // TODO the pObj->pStream may be released if error happens
......
...@@ -137,7 +137,11 @@ DLL_EXPORT int taos_num_fields(TAOS_RES *res); ...@@ -137,7 +137,11 @@ DLL_EXPORT int taos_num_fields(TAOS_RES *res);
DLL_EXPORT int taos_affected_rows(TAOS_RES *res); DLL_EXPORT int taos_affected_rows(TAOS_RES *res);
DLL_EXPORT TAOS_FIELD *taos_fetch_fields(TAOS_RES *res); DLL_EXPORT TAOS_FIELD *taos_fetch_fields(TAOS_RES *res);
DLL_EXPORT int taos_select_db(TAOS *taos, const char *db); DLL_EXPORT int taos_select_db(TAOS *taos, const char *db);
// row to string
DLL_EXPORT int taos_print_row(char *str, TAOS_ROW row, TAOS_FIELD *fields, int num_fields); DLL_EXPORT int taos_print_row(char *str, TAOS_ROW row, TAOS_FIELD *fields, int num_fields);
DLL_EXPORT int taos_print_row_ex(char *str, TAOS_ROW row, TAOS_FIELD *fields, int num_fields, char split, bool addQuota);
// one field to string
DLL_EXPORT int taos_print_field(char *str, void* value, TAOS_FIELD *field);
DLL_EXPORT void taos_stop_query(TAOS_RES *res); DLL_EXPORT void taos_stop_query(TAOS_RES *res);
DLL_EXPORT bool taos_is_null(TAOS_RES *res, int32_t row, int32_t col); DLL_EXPORT bool taos_is_null(TAOS_RES *res, int32_t row, int32_t col);
DLL_EXPORT int taos_fetch_block(TAOS_RES *res, TAOS_ROW *rows); DLL_EXPORT int taos_fetch_block(TAOS_RES *res, TAOS_ROW *rows);
...@@ -165,7 +169,7 @@ DLL_EXPORT TAOS_RES *taos_consume(TAOS_SUB *tsub); ...@@ -165,7 +169,7 @@ DLL_EXPORT TAOS_RES *taos_consume(TAOS_SUB *tsub);
DLL_EXPORT void taos_unsubscribe(TAOS_SUB *tsub, int keepProgress); DLL_EXPORT void taos_unsubscribe(TAOS_SUB *tsub, int keepProgress);
DLL_EXPORT TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sql, void (*fp)(void *param, TAOS_RES *, TAOS_ROW row), DLL_EXPORT TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sql, void (*fp)(void *param, TAOS_RES *, TAOS_ROW row),
int64_t stime, void *param, void (*callback)(void *)); int64_t tsc_stime, void *param, void (*callback)(void *));
DLL_EXPORT void taos_close_stream(TAOS_STREAM *tstr); DLL_EXPORT void taos_close_stream(TAOS_STREAM *tstr);
DLL_EXPORT int taos_load_table_info(TAOS *taos, const char* tableNameList); DLL_EXPORT int taos_load_table_info(TAOS *taos, const char* tableNameList);
......
...@@ -92,6 +92,7 @@ int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo); ...@@ -92,6 +92,7 @@ int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo);
void rpcSendRecv(void *shandle, SRpcEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp); void rpcSendRecv(void *shandle, SRpcEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp);
int rpcReportProgress(void *pConn, char *pCont, int contLen); int rpcReportProgress(void *pConn, char *pCont, int contLen);
void rpcCancelRequest(int64_t rid); void rpcCancelRequest(int64_t rid);
int32_t rpcUnusedSession(void * rpcInfo, bool bLock);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -173,6 +173,7 @@ typedef void *TsdbQueryHandleT; // Use void to hide implementation details ...@@ -173,6 +173,7 @@ typedef void *TsdbQueryHandleT; // Use void to hide implementation details
typedef struct STsdbQueryCond { typedef struct STsdbQueryCond {
STimeWindow twindow; STimeWindow twindow;
int32_t order; // desc|asc order to iterate the data block int32_t order; // desc|asc order to iterate the data block
int64_t offset; // skip offset put down to tsdb
int32_t numOfCols; int32_t numOfCols;
SColumnInfo *colList; SColumnInfo *colList;
bool loadExternalRows; // load external rows or not bool loadExternalRows; // load external rows or not
...@@ -392,6 +393,9 @@ void tsdbResetQueryHandleForNewTable(TsdbQueryHandleT queryHandle, STsdbQueryCon ...@@ -392,6 +393,9 @@ void tsdbResetQueryHandleForNewTable(TsdbQueryHandleT queryHandle, STsdbQueryCon
int32_t tsdbGetFileBlocksDistInfo(TsdbQueryHandleT* queryHandle, STableBlockDist* pTableBlockInfo); int32_t tsdbGetFileBlocksDistInfo(TsdbQueryHandleT* queryHandle, STableBlockDist* pTableBlockInfo);
// obtain queryHandle attribute
int64_t tsdbSkipOffset(TsdbQueryHandleT queryHandle);
/** /**
* get the statistics of repo usage * get the statistics of repo usage
* @param repo. point to the tsdbrepo * @param repo. point to the tsdbrepo
......
...@@ -133,86 +133,87 @@ ...@@ -133,86 +133,87 @@
#define TK_UNSIGNED 115 #define TK_UNSIGNED 115
#define TK_TAGS 116 #define TK_TAGS 116
#define TK_USING 117 #define TK_USING 117
#define TK_NULL 118 #define TK_TO 118
#define TK_NOW 119 #define TK_SPLIT 119
#define TK_VARIABLE 120 #define TK_NULL 120
#define TK_SELECT 121 #define TK_NOW 121
#define TK_UNION 122 #define TK_VARIABLE 122
#define TK_ALL 123 #define TK_SELECT 123
#define TK_DISTINCT 124 #define TK_UNION 124
#define TK_FROM 125 #define TK_ALL 125
#define TK_INTERVAL 126 #define TK_DISTINCT 126
#define TK_EVERY 127 #define TK_FROM 127
#define TK_SESSION 128 #define TK_INTERVAL 128
#define TK_STATE_WINDOW 129 #define TK_EVERY 129
#define TK_FILL 130 #define TK_SESSION 130
#define TK_SLIDING 131 #define TK_STATE_WINDOW 131
#define TK_ORDER 132 #define TK_FILL 132
#define TK_BY 133 #define TK_SLIDING 133
#define TK_ASC 134 #define TK_ORDER 134
#define TK_GROUP 135 #define TK_BY 135
#define TK_HAVING 136 #define TK_ASC 136
#define TK_LIMIT 137 #define TK_GROUP 137
#define TK_OFFSET 138 #define TK_HAVING 138
#define TK_SLIMIT 139 #define TK_LIMIT 139
#define TK_SOFFSET 140 #define TK_OFFSET 140
#define TK_WHERE 141 #define TK_SLIMIT 141
#define TK_RESET 142 #define TK_SOFFSET 142
#define TK_QUERY 143 #define TK_WHERE 143
#define TK_SYNCDB 144 #define TK_RESET 144
#define TK_ADD 145 #define TK_QUERY 145
#define TK_COLUMN 146 #define TK_SYNCDB 146
#define TK_MODIFY 147 #define TK_ADD 147
#define TK_TAG 148 #define TK_COLUMN 148
#define TK_CHANGE 149 #define TK_MODIFY 149
#define TK_SET 150 #define TK_TAG 150
#define TK_KILL 151 #define TK_CHANGE 151
#define TK_CONNECTION 152 #define TK_SET 152
#define TK_STREAM 153 #define TK_KILL 153
#define TK_COLON 154 #define TK_CONNECTION 154
#define TK_ABORT 155 #define TK_STREAM 155
#define TK_AFTER 156 #define TK_COLON 156
#define TK_ATTACH 157 #define TK_ABORT 157
#define TK_BEFORE 158 #define TK_AFTER 158
#define TK_BEGIN 159 #define TK_ATTACH 159
#define TK_CASCADE 160 #define TK_BEFORE 160
#define TK_CLUSTER 161 #define TK_BEGIN 161
#define TK_CONFLICT 162 #define TK_CASCADE 162
#define TK_COPY 163 #define TK_CLUSTER 163
#define TK_DEFERRED 164 #define TK_CONFLICT 164
#define TK_DELIMITERS 165 #define TK_COPY 165
#define TK_DETACH 166 #define TK_DEFERRED 166
#define TK_EACH 167 #define TK_DELIMITERS 167
#define TK_END 168 #define TK_DETACH 168
#define TK_EXPLAIN 169 #define TK_EACH 169
#define TK_FAIL 170 #define TK_END 170
#define TK_FOR 171 #define TK_EXPLAIN 171
#define TK_IGNORE 172 #define TK_FAIL 172
#define TK_IMMEDIATE 173 #define TK_FOR 173
#define TK_INITIALLY 174 #define TK_IGNORE 174
#define TK_INSTEAD 175 #define TK_IMMEDIATE 175
#define TK_MATCH 176 #define TK_INITIALLY 176
#define TK_KEY 177 #define TK_INSTEAD 177
#define TK_OF 178 #define TK_MATCH 178
#define TK_RAISE 179 #define TK_KEY 179
#define TK_REPLACE 180 #define TK_OF 180
#define TK_RESTRICT 181 #define TK_RAISE 181
#define TK_ROW 182 #define TK_REPLACE 182
#define TK_STATEMENT 183 #define TK_RESTRICT 183
#define TK_TRIGGER 184 #define TK_ROW 184
#define TK_VIEW 185 #define TK_STATEMENT 185
#define TK_SEMI 186 #define TK_TRIGGER 186
#define TK_NONE 187 #define TK_VIEW 187
#define TK_PREV 188 #define TK_SEMI 188
#define TK_LINEAR 189 #define TK_NONE 189
#define TK_IMPORT 190 #define TK_PREV 190
#define TK_TBNAME 191 #define TK_LINEAR 191
#define TK_JOIN 192 #define TK_IMPORT 192
#define TK_INSERT 193 #define TK_TBNAME 193
#define TK_INTO 194 #define TK_JOIN 194
#define TK_VALUES 195 #define TK_INSERT 195
#define TK_FILE 196 #define TK_INTO 196
#define TK_VALUES 197
#define TK_FILE 198
#define TK_SPACE 300 #define TK_SPACE 300
#define TK_COMMENT 301 #define TK_COMMENT 301
......
...@@ -131,7 +131,7 @@ static void *shellCheckThreadFp(void *arg) { ...@@ -131,7 +131,7 @@ static void *shellCheckThreadFp(void *arg) {
char *tbname = tbNames[t]; char *tbname = tbNames[t];
if (tbname == NULL) break; if (tbname == NULL) break;
snprintf(sql, SHELL_SQL_LEN, "select last_row(_c0) from %s;", tbname); snprintf(sql, SHELL_SQL_LEN, "select count(*) from %s;", tbname);
TAOS_RES *pSql = taos_query(pThread->taos, sql); TAOS_RES *pSql = taos_query(pThread->taos, sql);
int32_t code = taos_errno(pSql); int32_t code = taos_errno(pSql);
......
...@@ -43,7 +43,7 @@ void mnodeCheckUnCreatedVgroup(SDnodeObj *pDnode, SVnodeLoad *pVloads, int32_ ...@@ -43,7 +43,7 @@ void mnodeCheckUnCreatedVgroup(SDnodeObj *pDnode, SVnodeLoad *pVloads, int32_
int32_t mnodeCreateVgroup(struct SMnodeMsg *pMsg); int32_t mnodeCreateVgroup(struct SMnodeMsg *pMsg);
void mnodeDropVgroup(SVgObj *pVgroup, void *ahandle); void mnodeDropVgroup(SVgObj *pVgroup, void *ahandle);
void mnodeAlterVgroup(SVgObj *pVgroup, void *ahandle); void mnodeAlterVgroup(SVgObj *pVgroup, void *ahandle);
int32_t mnodeGetAvailableVgroup(struct SMnodeMsg *pMsg, SVgObj **pVgroup, int32_t *sid); int32_t mnodeGetAvailableVgroup(struct SMnodeMsg *pMsg, SVgObj **pVgroup, int32_t *sid, int32_t vgId);
int32_t mnodeAddTableIntoVgroup(SVgObj *pVgroup, SCTableObj *pTable, bool needCheck); int32_t mnodeAddTableIntoVgroup(SVgObj *pVgroup, SCTableObj *pTable, bool needCheck);
void mnodeRemoveTableFromVgroup(SVgObj *pVgroup, SCTableObj *pTable); void mnodeRemoveTableFromVgroup(SVgObj *pVgroup, SCTableObj *pTable);
......
...@@ -48,6 +48,12 @@ ...@@ -48,6 +48,12 @@
#define CREATE_CTABLE_RETRY_TIMES 10 #define CREATE_CTABLE_RETRY_TIMES 10
#define CREATE_CTABLE_RETRY_SEC 14 #define CREATE_CTABLE_RETRY_SEC 14
// informal
#define META_SYNC_TABLE_NAME "_taos_meta_sync_table_name_taos_"
#define META_SYNC_TABLE_NAME_LEN 32
static int32_t tsMetaSyncOption = 0;
// informal
int64_t tsCTableRid = -1; int64_t tsCTableRid = -1;
static void * tsChildTableSdb; static void * tsChildTableSdb;
int64_t tsSTableRid = -1; int64_t tsSTableRid = -1;
...@@ -845,15 +851,15 @@ static int32_t mnodeProcessBatchCreateTableMsg(SMnodeMsg *pMsg) { ...@@ -845,15 +851,15 @@ static int32_t mnodeProcessBatchCreateTableMsg(SMnodeMsg *pMsg) {
memcpy(pSubMsg->pCont + sizeof(SCMCreateTableMsg), p, htonl(p->len)); memcpy(pSubMsg->pCont + sizeof(SCMCreateTableMsg), p, htonl(p->len));
code = mnodeValidateCreateTableMsg(p, pSubMsg); code = mnodeValidateCreateTableMsg(p, pSubMsg);
if (code == TSDB_CODE_SUCCESS || code == TSDB_CODE_MND_TABLE_ALREADY_EXIST) { if (code == TSDB_CODE_SUCCESS || ( p->igExists == 1 && code == TSDB_CODE_MND_TABLE_ALREADY_EXIST )) {
++pSubMsg->pBatchMasterMsg->successed; ++pSubMsg->pBatchMasterMsg->successed;
mnodeDestroySubMsg(pSubMsg); mnodeDestroySubMsg(pSubMsg);
continue; continue;
} }
if (code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { if (code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
mnodeDestroySubMsg(pSubMsg); mnodeDestroySubMsg(pSubMsg);
return code; return code;
} }
} }
...@@ -1045,11 +1051,21 @@ static int32_t mnodeCreateSuperTableCb(SMnodeMsg *pMsg, int32_t code) { ...@@ -1045,11 +1051,21 @@ static int32_t mnodeCreateSuperTableCb(SMnodeMsg *pMsg, int32_t code) {
if (code == TSDB_CODE_SUCCESS) { if (code == TSDB_CODE_SUCCESS) {
mLInfo("stable:%s, is created in sdb, uid:%" PRIu64, pTable->info.tableId, pTable->uid); mLInfo("stable:%s, is created in sdb, uid:%" PRIu64, pTable->info.tableId, pTable->uid);
if(pMsg->pBatchMasterMsg)
pMsg->pBatchMasterMsg->successed ++;
} else { } else {
mError("msg:%p, app:%p stable:%s, failed to create in sdb, reason:%s", pMsg, pMsg->rpcMsg.ahandle, pTable->info.tableId, mError("msg:%p, app:%p stable:%s, failed to create in sdb, reason:%s", pMsg, pMsg->rpcMsg.ahandle, pTable->info.tableId,
tstrerror(code)); tstrerror(code));
SSdbRow desc = {.type = SDB_OPER_GLOBAL, .pObj = pTable, .pTable = tsSuperTableSdb}; SSdbRow desc = {.type = SDB_OPER_GLOBAL, .pObj = pTable, .pTable = tsSuperTableSdb};
sdbDeleteRow(&desc); sdbDeleteRow(&desc);
if(pMsg->pBatchMasterMsg)
pMsg->pBatchMasterMsg->received ++;
}
// if super table create by batch msg, check done and send finished to client
if(pMsg->pBatchMasterMsg) {
if (pMsg->pBatchMasterMsg->successed + pMsg->pBatchMasterMsg->received >= pMsg->pBatchMasterMsg->expected)
dnodeSendRpcMWriteRsp(pMsg->pBatchMasterMsg, code);
} }
return code; return code;
...@@ -1732,6 +1748,9 @@ int32_t mnodeRetrieveShowSuperTables(SShowObj *pShow, char *data, int32_t rows, ...@@ -1732,6 +1748,9 @@ int32_t mnodeRetrieveShowSuperTables(SShowObj *pShow, char *data, int32_t rows,
cols++; cols++;
numOfRows++; numOfRows++;
mDebug("stable: %s, uid: %" PRIu64, prefix, pTable->uid);
mnodeDecTableRef(pTable); mnodeDecTableRef(pTable);
} }
...@@ -2233,9 +2252,19 @@ static int32_t mnodeProcessCreateChildTableMsg(SMnodeMsg *pMsg) { ...@@ -2233,9 +2252,19 @@ static int32_t mnodeProcessCreateChildTableMsg(SMnodeMsg *pMsg) {
if (pMsg->pTable == NULL) { if (pMsg->pTable == NULL) {
SVgObj *pVgroup = NULL; SVgObj *pVgroup = NULL;
int32_t tid = 0; int32_t tid = 0;
code = mnodeGetAvailableVgroup(pMsg, &pVgroup, &tid); int32_t vgId = 0;
if (tsMetaSyncOption) {
char *pTbName = strchr(pCreate->tableName, '.');
if (pTbName && (pTbName = strchr(pTbName + 1, '.'))) {
if (0 == strncmp(META_SYNC_TABLE_NAME, ++pTbName, META_SYNC_TABLE_NAME_LEN)) {
vgId = atoi(pTbName + META_SYNC_TABLE_NAME_LEN);
}
}
}
code = mnodeGetAvailableVgroup(pMsg, &pVgroup, &tid, vgId);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
mDebug("msg:%p, app:%p table:%s, failed to get available vgroup, reason:%s", pMsg, pMsg->rpcMsg.ahandle, mError("msg:%p, app:%p table:%s, failed to get available vgroup, reason:%s", pMsg, pMsg->rpcMsg.ahandle,
pCreate->tableName, tstrerror(code)); pCreate->tableName, tstrerror(code));
return code; return code;
} }
......
...@@ -428,10 +428,47 @@ static int32_t mnodeAllocVgroupIdPool(SVgObj *pInputVgroup) { ...@@ -428,10 +428,47 @@ static int32_t mnodeAllocVgroupIdPool(SVgObj *pInputVgroup) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t mnodeGetAvailableVgroup(SMnodeMsg *pMsg, SVgObj **ppVgroup, int32_t *pSid) { int32_t mnodeGetAvailableVgroup(SMnodeMsg *pMsg, SVgObj **ppVgroup, int32_t *pSid, int32_t vgId) {
SDbObj *pDb = pMsg->pDb; SDbObj *pDb = pMsg->pDb;
pthread_mutex_lock(&pDb->mutex); pthread_mutex_lock(&pDb->mutex);
if (vgId > 0) {
for (int32_t v = 0; v < pDb->numOfVgroups; ++v) {
SVgObj *pVgroup = pDb->vgList[v];
if (pVgroup == NULL) {
mError("db:%s, vgroup: %d is null", pDb->name, v);
pthread_mutex_unlock(&pDb->mutex);
return TSDB_CODE_MND_APP_ERROR;
}
if (pVgroup->vgId != (uint32_t)vgId) { // find the target vgId
continue;
}
int32_t sid = taosAllocateId(pVgroup->idPool);
if (sid <= 0) {
int curMaxId = taosIdPoolMaxSize(pVgroup->idPool);
if ((taosUpdateIdPool(pVgroup->idPool, curMaxId + 1) < 0) || ((sid = taosAllocateId(pVgroup->idPool)) <= 0)) {
mError("msg:%p, app:%p db:%s, no enough sid in vgId:%d", pMsg, pMsg->rpcMsg.ahandle, pDb->name,
pVgroup->vgId);
pthread_mutex_unlock(&pDb->mutex);
return TSDB_CODE_MND_APP_ERROR;
}
}
mDebug("vgId:%d, alloc tid:%d", pVgroup->vgId, sid);
*pSid = sid;
*ppVgroup = pVgroup;
pDb->vgListIndex = v;
pthread_mutex_unlock(&pDb->mutex);
return TSDB_CODE_SUCCESS;
}
pthread_mutex_unlock(&pDb->mutex);
mError("db:%s, vgroup: %d not exist", pDb->name, vgId);
return TSDB_CODE_MND_APP_ERROR;
}
for (int32_t v = 0; v < pDb->numOfVgroups; ++v) { for (int32_t v = 0; v < pDb->numOfVgroups; ++v) {
int vgIndex = (v + pDb->vgListIndex) % pDb->numOfVgroups; int vgIndex = (v + pDb->vgListIndex) % pDb->numOfVgroups;
SVgObj *pVgroup = pDb->vgList[vgIndex]; SVgObj *pVgroup = pDb->vgList[vgIndex];
......
...@@ -230,6 +230,7 @@ typedef struct SQueryAttr { ...@@ -230,6 +230,7 @@ typedef struct SQueryAttr {
bool createFilterOperator; // if filter operator is needed bool createFilterOperator; // if filter operator is needed
bool multigroupResult; // multigroup result can exist in one SSDataBlock bool multigroupResult; // multigroup result can exist in one SSDataBlock
bool needSort; // need sort rowRes bool needSort; // need sort rowRes
bool skipOffset; // can skip offset if true
int32_t interBufSize; // intermediate buffer sizse int32_t interBufSize; // intermediate buffer sizse
int32_t havingNum; // having expr number int32_t havingNum; // having expr number
......
...@@ -25,6 +25,7 @@ extern "C" { ...@@ -25,6 +25,7 @@ extern "C" {
#include "tstrbuild.h" #include "tstrbuild.h"
#include "ttoken.h" #include "ttoken.h"
#include "tvariant.h" #include "tvariant.h"
#include "tname.h"
#define ParseTOKENTYPE SStrToken #define ParseTOKENTYPE SStrToken
...@@ -139,8 +140,11 @@ typedef struct SCreatedTableInfo { ...@@ -139,8 +140,11 @@ typedef struct SCreatedTableInfo {
typedef struct SCreateTableSql { typedef struct SCreateTableSql {
SStrToken name; // table name, create table [name] xxx SStrToken name; // table name, create table [name] xxx
SStrToken to; // create stream to anohter table
SStrToken split; // split columns
int8_t type; // create normal table/from super table/ stream int8_t type; // create normal table/from super table/ stream
bool existCheck; bool existCheck;
SName toSName;
struct { struct {
SArray *pTagColumns; // SArray<TAOS_FIELD> SArray *pTagColumns; // SArray<TAOS_FIELD>
...@@ -313,6 +317,7 @@ SArray *setSubclause(SArray *pList, void *pSqlNode); ...@@ -313,6 +317,7 @@ SArray *setSubclause(SArray *pList, void *pSqlNode);
SArray *appendSelectClause(SArray *pList, void *pSubclause); SArray *appendSelectClause(SArray *pList, void *pSubclause);
void setCreatedTableName(SSqlInfo *pInfo, SStrToken *pTableNameToken, SStrToken *pIfNotExists); void setCreatedTableName(SSqlInfo *pInfo, SStrToken *pTableNameToken, SStrToken *pIfNotExists);
void setCreatedStreamOpt(SSqlInfo *pInfo, SStrToken *pTo, SStrToken *pSplit);
void SqlInfoDestroy(SSqlInfo *pInfo); void SqlInfoDestroy(SSqlInfo *pInfo);
......
...@@ -413,14 +413,28 @@ tagNamelist(A) ::= ids(X). {A = taosArrayInit(4, sizeof(SSt ...@@ -413,14 +413,28 @@ tagNamelist(A) ::= ids(X). {A = taosArrayInit(4, sizeof(SSt
// create stream // create stream
// create table table_name as select count(*) from super_table_name interval(time) // create table table_name as select count(*) from super_table_name interval(time)
create_table_args(A) ::= ifnotexists(U) ids(V) cpxName(Z) AS select(S). { create_table_args(A) ::= ifnotexists(U) ids(V) cpxName(Z) to_opt(E) split_opt(F) AS select(S). {
A = tSetCreateTableInfo(NULL, NULL, S, TSQL_CREATE_STREAM); A = tSetCreateTableInfo(NULL, NULL, S, TSQL_CREATE_STREAM);
setSqlInfo(pInfo, A, NULL, TSDB_SQL_CREATE_TABLE); setSqlInfo(pInfo, A, NULL, TSDB_SQL_CREATE_TABLE);
setCreatedStreamOpt(pInfo, &E, &F);
V.n += Z.n; V.n += Z.n;
setCreatedTableName(pInfo, &V, &U); setCreatedTableName(pInfo, &V, &U);
} }
// to_opt
%type to_opt {SStrToken}
to_opt(A) ::= . {A.n = 0;}
to_opt(A) ::= TO ids(X) cpxName(Y). {
A = X;
A.n += Y.n;
}
// split_opt
%type to_split {SStrToken}
split_opt(A) ::= . {A.n = 0;}
split_opt(A) ::= SPLIT ids(X). { A = X;}
%type column{TAOS_FIELD} %type column{TAOS_FIELD}
%type columnlist{SArray*} %type columnlist{SArray*}
%destructor columnlist {taosArrayDestroy($$);} %destructor columnlist {taosArrayDestroy($$);}
......
...@@ -701,7 +701,6 @@ static int32_t firstFuncRequired(SQLFunctionCtx *pCtx, STimeWindow* w, int32_t c ...@@ -701,7 +701,6 @@ static int32_t firstFuncRequired(SQLFunctionCtx *pCtx, STimeWindow* w, int32_t c
if (pCtx->order == TSDB_ORDER_DESC) { if (pCtx->order == TSDB_ORDER_DESC) {
return BLK_DATA_NO_NEEDED; return BLK_DATA_NO_NEEDED;
} }
// no result for first query, data block is required // no result for first query, data block is required
if (GET_RES_INFO(pCtx) == NULL || GET_RES_INFO(pCtx)->numOfRes <= 0) { if (GET_RES_INFO(pCtx) == NULL || GET_RES_INFO(pCtx)->numOfRes <= 0) {
return BLK_DATA_ALL_NEEDED; return BLK_DATA_ALL_NEEDED;
...@@ -1524,33 +1523,65 @@ static bool first_last_function_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo* ...@@ -1524,33 +1523,65 @@ static bool first_last_function_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo*
// todo opt for null block // todo opt for null block
static void first_function(SQLFunctionCtx *pCtx) { static void first_function(SQLFunctionCtx *pCtx) {
if (pCtx->order == TSDB_ORDER_DESC) { SResultRowCellInfo* pResInfo = GET_RES_INFO(pCtx);
return;
}
int32_t notNullElems = 0; int32_t notNullElems = 0;
int32_t step = 1;
// handle the null value int32_t i = 0;
for (int32_t i = 0; i < pCtx->size; ++i) { bool inputAsc = true;
char *data = GET_INPUT_DATA(pCtx, i);
if (pCtx->hasNull && isNull(data, pCtx->inputType)) { // input data come from sub query, input data order equal to sub query order
continue; if(pCtx->numOfParams == 3) {
if(pCtx->param[2].nType == TSDB_DATA_TYPE_INT && pCtx->param[2].i64 == TSDB_ORDER_DESC) {
step = -1;
i = pCtx->size - 1;
inputAsc = false;
} }
} else if (pCtx->order == TSDB_ORDER_DESC) {
memcpy(pCtx->pOutput, data, pCtx->inputBytes); return ;
if (pCtx->ptsList != NULL) { }
TSKEY k = GET_TS_DATA(pCtx, i);
DO_UPDATE_TAG_COLUMNS(pCtx, k); if(pCtx->order == TSDB_ORDER_ASC && inputAsc) {
for (int32_t m = 0; m < pCtx->size; ++m, i+=step) {
char *data = GET_INPUT_DATA(pCtx, i);
if (pCtx->hasNull && isNull(data, pCtx->inputType)) {
continue;
}
memcpy(pCtx->pOutput, data, pCtx->inputBytes);
if (pCtx->ptsList != NULL) {
TSKEY k = GET_TS_DATA(pCtx, i);
DO_UPDATE_TAG_COLUMNS(pCtx, k);
}
SResultRowCellInfo *pInfo = GET_RES_INFO(pCtx);
pInfo->hasResult = DATA_SET_FLAG;
pInfo->complete = true;
notNullElems++;
break;
} }
} else { // desc order
for (int32_t m = 0; m < pCtx->size; ++m, i+=step) {
char *data = GET_INPUT_DATA(pCtx, i);
if (pCtx->hasNull && isNull(data, pCtx->inputType) && (!pCtx->requireNull)) {
continue;
}
SResultRowCellInfo *pInfo = GET_RES_INFO(pCtx); TSKEY ts = pCtx->ptsList ? GET_TS_DATA(pCtx, i) : 0;
pInfo->hasResult = DATA_SET_FLAG;
pInfo->complete = true; char* buf = GET_ROWCELL_INTERBUF(pResInfo);
if (pResInfo->hasResult != DATA_SET_FLAG || (*(TSKEY*)buf) > ts) {
notNullElems++; pResInfo->hasResult = DATA_SET_FLAG;
break; memcpy(pCtx->pOutput, data, pCtx->inputBytes);
*(TSKEY*)buf = ts;
DO_UPDATE_TAG_COLUMNS(pCtx, ts);
}
notNullElems++;
break;
}
} }
SET_VAL(pCtx, notNullElems, 1); SET_VAL(pCtx, notNullElems, 1);
} }
...@@ -1634,16 +1665,23 @@ static void first_dist_func_merge(SQLFunctionCtx *pCtx) { ...@@ -1634,16 +1665,23 @@ static void first_dist_func_merge(SQLFunctionCtx *pCtx) {
* least one data in this block that is not null.(TODO opt for this case) * least one data in this block that is not null.(TODO opt for this case)
*/ */
static void last_function(SQLFunctionCtx *pCtx) { static void last_function(SQLFunctionCtx *pCtx) {
if (pCtx->order != pCtx->param[0].i64) { SResultRowCellInfo* pResInfo = GET_RES_INFO(pCtx);
int32_t notNullElems = 0;
int32_t step = -1;
int32_t i = pCtx->size - 1;
// input data come from sub query, input data order equal to sub query order
if(pCtx->numOfParams == 3) {
if(pCtx->param[2].nType == TSDB_DATA_TYPE_INT && pCtx->param[2].i64 == TSDB_ORDER_DESC) {
step = 1;
i = 0;
}
} else if (pCtx->order != pCtx->param[0].i64) {
return; return;
} }
SResultRowCellInfo* pResInfo = GET_RES_INFO(pCtx);
int32_t notNullElems = 0;
if (pCtx->order == TSDB_ORDER_DESC) { if (pCtx->order == TSDB_ORDER_DESC) {
for (int32_t m = pCtx->size - 1; m >= 0; --m, i += step) {
for (int32_t i = pCtx->size - 1; i >= 0; --i) {
char *data = GET_INPUT_DATA(pCtx, i); char *data = GET_INPUT_DATA(pCtx, i);
if (pCtx->hasNull && isNull(data, pCtx->inputType) && (!pCtx->requireNull)) { if (pCtx->hasNull && isNull(data, pCtx->inputType) && (!pCtx->requireNull)) {
continue; continue;
...@@ -1660,7 +1698,7 @@ static void last_function(SQLFunctionCtx *pCtx) { ...@@ -1660,7 +1698,7 @@ static void last_function(SQLFunctionCtx *pCtx) {
break; break;
} }
} else { // ascending order } else { // ascending order
for (int32_t i = pCtx->size - 1; i >= 0; --i) { for (int32_t m = pCtx->size - 1; m >= 0; --m, i += step) {
char *data = GET_INPUT_DATA(pCtx, i); char *data = GET_INPUT_DATA(pCtx, i);
if (pCtx->hasNull && isNull(data, pCtx->inputType) && (!pCtx->requireNull)) { if (pCtx->hasNull && isNull(data, pCtx->inputType) && (!pCtx->requireNull)) {
continue; continue;
......
...@@ -4923,6 +4923,7 @@ static void doTableQueryInfoTimeWindowCheck(SQueryAttr* pQueryAttr, STableQueryI ...@@ -4923,6 +4923,7 @@ static void doTableQueryInfoTimeWindowCheck(SQueryAttr* pQueryAttr, STableQueryI
} }
} }
STsdbQueryCond createTsdbQueryCond(SQueryAttr* pQueryAttr, STimeWindow* win) { STsdbQueryCond createTsdbQueryCond(SQueryAttr* pQueryAttr, STimeWindow* win) {
STsdbQueryCond cond = { STsdbQueryCond cond = {
.colList = pQueryAttr->tableCols, .colList = pQueryAttr->tableCols,
...@@ -4932,6 +4933,12 @@ STsdbQueryCond createTsdbQueryCond(SQueryAttr* pQueryAttr, STimeWindow* win) { ...@@ -4932,6 +4933,12 @@ STsdbQueryCond createTsdbQueryCond(SQueryAttr* pQueryAttr, STimeWindow* win) {
.loadExternalRows = false, .loadExternalRows = false,
}; };
// set offset with
if(pQueryAttr->skipOffset) {
cond.offset = pQueryAttr->limit.offset;
}
TIME_WINDOW_COPY(cond.twindow, *win); TIME_WINDOW_COPY(cond.twindow, *win);
return cond; return cond;
} }
...@@ -5614,6 +5621,18 @@ static int32_t getTableScanOrder(STableScanInfo* pTableScanInfo) { ...@@ -5614,6 +5621,18 @@ static int32_t getTableScanOrder(STableScanInfo* pTableScanInfo) {
return pTableScanInfo->order; return pTableScanInfo->order;
} }
// check all SQLFunctionCtx is completed
static bool allCtxCompleted(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx) {
// only one false, return false
for(int32_t i = 0; i < pOperator->numOfOutput; i++) {
if(pCtx[i].resultInfo == NULL)
return false;
if(!pCtx[i].resultInfo->complete)
return false;
}
return true;
}
// this is a blocking operator // this is a blocking operator
static SSDataBlock* doAggregate(void* param, bool* newgroup) { static SSDataBlock* doAggregate(void* param, bool* newgroup) {
SOperatorInfo* pOperator = (SOperatorInfo*) param; SOperatorInfo* pOperator = (SOperatorInfo*) param;
...@@ -5652,6 +5671,9 @@ static SSDataBlock* doAggregate(void* param, bool* newgroup) { ...@@ -5652,6 +5671,9 @@ static SSDataBlock* doAggregate(void* param, bool* newgroup) {
// the pDataBlock are always the same one, no need to call this again // the pDataBlock are always the same one, no need to call this again
setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order); setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order);
doAggregateImpl(pOperator, pQueryAttr->window.skey, pInfo->pCtx, pBlock); doAggregateImpl(pOperator, pQueryAttr->window.skey, pInfo->pCtx, pBlock);
// if all pCtx is completed, then query should be over
if(allCtxCompleted(pOperator, pInfo->pCtx))
break;
} }
doSetOperatorCompleted(pOperator); doSetOperatorCompleted(pOperator);
...@@ -5857,19 +5879,38 @@ static SSDataBlock* doLimit(void* param, bool* newgroup) { ...@@ -5857,19 +5879,38 @@ static SSDataBlock* doLimit(void* param, bool* newgroup) {
return NULL; return NULL;
} }
bool move = false;
int32_t skip = 0;
int32_t remain = 0;
int64_t srows = tsdbSkipOffset(pRuntimeEnv->pQueryHandle);
if (pRuntimeEnv->currentOffset == 0) { if (pRuntimeEnv->currentOffset == 0) {
break; break;
}
else if(srows > 0) {
if(pRuntimeEnv->currentOffset - srows >= pBlock->info.rows) {
pRuntimeEnv->currentOffset -= pBlock->info.rows;
} else {
move = true;
skip = (int32_t)(pRuntimeEnv->currentOffset - srows);
remain = (int32_t)(pBlock->info.rows - skip);
}
} else if (pRuntimeEnv->currentOffset >= pBlock->info.rows) { } else if (pRuntimeEnv->currentOffset >= pBlock->info.rows) {
pRuntimeEnv->currentOffset -= pBlock->info.rows; pRuntimeEnv->currentOffset -= pBlock->info.rows;
} else { } else {
int32_t remain = (int32_t)(pBlock->info.rows - pRuntimeEnv->currentOffset); move = true;
skip = (int32_t)pRuntimeEnv->currentOffset;
remain = (int32_t)(pBlock->info.rows - pRuntimeEnv->currentOffset);
}
// need move
if(move) {
pBlock->info.rows = remain; pBlock->info.rows = remain;
for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) { for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) {
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i); SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
int16_t bytes = pColInfoData->info.bytes; int16_t bytes = pColInfoData->info.bytes;
memmove(pColInfoData->pData, pColInfoData->pData + bytes * pRuntimeEnv->currentOffset, remain * bytes); memmove(pColInfoData->pData, pColInfoData->pData + skip * bytes, remain * bytes);
} }
pRuntimeEnv->currentOffset = 0; pRuntimeEnv->currentOffset = 0;
...@@ -8496,6 +8537,19 @@ SQInfo* createQInfoImpl(SQueryTableMsg* pQueryMsg, SGroupbyExpr* pGroupbyExpr, S ...@@ -8496,6 +8537,19 @@ SQInfo* createQInfoImpl(SQueryTableMsg* pQueryMsg, SGroupbyExpr* pGroupbyExpr, S
goto _cleanup; goto _cleanup;
} }
// calc skipOffset
if(pQueryMsg->offset > 0 && TSDB_QUERY_HAS_TYPE(pQueryMsg->queryType, TSDB_QUERY_TYPE_PROJECTION_QUERY)
&& pQueryAttr->stableQuery == false) {
pQueryAttr->skipOffset = true;
for (int32_t i = 0; i < pQueryAttr->numOfCols; ++i) {
if (pQueryAttr->tableCols[i].flist.numOfFilters > 0
&& pQueryAttr->tableCols[i].colId != PRIMARYKEY_TIMESTAMP_COL_INDEX) {
pQueryAttr->skipOffset = false;
break;
}
}
}
if (pSecExprs != NULL) { if (pSecExprs != NULL) {
int32_t resultRowSize = 0; int32_t resultRowSize = 0;
......
...@@ -1009,6 +1009,11 @@ void setCreatedTableName(SSqlInfo *pInfo, SStrToken *pTableNameToken, SStrToken ...@@ -1009,6 +1009,11 @@ void setCreatedTableName(SSqlInfo *pInfo, SStrToken *pTableNameToken, SStrToken
pInfo->pCreateTableInfo->existCheck = (pIfNotExists->n != 0); pInfo->pCreateTableInfo->existCheck = (pIfNotExists->n != 0);
} }
void setCreatedStreamOpt(SSqlInfo *pInfo, SStrToken *pTo, SStrToken *pSplit) {
pInfo->pCreateTableInfo->to = *pTo;
pInfo->pCreateTableInfo->split = *pSplit;
}
void setDCLSqlElems(SSqlInfo *pInfo, int32_t type, int32_t nParam, ...) { void setDCLSqlElems(SSqlInfo *pInfo, int32_t type, int32_t nParam, ...) {
pInfo->type = type; pInfo->type = type;
if (nParam == 0) { if (nParam == 0) {
......
...@@ -220,6 +220,7 @@ bool qTableQuery(qinfo_t qinfo, uint64_t *qId) { ...@@ -220,6 +220,7 @@ bool qTableQuery(qinfo_t qinfo, uint64_t *qId) {
if (isQueryKilled(pQInfo)) { if (isQueryKilled(pQInfo)) {
qDebug("QInfo:0x%"PRIx64" it is already killed, abort", pQInfo->qId); qDebug("QInfo:0x%"PRIx64" it is already killed, abort", pQInfo->qId);
setQueryKilled(pQInfo);
pQInfo->runtimeEnv.outputBuf = NULL; pQInfo->runtimeEnv.outputBuf = NULL;
return doBuildResCheck(pQInfo); return doBuildResCheck(pQInfo);
} }
...@@ -692,4 +693,4 @@ bool qSolveCommitNoBlock(void* pRepo, void* pMgmt) { ...@@ -692,4 +693,4 @@ bool qSolveCommitNoBlock(void* pRepo, void* pMgmt) {
} }
qWarn("pRepo=%p solve problem failed.", pRepo); qWarn("pRepo=%p solve problem failed.", pRepo);
return false; return false;
} }
\ No newline at end of file
此差异已折叠。
...@@ -1669,3 +1669,9 @@ static void rpcDecRef(SRpcInfo *pRpc) ...@@ -1669,3 +1669,9 @@ static void rpcDecRef(SRpcInfo *pRpc)
} }
} }
int32_t rpcUnusedSession(void * rpcInfo, bool bLock) {
SRpcInfo *info = (SRpcInfo *)rpcInfo;
if(info == NULL)
return 0;
return taosIdPoolNumOfFree(info->idPool, bLock);
}
\ No newline at end of file
...@@ -37,6 +37,9 @@ ...@@ -37,6 +37,9 @@
.tid = (_checkInfo)->tableId.tid, \ .tid = (_checkInfo)->tableId.tid, \
.uid = (_checkInfo)->tableId.uid}) .uid = (_checkInfo)->tableId.uid})
// limit offset start optimization for rows read over this value
#define OFFSET_SKIP_THRESHOLD 5000
enum { enum {
TSDB_QUERY_TYPE_ALL = 1, TSDB_QUERY_TYPE_ALL = 1,
TSDB_QUERY_TYPE_LAST = 2, TSDB_QUERY_TYPE_LAST = 2,
...@@ -115,6 +118,9 @@ typedef struct STsdbQueryHandle { ...@@ -115,6 +118,9 @@ typedef struct STsdbQueryHandle {
STsdbRepo* pTsdb; STsdbRepo* pTsdb;
SQueryFilePos cur; // current position SQueryFilePos cur; // current position
int16_t order; int16_t order;
int64_t offset; // limit offset
int64_t srows; // skip offset rows
int64_t frows; // forbid skip offset rows
STimeWindow window; // the primary query time window that applies to all queries STimeWindow window; // the primary query time window that applies to all queries
SDataStatis* statis; // query level statistics, only one table block statistics info exists at any time SDataStatis* statis; // query level statistics, only one table block statistics info exists at any time
int32_t numOfBlocks; int32_t numOfBlocks;
...@@ -153,6 +159,11 @@ typedef struct STableGroupSupporter { ...@@ -153,6 +159,11 @@ typedef struct STableGroupSupporter {
STSchema* pTagSchema; STSchema* pTagSchema;
} STableGroupSupporter; } STableGroupSupporter;
typedef struct SRange {
int32_t from;
int32_t to;
} SRange;
static STimeWindow updateLastrowForEachGroup(STableGroupInfo *groupList); static STimeWindow updateLastrowForEachGroup(STableGroupInfo *groupList);
static int32_t checkForCachedLastRow(STsdbQueryHandle* pQueryHandle, STableGroupInfo *groupList); static int32_t checkForCachedLastRow(STsdbQueryHandle* pQueryHandle, STableGroupInfo *groupList);
static int32_t checkForCachedLast(STsdbQueryHandle* pQueryHandle); static int32_t checkForCachedLast(STsdbQueryHandle* pQueryHandle);
...@@ -410,6 +421,9 @@ static STsdbQueryHandle* tsdbQueryTablesImpl(STsdbRepo* tsdb, STsdbQueryCond* pC ...@@ -410,6 +421,9 @@ static STsdbQueryHandle* tsdbQueryTablesImpl(STsdbRepo* tsdb, STsdbQueryCond* pC
} }
pQueryHandle->order = pCond->order; pQueryHandle->order = pCond->order;
pQueryHandle->offset = pCond->offset;
pQueryHandle->srows = 0;
pQueryHandle->frows = 0;
pQueryHandle->pTsdb = tsdb; pQueryHandle->pTsdb = tsdb;
pQueryHandle->type = TSDB_QUERY_TYPE_ALL; pQueryHandle->type = TSDB_QUERY_TYPE_ALL;
pQueryHandle->cur.fid = INT32_MIN; pQueryHandle->cur.fid = INT32_MIN;
...@@ -526,6 +540,9 @@ void tsdbResetQueryHandle(TsdbQueryHandleT queryHandle, STsdbQueryCond *pCond) { ...@@ -526,6 +540,9 @@ void tsdbResetQueryHandle(TsdbQueryHandleT queryHandle, STsdbQueryCond *pCond) {
} }
pQueryHandle->order = pCond->order; pQueryHandle->order = pCond->order;
pQueryHandle->offset = pCond->offset;
pQueryHandle->srows = 0;
pQueryHandle->frows = 0;
pQueryHandle->window = pCond->twindow; pQueryHandle->window = pCond->twindow;
pQueryHandle->type = TSDB_QUERY_TYPE_ALL; pQueryHandle->type = TSDB_QUERY_TYPE_ALL;
pQueryHandle->cur.fid = -1; pQueryHandle->cur.fid = -1;
...@@ -1069,74 +1086,301 @@ static int32_t binarySearchForBlock(SBlock* pBlock, int32_t numOfBlocks, TSKEY s ...@@ -1069,74 +1086,301 @@ static int32_t binarySearchForBlock(SBlock* pBlock, int32_t numOfBlocks, TSKEY s
return midSlot; return midSlot;
} }
static int32_t loadBlockInfo(STsdbQueryHandle * pQueryHandle, int32_t tsd_index, int32_t* numOfBlocks) { // array :1 2 3 5 7 -2 (8 9) skip 4 and 6
int32_t code = 0; int32_t memMoveByArray(SBlock *blocks, SArray *pArray) {
// pArray is NULL or size is zero , no need block to move
STableCheckInfo* pCheckInfo = taosArrayGet(pQueryHandle->pTableCheckInfo, tsd_index); if(pArray == NULL)
pCheckInfo->numOfBlocks = 0; return 0;
size_t count = taosArrayGetSize(pArray);
if(count == 0)
return 0;
if (tsdbSetReadTable(&pQueryHandle->rhelper, pCheckInfo->pTableObj) != TSDB_CODE_SUCCESS) { // memmove
code = terrno; int32_t num = 0;
return code; SRange* ranges = (SRange*)TARRAY_GET_START(pArray);
for(size_t i = 0; i < count; i++) {
int32_t step = ranges[i].to - ranges[i].from + 1;
memmove(blocks + num, blocks + ranges[i].from, sizeof(SBlock) * step);
num += step;
} }
SBlockIdx* compIndex = pQueryHandle->rhelper.pBlkIdx; return num;
}
// no data block in this file, try next file // if block data in memory return false else true
if (compIndex == NULL || compIndex->uid != pCheckInfo->tableId.uid) { bool blockNoItemInMem(STsdbQueryHandle* q, SBlock* pBlock) {
return 0; // no data blocks in the file belongs to pCheckInfo->pTable if(q->pMemRef == NULL) {
return false;
} }
if (pCheckInfo->compSize < (int32_t)compIndex->len) { // mem
assert(compIndex->len > 0); if(q->pMemRef->snapshot.mem) {
SMemTable* mem = q->pMemRef->snapshot.mem;
if(timeIntersect(mem->keyFirst, mem->keyLast, pBlock->keyFirst, pBlock->keyLast))
return false;
}
// imem
if(q->pMemRef->snapshot.imem) {
SMemTable* imem = q->pMemRef->snapshot.imem;
if(timeIntersect(imem->keyFirst, imem->keyLast, pBlock->keyFirst, pBlock->keyLast))
return false;
}
char* t = realloc(pCheckInfo->pCompInfo, compIndex->len); return true;
if (t == NULL) { }
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
code = TSDB_CODE_TDB_OUT_OF_MEMORY; #define MAYBE_IN_MEMORY_ROWS 4000 // approximately the capacity of one block
return code; // skip blocks . return value is skip blocks number, skip rows reduce from *pOffset
static int32_t offsetSkipBlock(STsdbQueryHandle* q, SBlockInfo* pBlockInfo, int64_t skey, int64_t ekey,
int32_t sblock, int32_t eblock, SArray** ppArray, bool order) {
int32_t num = 0;
SBlock* blocks = pBlockInfo->blocks;
SArray* pArray = NULL;
SRange range;
range.from = -1;
//
// ASC
//
if(order) {
for(int32_t i = sblock; i < eblock; i++) {
bool skip = false;
SBlock* pBlock = &blocks[i];
if(i == sblock && skey > pBlock->keyFirst) {
q->frows += pBlock->numOfRows; // some rows time < s
} else {
// check can skip
if(q->srows + q->frows + pBlock->numOfRows + MAYBE_IN_MEMORY_ROWS < q->offset) { // approximately calculate
if(blockNoItemInMem(q, pBlock)) {
// can skip
q->srows += pBlock->numOfRows;
skip = true;
} else {
q->frows += pBlock->numOfRows; // maybe have some row in memroy
}
} else {
// the remainder be put to pArray
if(pArray == NULL)
pArray = taosArrayInit(1, sizeof(SRange));
if(range.from == -1) {
range.from = i;
} else {
if(range.to + 1 != i) {
// add the previous
taosArrayPush(pArray, &range);
range.from = i;
}
}
range.to = eblock - 1;
taosArrayPush(pArray, &range);
range.from = -1;
break;
}
}
if(skip) {
num ++;
} else {
// can't skip, append block index to pArray
if(pArray == NULL)
pArray = taosArrayInit(10, sizeof(SRange));
if(range.from == -1) {
range.from = i;
} else {
if(range.to + 1 != i) {
// add the previous
taosArrayPush(pArray, &range);
range.from = i;
}
}
range.to = i;
}
}
// end append
if(range.from != -1) {
if(pArray == NULL)
pArray = taosArrayInit(1, sizeof(SRange));
taosArrayPush(pArray, &range);
} }
pCheckInfo->pCompInfo = (SBlockInfo*)t; // ASC return
pCheckInfo->compSize = compIndex->len; *ppArray = pArray;
return num;
}
// DES
for(int32_t i = eblock - 1; i >= sblock; i--) {
bool skip = false;
SBlock* pBlock = &blocks[i];
if(i == eblock - 1 && ekey < pBlock->keyLast) {
q->frows += pBlock->numOfRows; // some rows time > e
} else {
// check can skip
if(q->srows + q->frows + pBlock->numOfRows + MAYBE_IN_MEMORY_ROWS < q->offset) { // approximately calculate
if(blockNoItemInMem(q, pBlock)) {
// can skip
q->srows += pBlock->numOfRows;
skip = true;
} else {
q->frows += pBlock->numOfRows; // maybe have some row in memroy
}
} else {
// the remainder be put to pArray
if(pArray == NULL)
pArray = taosArrayInit(1, sizeof(SRange));
if(range.from == -1) {
range.from = i;
} else {
if(range.to - 1 != i) {
// add the previous
taosArrayPush(pArray, &range);
range.from = i;
}
}
range.to = 0;
taosArrayPush(pArray, &range);
range.from = -1;
break;
}
}
if(skip) {
num ++;
} else {
// can't skip, append block index to pArray
if(pArray == NULL)
pArray = taosArrayInit(10, sizeof(SRange));
if(range.from == -1) {
range.from = i;
} else {
if(range.to + 1 != i) {
// add the previous
taosArrayPush(pArray, &range);
range.from = i;
}
}
range.to = i;
}
} }
if (tsdbLoadBlockInfo(&(pQueryHandle->rhelper), (void*)(pCheckInfo->pCompInfo)) < 0) { // end append
return terrno; if(range.from != -1) {
if(pArray == NULL)
pArray = taosArrayInit(1, sizeof(SRange));
taosArrayPush(pArray, &range);
} }
SBlockInfo* pCompInfo = pCheckInfo->pCompInfo; if(pArray == NULL)
return num;
TSKEY s = TSKEY_INITIAL_VAL, e = TSKEY_INITIAL_VAL; // reverse array
size_t count = taosArrayGetSize(pArray);
SRange* ranges = TARRAY_GET_START(pArray);
SArray* pArray1 = taosArrayInit(count, sizeof(SRange));
if (ASCENDING_TRAVERSE(pQueryHandle->order)) { size_t i = count - 1;
while(i >= 0) {
range.from = ranges[i].to;
range.to = ranges[i].from;
taosArrayPush(pArray1, &range);
if(i == 0)
break;
i --;
}
*ppArray = pArray1;
taosArrayDestroy(pArray);
return num;
}
// shrink blocks by condition of query
static void shrinkBlocksByQuery(STsdbQueryHandle *pQueryHandle, STableCheckInfo *pCheckInfo) {
SBlockInfo *pCompInfo = pCheckInfo->pCompInfo;
SBlockIdx *compIndex = pQueryHandle->rhelper.pBlkIdx;
bool order = ASCENDING_TRAVERSE(pQueryHandle->order);
if (order) {
assert(pCheckInfo->lastKey <= pQueryHandle->window.ekey && pQueryHandle->window.skey <= pQueryHandle->window.ekey); assert(pCheckInfo->lastKey <= pQueryHandle->window.ekey && pQueryHandle->window.skey <= pQueryHandle->window.ekey);
} else { } else {
assert(pCheckInfo->lastKey >= pQueryHandle->window.ekey && pQueryHandle->window.skey >= pQueryHandle->window.ekey); assert(pCheckInfo->lastKey >= pQueryHandle->window.ekey && pQueryHandle->window.skey >= pQueryHandle->window.ekey);
} }
TSKEY s = TSKEY_INITIAL_VAL, e = TSKEY_INITIAL_VAL;
s = MIN(pCheckInfo->lastKey, pQueryHandle->window.ekey); s = MIN(pCheckInfo->lastKey, pQueryHandle->window.ekey);
e = MAX(pCheckInfo->lastKey, pQueryHandle->window.ekey); e = MAX(pCheckInfo->lastKey, pQueryHandle->window.ekey);
// discard the unqualified data block based on the query time window // discard the unqualified data block based on the query time window
int32_t start = binarySearchForBlock(pCompInfo->blocks, compIndex->numOfBlocks, s, TSDB_ORDER_ASC); int32_t start = binarySearchForBlock(pCompInfo->blocks, compIndex->numOfBlocks, s, TSDB_ORDER_ASC);
int32_t end = start;
if (s > pCompInfo->blocks[start].keyLast) { if (s > pCompInfo->blocks[start].keyLast) {
return 0; return ;
} }
// todo speedup the procedure of located end block int32_t end = start;
// locate e index of blocks -> end
while (end < (int32_t)compIndex->numOfBlocks && (pCompInfo->blocks[end].keyFirst <= e)) { while (end < (int32_t)compIndex->numOfBlocks && (pCompInfo->blocks[end].keyFirst <= e)) {
end += 1; end += 1;
} }
pCheckInfo->numOfBlocks = (end - start); // calc offset can skip blocks number
int32_t nSkip = 0;
SArray *pArray = NULL;
if(pQueryHandle->offset > 0) {
nSkip = offsetSkipBlock(pQueryHandle, pCompInfo, s, e, start, end, &pArray, order);
}
if(nSkip > 0) { // have offset and can skip
pCheckInfo->numOfBlocks = memMoveByArray(pCompInfo->blocks, pArray);
} else { // no offset
pCheckInfo->numOfBlocks = end - start;
if(start > 0)
memmove(pCompInfo->blocks, &pCompInfo->blocks[start], pCheckInfo->numOfBlocks * sizeof(SBlock));
}
if(pArray)
taosArrayDestroy(pArray);
}
// load one table (tsd_index point to) need load blocks info and put into pCheckInfo->pCompInfo->blocks
static int32_t loadBlockInfo(STsdbQueryHandle * pQueryHandle, int32_t tsd_index, int32_t* numOfBlocks) {
//
// ONE PART. Load all blocks info from one table of tsd_index
//
int32_t code = 0;
STableCheckInfo* pCheckInfo = taosArrayGet(pQueryHandle->pTableCheckInfo, tsd_index);
pCheckInfo->numOfBlocks = 0;
if (tsdbSetReadTable(&pQueryHandle->rhelper, pCheckInfo->pTableObj) != TSDB_CODE_SUCCESS) {
code = terrno;
return code;
}
SBlockIdx* compIndex = pQueryHandle->rhelper.pBlkIdx;
// no data block in this file, try next file
if (compIndex == NULL || compIndex->uid != pCheckInfo->tableId.uid) {
return 0; // no data blocks in the file belongs to pCheckInfo->pTable
}
if (start > 0) { if (pCheckInfo->compSize < (int32_t)compIndex->len) {
memmove(pCompInfo->blocks, &pCompInfo->blocks[start], pCheckInfo->numOfBlocks * sizeof(SBlock)); assert(compIndex->len > 0);
char* t = realloc(pCheckInfo->pCompInfo, compIndex->len);
if (t == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
code = TSDB_CODE_TDB_OUT_OF_MEMORY;
return code;
}
pCheckInfo->pCompInfo = (SBlockInfo*)t;
pCheckInfo->compSize = compIndex->len;
} }
if (tsdbLoadBlockInfo(&(pQueryHandle->rhelper), (void*)(pCheckInfo->pCompInfo)) < 0) {
return terrno;
}
//
// TWO PART. shrink no need blocks from all blocks by condition of query
//
shrinkBlocksByQuery(pQueryHandle, pCheckInfo);
(*numOfBlocks) += pCheckInfo->numOfBlocks; (*numOfBlocks) += pCheckInfo->numOfBlocks;
return 0; return 0;
} }
...@@ -4275,3 +4519,12 @@ void getTableListfromSkipList(tExprNode *pExpr, SSkipList *pSkipList, SArray *re ...@@ -4275,3 +4519,12 @@ void getTableListfromSkipList(tExprNode *pExpr, SSkipList *pSkipList, SArray *re
//apply the hierarchical filter expression to every node in skiplist to find the qualified nodes //apply the hierarchical filter expression to every node in skiplist to find the qualified nodes
applyFilterToSkipListNode(pSkipList, pExpr, result, param); applyFilterToSkipListNode(pSkipList, pExpr, result, param);
} }
// obtain queryHandle attribute
int64_t tsdbSkipOffset(TsdbQueryHandleT queryHandle) {
STsdbQueryHandle* pQueryHandle = (STsdbQueryHandle*)queryHandle;
if (pQueryHandle) {
return pQueryHandle->srows;
}
return 0;
}
\ No newline at end of file
...@@ -36,6 +36,8 @@ int taosIdPoolNumOfUsed(void *handle); ...@@ -36,6 +36,8 @@ int taosIdPoolNumOfUsed(void *handle);
bool taosIdPoolMarkStatus(void *handle, int id); bool taosIdPoolMarkStatus(void *handle, int id);
// get free count from pool , if bLock is true, locked pool than get free count, accuracy but slowly
int taosIdPoolNumOfFree(void *handle, bool bLock);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -63,6 +63,15 @@ uint32_t tGetToken(char *z, uint32_t *tokenType); ...@@ -63,6 +63,15 @@ uint32_t tGetToken(char *z, uint32_t *tokenType);
*/ */
SStrToken tStrGetToken(char *str, int32_t *i, bool isPrevOptr); SStrToken tStrGetToken(char *str, int32_t *i, bool isPrevOptr);
/**
* strcpy implement source from SStrToken
*
* @param dst copy to
* @param srcToken copy from
* @return size of copy successful bytes, not include '\0'
*/
int32_t tStrNCpy(char *dst, SStrToken* srcToken);
/** /**
* check if it is a keyword or not * check if it is a keyword or not
* @param z * @param z
......
...@@ -54,6 +54,13 @@ static FORCE_INLINE void taosEncryptPass(uint8_t *inBuf, size_t inLen, char *tar ...@@ -54,6 +54,13 @@ static FORCE_INLINE void taosEncryptPass(uint8_t *inBuf, size_t inLen, char *tar
memcpy(target, context.digest, TSDB_KEY_LEN); memcpy(target, context.digest, TSDB_KEY_LEN);
} }
//
// TSKEY util
//
// if time area(s1,e1) intersect with time area(s2,e2) then return true else return false
bool timeIntersect(TSKEY s1, TSKEY e1, TSKEY s2, TSKEY e2);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -163,4 +163,16 @@ int taosIdPoolMaxSize(void *handle) { ...@@ -163,4 +163,16 @@ int taosIdPoolMaxSize(void *handle) {
pthread_mutex_unlock(&pIdPool->mutex); pthread_mutex_unlock(&pIdPool->mutex);
return ret; return ret;
} }
\ No newline at end of file
// get free count from pool , if bLock is true, locked pool than get free count, accuracy but slowly
int taosIdPoolNumOfFree(void *handle, bool bLock) {
id_pool_t *pIdPool = handle;
if(bLock)
pthread_mutex_lock(&pIdPool->mutex);
int ret = pIdPool->numOfFree;
if(bLock)
pthread_mutex_unlock(&pIdPool->mutex);
return ret;
}
...@@ -227,6 +227,8 @@ static SKeyword keywordTable[] = { ...@@ -227,6 +227,8 @@ static SKeyword keywordTable[] = {
{"OUTPUTTYPE", TK_OUTPUTTYPE}, {"OUTPUTTYPE", TK_OUTPUTTYPE},
{"AGGREGATE", TK_AGGREGATE}, {"AGGREGATE", TK_AGGREGATE},
{"BUFSIZE", TK_BUFSIZE}, {"BUFSIZE", TK_BUFSIZE},
{"TO", TK_TO},
{"SPLIT", TK_SPLIT},
}; };
static const char isIdChar[] = { static const char isIdChar[] = {
...@@ -676,6 +678,18 @@ SStrToken tStrGetToken(char* str, int32_t* i, bool isPrevOptr) { ...@@ -676,6 +678,18 @@ SStrToken tStrGetToken(char* str, int32_t* i, bool isPrevOptr) {
return t0; return t0;
} }
/**
* strcpy implement source from SStrToken
*
* @param dst copy to
* @param srcToken copy from
* @return size of copy successful bytes
*/
int32_t tStrNCpy(char *dst, SStrToken* srcToken) {
strncpy(dst, srcToken->z, srcToken->n);
return srcToken->n;
}
bool taosIsKeyWordToken(const char* z, int32_t len) { bool taosIsKeyWordToken(const char* z, int32_t len) {
return (tKeywordCode((char*)z, len) != TK_ID); return (tKeywordCode((char*)z, len) != TK_ID);
} }
......
...@@ -470,3 +470,16 @@ FORCE_INLINE double taos_align_get_double(const char* pBuf) { ...@@ -470,3 +470,16 @@ FORCE_INLINE double taos_align_get_double(const char* pBuf) {
memcpy(&dv, pBuf, sizeof(dv)); // in ARM, return *((const double*)(pBuf)) may cause problem memcpy(&dv, pBuf, sizeof(dv)); // in ARM, return *((const double*)(pBuf)) may cause problem
return dv; return dv;
} }
//
// TSKEY util
//
// if time area(s1,e1) intersect with time area(s2,e2) then return true else return false
bool timeIntersect(TSKEY s1, TSKEY e1, TSKEY s2, TSKEY e2) {
// s1,e1 and s2,e2 have 7 scenarios, 5 is intersection, 2 is no intersection, so we pick up 2.
if(e2 < s1 || s2 > e1)
return false;
else
return true;
}
\ No newline at end of file
...@@ -218,6 +218,7 @@ python3 test.py -f tools/taosdemoAllTest/taosdemoTestInsertWithJson.py ...@@ -218,6 +218,7 @@ python3 test.py -f tools/taosdemoAllTest/taosdemoTestInsertWithJson.py
python3 test.py -f tools/taosdemoAllTest/taosdemoTestQueryWithJson.py python3 test.py -f tools/taosdemoAllTest/taosdemoTestQueryWithJson.py
#query #query
python3 ./test.py -f query/queryBase.py
python3 ./test.py -f query/filter.py python3 ./test.py -f query/filter.py
python3 ./test.py -f query/filterCombo.py python3 ./test.py -f query/filterCombo.py
python3 ./test.py -f query/queryNormal.py python3 ./test.py -f query/queryNormal.py
...@@ -272,7 +273,9 @@ python3 ./test.py -f query/queryCnameDisplay.py ...@@ -272,7 +273,9 @@ python3 ./test.py -f query/queryCnameDisplay.py
python3 ./test.py -f query/operator_cost.py python3 ./test.py -f query/operator_cost.py
python3 test.py -f query/nestedQuery/queryWithSpread.py python3 test.py -f query/nestedQuery/queryWithSpread.py
python3 ./test.py -f query/bug6586.py python3 ./test.py -f query/bug6586.py
python3 ./test.py -f query/ts_2016.py
# python3 ./test.py -f query/bug5903.py # python3 ./test.py -f query/bug5903.py
python3 ./test.py -f query/queryLimit.py
#stream #stream
python3 ./test.py -f stream/metric_1.py python3 ./test.py -f stream/metric_1.py
...@@ -282,7 +285,7 @@ python3 ./test.py -f stream/stream1.py ...@@ -282,7 +285,7 @@ python3 ./test.py -f stream/stream1.py
python3 ./test.py -f stream/stream2.py python3 ./test.py -f stream/stream2.py
#python3 ./test.py -f stream/parser.py #python3 ./test.py -f stream/parser.py
python3 ./test.py -f stream/history.py python3 ./test.py -f stream/history.py
python3 ./test.py -f stream/sys.py #python3 ./test.py -f stream/sys.py
python3 ./test.py -f stream/table_1.py python3 ./test.py -f stream/table_1.py
python3 ./test.py -f stream/table_n.py python3 ./test.py -f stream/table_n.py
python3 ./test.py -f stream/showStreamExecTimeisNull.py python3 ./test.py -f stream/showStreamExecTimeisNull.py
......
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
#
# query base function test case
#
import sys
from numpy.lib.function_base import insert
import taos
from util.log import *
from util.cases import *
from util.sql import *
import numpy as np
# constant define
WAITS = 5 # wait seconds
class TDTestCase:
#
# --------------- main frame -------------------
#
def caseDescription(self):
'''
Query moudle base api or keyword test case:
case1: api first() last()
case2: none
'''
return
# init
def init(self, conn, logSql):
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor())
tdSql.prepare()
self.create_tables();
self.ts = 1500000000000
# run case
def run(self):
# insert data
self.insert_data("t1", self.ts, 1*10000, 30000, 0);
self.insert_data("t2", self.ts, 2*10000, 30000, 100000);
self.insert_data("t3", self.ts, 3*10000, 30000, 200000);
# test base case
self.case_first()
tdLog.debug(" QUERYBASE first() api ............ [OK]")
# test advance case
self.case_last()
tdLog.debug(" QUERYBASE last() api ............ [OK]")
# stop
def stop(self):
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
#
# --------------- case -------------------
#
# create table
def create_tables(self):
# super table
tdSql.execute("create table st(ts timestamp, i1 int) tags(area int)");
# child table
tdSql.execute("create table t1 using st tags(1)");
tdSql.execute("create table t2 using st tags(2)");
tdSql.execute("create table t3 using st tags(3)");
return
# insert data1
def insert_data(self, tbname, ts_start, count, batch_num, base):
pre_insert = "insert into %s values"%tbname
sql = pre_insert
tdLog.debug("doing insert table %s rows=%d ..."%(tbname, count))
for i in range(count):
sql += " (%d,%d)"%(ts_start + i*1000, base + i)
if i >0 and i%batch_num == 0:
tdSql.execute(sql)
sql = pre_insert
# end sql
if sql != pre_insert:
tdSql.execute(sql)
tdLog.debug("INSERT TABLE DATA ............ [OK]")
return
# first case base
def case_first(self):
#
# last base function
#
# base t1 table
sql = "select first(*) from t1 where ts>='2017-07-14 12:40:00' order by ts asc;"
tdSql.waitedQuery(sql, 1, WAITS)
tdSql.checkData(0, 1, 7200)
sql = "select first(*) from t1 where ts>='2017-07-14 12:40:00' order by ts desc;" # desc
tdSql.waitedQuery(sql, 1, WAITS)
tdSql.checkData(0, 1, 7200)
# super table st
sql = "select first(*) from st where ts>='2017-07-14 11:40:00' and ts<='2017-07-14 12:40:00' and tbname in('t1') order by ts;"
tdSql.waitedQuery(sql, 1, WAITS)
tdSql.checkData(0, 1, 3600)
sql = "select first(*) from st where ts>='2017-07-14 11:40:00' and ts<='2017-07-14 12:40:00' and tbname in('t1') order by ts desc;" # desc
tdSql.waitedQuery(sql, 1, WAITS)
tdSql.checkData(0, 1, 3600)
# sub query
sql = "select first(*) from ( select sum(i1) from st where ts>='2017-07-14 11:40:00' and ts<'2017-07-14 12:40:00' interval(10m) order by ts asc );"
tdSql.waitedQuery(sql, 1, WAITS)
tdSql.checkData(0, 1, 187019100)
sql = "select first(*) from ( select sum(i1) from st where ts>='2017-07-14 11:40:00' and ts<'2017-07-14 12:40:00' interval(10m) order by ts desc );" # desc
tdSql.waitedQuery(sql, 1, WAITS)
tdSql.checkData(0, 1, 187019100)
return
# last case
def case_last(self):
#
# last base test
#
# base t1 table
sql = "select last(*) from t1 where ts<='2017-07-14 12:40:00' order by ts asc;"
tdSql.waitedQuery(sql, 1, WAITS)
tdSql.checkData(0, 1, 7200)
sql = "select last(*) from t1 where ts<='2017-07-14 12:40:00' order by ts desc;" # desc
tdSql.waitedQuery(sql, 1, WAITS)
tdSql.checkData(0, 1, 7200)
# super table st
sql = "select last(*) from st where ts>='2017-07-14 11:40:00' and ts<='2017-07-14 12:40:00' and tbname in('t1') order by ts;"
tdSql.waitedQuery(sql, 1, WAITS)
tdSql.checkData(0, 1, 7200)
sql = "select last(*) from st where ts>='2017-07-14 11:40:00' and ts<='2017-07-14 12:40:00' and tbname in('t1') order by ts desc;" # desc
tdSql.waitedQuery(sql, 1, WAITS)
tdSql.checkData(0, 1, 7200)
# sub query
sql = "select last(*) from ( select sum(i1) from st where ts>='2017-07-14 11:40:00' and ts<'2017-07-14 12:40:00' interval(10m) order by ts asc );"
tdSql.waitedQuery(sql, 1, WAITS)
tdSql.checkData(0, 1, 192419100)
sql = "select last(*) from ( select sum(i1) from st where ts>='2017-07-14 11:40:00' and ts<'2017-07-14 12:40:00' interval(10m) order by ts desc );" # desc
tdSql.waitedQuery(sql, 1, WAITS)
tdSql.checkData(0, 1, 192419100)
#
# add case with filename
#
tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase())
\ No newline at end of file
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import sys
from numpy.lib.function_base import insert
import taos
from util.log import *
from util.cases import *
from util.sql import *
import numpy as np
# constant define
WAITS = 5 # wait seconds
class TDTestCase:
#
# --------------- main frame -------------------
#
def caseDescription(self):
'''
limit and offset keyword function test cases;
case1: limit offset base function test
case2: limit offset advance test
'''
return
# init
def init(self, conn, logSql):
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor())
tdSql.prepare()
self.create_tables();
self.ts = 1500000000000
# run case
def run(self):
# insert data
self.insert_data("t1", self.ts, 300*10000, 30000);
# test base case
self.test_case1()
tdLog.debug(" LIMIT test_case1 ............ [OK]")
# test advance case
self.test_case2()
tdLog.debug(" LIMIT test_case2 ............ [OK]")
# stop
def stop(self):
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
#
# --------------- case -------------------
#
# create table
def create_tables(self):
# super table
tdSql.execute("create table st(ts timestamp, i1 int) tags(area int)");
# child table
tdSql.execute("create table t1 using st tags(1)");
tdSql.execute("create table t2 using st tags(2)");
tdSql.execute("create table t3 using st tags(3)");
return
# insert data1
def insert_data(self, tbname, ts_start, count, batch_num):
pre_insert = "insert into %s values"%tbname
sql = pre_insert
tdLog.debug("doing insert table %s rows=%d ..."%(tbname, count))
for i in range(count):
sql += " (%d,%d)"%(ts_start + i*1000, i)
if i >0 and i%batch_num == 0:
tdSql.execute(sql)
sql = pre_insert
# end sql
if sql != pre_insert:
tdSql.execute(sql)
tdLog.debug("INSERT TABLE DATA ............ [OK]")
return
# test case1 base
def test_case1(self):
#
# limit base function
#
# base no where
sql = "select * from t1 limit 10"
tdSql.waitedQuery(sql, 10, WAITS)
tdSql.checkData(0, 1, 0)
tdSql.checkData(9, 1, 9)
sql = "select * from t1 order by ts desc limit 10" # desc
tdSql.waitedQuery(sql, 10, WAITS)
tdSql.checkData(0, 1, 2999999)
tdSql.checkData(9, 1, 2999990)
# have where
sql = "select * from t1 where ts>='2017-07-14 10:40:01' and ts<'2017-07-14 10:40:06' limit 10"
tdSql.waitedQuery(sql, 5, WAITS)
tdSql.checkData(0, 1, 1)
tdSql.checkData(4, 1, 5)
sql = "select * from t1 where ts>='2017-08-18 03:59:52' and ts<'2017-08-18 03:59:57' order by ts desc limit 10" # desc
tdSql.waitedQuery(sql, 5, WAITS)
tdSql.checkData(0, 1, 2999996)
tdSql.checkData(4, 1, 2999992)
#
# offset base function
#
# no where
sql = "select * from t1 limit 10 offset 5"
tdSql.waitedQuery(sql, 10, WAITS)
tdSql.checkData(0, 1, 5)
tdSql.checkData(9, 1, 14)
sql = "select * from t1 order by ts desc limit 10 offset 5" # desc
tdSql.waitedQuery(sql, 10, WAITS)
tdSql.checkData(0, 1, 2999994)
tdSql.checkData(9, 1, 2999985)
# have where only ts
sql = "select * from t1 where ts>='2017-07-14 10:40:10' and ts<'2017-07-14 10:40:20' limit 10 offset 5"
tdSql.waitedQuery(sql, 5, WAITS)
tdSql.checkData(0, 1, 15)
tdSql.checkData(4, 1, 19)
sql = "select * from t1 where ts>='2017-08-18 03:59:52' and ts<'2017-08-18 03:59:57' order by ts desc limit 10 offset 4" # desc
tdSql.waitedQuery(sql, 1, WAITS)
tdSql.checkData(0, 1, 2999992)
# have where with other column condition
sql = "select * from t1 where i1>=1 and i1<11 limit 10 offset 5"
tdSql.waitedQuery(sql, 5, WAITS)
tdSql.checkData(0, 1, 6)
tdSql.checkData(4, 1, 10)
sql = "select * from t1 where i1>=300000 and i1<=500000 order by ts desc limit 10 offset 100000" # desc
tdSql.waitedQuery(sql, 10, WAITS)
tdSql.checkData(0, 1, 400000)
tdSql.checkData(9, 1, 399991)
# have where with ts and other column condition
sql = "select * from t1 where ts>='2017-07-14 10:40:10' and ts<'2017-07-14 10:40:50' and i1>=20 and i1<=25 limit 10 offset 5"
tdSql.waitedQuery(sql, 1, WAITS)
tdSql.checkData(0, 1, 25)
return
# test advance
def test_case2(self):
#
# OFFSET merge file data with memory data
#
# offset
sql = "select * from t1 limit 10 offset 72000"
tdSql.waitedQuery(sql, 10, WAITS)
tdSql.checkData(0, 1, 72000)
# each insert one row into NO.0 NO.2 NO.7 blocks
sql = "insert into t1 values (%d, 0) (%d, 2) (%d, 7)"%(self.ts+1, self.ts + 2*3300*1000+1, self.ts + 7*3300*1000+1)
tdSql.execute(sql)
# query result
sql = "select * from t1 limit 10 offset 72000"
tdSql.waitedQuery(sql, 10, WAITS)
tdSql.checkData(0, 1, 72000 - 3)
# have where
sql = "select * from t1 where ts>='2017-07-14 10:40:10' and ts<'2017-07-22 18:40:10' limit 10 offset 72000"
tdSql.waitedQuery(sql, 10, WAITS)
tdSql.checkData(0, 1, 72000 - 3 + 10 + 1)
# have where desc
sql = "select * from t1 where ts<'2017-07-14 20:40:00' order by ts desc limit 15 offset 36000"
tdSql.waitedQuery(sql, 3, WAITS)
tdSql.checkData(0, 1, 1)
#
# add case with filename
#
tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase())
\ No newline at end of file
...@@ -26,6 +26,8 @@ class TDTestCase: ...@@ -26,6 +26,8 @@ class TDTestCase:
''' '''
tdCom.cleanTb() tdCom.cleanTb()
table_name = tdCom.getLongName(8, "letters_mixed") table_name = tdCom.getLongName(8, "letters_mixed")
while table_name.islower():
table_name = tdCom.getLongName(8, "letters_mixed")
table_name_sub = f'{table_name}_sub' table_name_sub = f'{table_name}_sub'
tb_name_lower = table_name_sub.lower() tb_name_lower = table_name_sub.lower()
tb_name_upper = table_name_sub.upper() tb_name_upper = table_name_sub.upper()
......
###################################################################
# Copyright (c) 2021 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import sys
from util.log import *
from util.cases import *
from util.sql import *
class TDTestCase:
def caseDescription(self):
'''
case1<shenglian zhou>: [TS-2016]fix select * from (select * from empty_stable)
'''
return
def init(self, conn, logSql):
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor(), logSql)
self._conn = conn
def run(self):
print("running {}".format(__file__))
tdSql.execute("drop database if exists td12229")
tdSql.execute("create database if not exists td12229")
tdSql.execute('use td12229')
tdSql.execute('create stable st(ts timestamp , value int ) tags (ind int)')
tdSql.execute('insert into tb1 using st tags(1) values(now ,1)')
tdSql.execute('insert into tb1 using st tags(1) values(now+1s ,2)')
tdSql.execute('insert into tb1 using st tags(1) values(now+2s ,3)')
tdSql.execute('create stable ste(ts timestamp , value int ) tags (ind int)')
tdSql.query('select * from st')
tdSql.checkRows(3)
tdSql.query('select * from (select * from ste)')
tdSql.checkRows(0)
tdSql.query('select * from st union all select * from ste')
tdSql.checkRows(3)
tdSql.query('select * from ste union all select * from st')
tdSql.checkRows(3)
tdSql.query('select count(ts) from ste group by tbname union all select count(ts) from st group by tbname;')
tdSql.checkRows(1)
tdSql.query('select count(ts) from st group by tbname union all select count(ts) from ste group by tbname;')
tdSql.checkRows(1)
tdSql.execute('drop database td12229')
def stop(self):
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase())
...@@ -39,6 +39,7 @@ class TDTestCase: ...@@ -39,6 +39,7 @@ class TDTestCase:
def run(self): def run(self):
tbNum = 10 tbNum = 10
rowNum = 20 rowNum = 20
ts_begin = 1633017600000
tdSql.prepare() tdSql.prepare()
...@@ -49,8 +50,8 @@ class TDTestCase: ...@@ -49,8 +50,8 @@ class TDTestCase:
tdSql.execute("create table tb%d using stb tags(%d)" % (i, i)) tdSql.execute("create table tb%d using stb tags(%d)" % (i, i))
for j in range(rowNum): for j in range(rowNum):
tdSql.execute( tdSql.execute(
"insert into tb%d values (now - %dm, %d, %d)" % "insert into tb%d values (%d, %d, %d)" %
(i, 1440 - j, j, j)) (i, ts_begin + j, j, j))
time.sleep(0.1) time.sleep(0.1)
self.createFuncStream("count(*)", "c1", 200) self.createFuncStream("count(*)", "c1", 200)
......
...@@ -27,13 +27,14 @@ class TDTestCase: ...@@ -27,13 +27,14 @@ class TDTestCase:
def run(self): def run(self):
rowNum = 200 rowNum = 200
tdSql.prepare() tdSql.prepare()
ts_now = 1633017600000
tdLog.info("=============== step1") tdLog.info("=============== step1")
tdSql.execute("create table mt(ts timestamp, tbcol int, tbcol2 float) TAGS(tgcol int)") tdSql.execute("create table mt(ts timestamp, tbcol int, tbcol2 float) TAGS(tgcol int)")
for i in range(5): for i in range(5):
tdSql.execute("create table tb%d using mt tags(%d)" % (i, i)) tdSql.execute("create table tb%d using mt tags(%d)" % (i, i))
for j in range(rowNum): for j in range(rowNum):
tdSql.execute("insert into tb%d values(now + %ds, %d, %d)" % (i, j, j, j)) tdSql.execute("insert into tb%d values(%d, %d, %d)" % (i, ts_now, j, j))
ts_now += 1000
time.sleep(0.1) time.sleep(0.1)
tdLog.info("=============== step2") tdLog.info("=============== step2")
...@@ -45,14 +46,15 @@ class TDTestCase: ...@@ -45,14 +46,15 @@ class TDTestCase:
tdSql.waitedQuery("select * from st", 1, 180) tdSql.waitedQuery("select * from st", 1, 180)
delay = int(time.time() - start) + 80 delay = int(time.time() - start) + 80
v = tdSql.getData(0, 3) v = tdSql.getData(0, 3)
if v >= 51: if v != 10:
tdLog.exit("value is %d, which is larger than 51" % v) tdLog.exit("value is %d, expect is 10." % v)
tdLog.info("=============== step4") tdLog.info("=============== step4")
for i in range(5, 10): for i in range(5, 10):
tdSql.execute("create table tb%d using mt tags(%d)" % (i, i)) tdSql.execute("create table tb%d using mt tags(%d)" % (i, i))
for j in range(rowNum): for j in range(rowNum):
tdSql.execute("insert into tb%d values(now + %ds, %d, %d)" % (i, j, j, j)) tdSql.execute("insert into tb%d values(%d, %d, %d)" % (i, ts_now, j, j))
ts_now += 1000
tdLog.info("=============== step5") tdLog.info("=============== step5")
maxValue = 0 maxValue = 0
...@@ -62,11 +64,11 @@ class TDTestCase: ...@@ -62,11 +64,11 @@ class TDTestCase:
v = tdSql.getData(0, 3) v = tdSql.getData(0, 3)
if v > maxValue: if v > maxValue:
maxValue = v maxValue = v
if v > 51: if v >= 10:
break break
if maxValue <= 51: if maxValue < 10:
tdLog.exit("value is %d, which is smaller than 51" % maxValue) tdLog.exit("value is %d, expect is 10" % maxValue)
def stop(self): def stop(self):
tdSql.close() tdSql.close()
......
...@@ -47,7 +47,7 @@ class TDTestCase: ...@@ -47,7 +47,7 @@ class TDTestCase:
"select * from iostrm", "select * from iostrm",
] ]
for sql in sqls: for sql in sqls:
(rows, _) = tdSql.waitedQuery(sql, 1, 240) (rows, _) = tdSql.waitedQuery(sql, 1, 600)
if rows < 1: if rows < 1:
tdLog.exit("failed: sql:%s, expect at least one row" % sql) tdLog.exit("failed: sql:%s, expect at least one row" % sql)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册