“91f67330b16bcb93b917f8a357a33a97e67e2ff1”上不存在“docs/en/05-develop/04-query-data.md”
未验证 提交 f6c56020 编写于 作者: M Minglei Jin 提交者: GitHub

Merge pull request #10116 from taosdata/fix/TS-1164-D

[TS-1164]<fix>(query): develop->CQ Support write to super table
...@@ -29,6 +29,14 @@ extern "C" { ...@@ -29,6 +29,14 @@ extern "C" {
#include "tsched.h" #include "tsched.h"
#include "tsclient.h" #include "tsclient.h"
#define LABEL_SQL "sql:"
#define LABEL_TO " to:"
#define LABEL_SPLIT " split:"
#define LABEL_SQL_LEN (sizeof(LABEL_SQL) - 1)
#define LABEL_TO_LEN (sizeof(LABEL_TO) - 1)
#define LABEL_SPLIT_LEN (sizeof(LABEL_SPLIT) - 1)
#define UTIL_TABLE_IS_SUPER_TABLE(metaInfo) \ #define UTIL_TABLE_IS_SUPER_TABLE(metaInfo) \
(((metaInfo)->pTableMeta != NULL) && ((metaInfo)->pTableMeta->tableType == TSDB_SUPER_TABLE)) (((metaInfo)->pTableMeta != NULL) && ((metaInfo)->pTableMeta->tableType == TSDB_SUPER_TABLE))
......
...@@ -405,6 +405,10 @@ typedef struct SSqlStream { ...@@ -405,6 +405,10 @@ typedef struct SSqlStream {
int16_t precision; int16_t precision;
int64_t num; // number of computing count int64_t num; // number of computing count
int32_t dstCols; // dstTable has number of columns
char* to;
char* split;
/* /*
* keep the number of current result in computing, * keep the number of current result in computing,
* the value will be set to 0 before set timer for next computing * the value will be set to 0 before set timer for next computing
...@@ -484,6 +488,8 @@ TAOS *taos_connect_a(char *ip, char *user, char *pass, char *db, uint16_t port, ...@@ -484,6 +488,8 @@ TAOS *taos_connect_a(char *ip, char *user, char *pass, char *db, uint16_t port,
void *param, TAOS **taos); void *param, TAOS **taos);
TAOS_RES* taos_query_h(TAOS* taos, const char *sqlstr, int64_t* res); TAOS_RES* taos_query_h(TAOS* taos, const char *sqlstr, int64_t* res);
TAOS_RES * taos_query_ra(TAOS *taos, const char *sqlstr, __async_cb_func_t fp, void *param); TAOS_RES * taos_query_ra(TAOS *taos, const char *sqlstr, __async_cb_func_t fp, void *param);
// get taos connection unused session number
int32_t taos_unused_session(TAOS* taos);
void waitForQueryRsp(void *param, TAOS_RES *tres, int code); void waitForQueryRsp(void *param, TAOS_RES *tres, int code);
......
...@@ -53,3 +53,4 @@ taos_is_null ...@@ -53,3 +53,4 @@ taos_is_null
taos_insert_lines taos_insert_lines
taos_schemaless_insert taos_schemaless_insert
taos_result_block taos_result_block
taos_print_row_ex
\ No newline at end of file
...@@ -1443,6 +1443,7 @@ int32_t tscSetTableFullName(SName* pName, SStrToken* pTableName, SSqlObj* pSql, ...@@ -1443,6 +1443,7 @@ int32_t tscSetTableFullName(SName* pName, SStrToken* pTableName, SSqlObj* pSql,
const char* msg3 = "no acctId"; const char* msg3 = "no acctId";
const char* msg4 = "db name too long"; const char* msg4 = "db name too long";
const char* msg5 = "table name too long"; const char* msg5 = "table name too long";
const char* msg6 = "table name empty";
SSqlCmd* pCmd = &pSql->cmd; SSqlCmd* pCmd = &pSql->cmd;
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
...@@ -1494,6 +1495,8 @@ int32_t tscSetTableFullName(SName* pName, SStrToken* pTableName, SSqlObj* pSql, ...@@ -1494,6 +1495,8 @@ int32_t tscSetTableFullName(SName* pName, SStrToken* pTableName, SSqlObj* pSql,
if (pTableName->n >= TSDB_TABLE_NAME_LEN) { if (pTableName->n >= TSDB_TABLE_NAME_LEN) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg1); return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg1);
} else if(pTableName->n == 0) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg6);
} }
char name[TSDB_TABLE_FNAME_LEN] = {0}; char name[TSDB_TABLE_FNAME_LEN] = {0};
...@@ -9081,6 +9084,18 @@ int32_t doCheckForStream(SSqlObj* pSql, SSqlInfo* pInfo) { ...@@ -9081,6 +9084,18 @@ int32_t doCheckForStream(SSqlObj* pSql, SSqlInfo* pInfo) {
if (tscValidateName(pName, true, &dbIncluded1) != TSDB_CODE_SUCCESS) { if (tscValidateName(pName, true, &dbIncluded1) != TSDB_CODE_SUCCESS) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg1); return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg1);
} }
// check to valid and create to name
if(pInfo->pCreateTableInfo->to.n > 0) {
bool dbInclude = false;
if (tscValidateName(&pInfo->pCreateTableInfo->to, false, &dbInclude) != TSDB_CODE_SUCCESS) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg1);
}
int32_t code = tscSetTableFullName(&pInfo->pCreateTableInfo->toSName, &pInfo->pCreateTableInfo->to, pSql, dbInclude);
if(code != TSDB_CODE_SUCCESS) {
return code;
}
}
SRelationInfo* pFromInfo = pInfo->pCreateTableInfo->pSelect->from; SRelationInfo* pFromInfo = pInfo->pCreateTableInfo->pSelect->from;
if (pFromInfo == NULL || taosArrayGetSize(pFromInfo->list) == 0) { if (pFromInfo == NULL || taosArrayGetSize(pFromInfo->list) == 0) {
......
...@@ -1546,11 +1546,49 @@ int tscEstimateCreateTableMsgLength(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -1546,11 +1546,49 @@ int tscEstimateCreateTableMsgLength(SSqlObj *pSql, SSqlInfo *pInfo) {
if (pCreateTableInfo->pSelect != NULL) { if (pCreateTableInfo->pSelect != NULL) {
size += (pCreateTableInfo->pSelect->sqlstr.n + 1); size += (pCreateTableInfo->pSelect->sqlstr.n + 1);
// add size = create super table with same columns and 1 tags
if(pCreateTableInfo->to.n > 0) {
size += sizeof(SCreateTableMsg);
size += sizeof(SSchema) * (pCmd->numOfCols + 1);
size += pCreateTableInfo->to.n + 4; // to:
if(pCreateTableInfo->split.n > 0)
size += pCreateTableInfo->split.n + 7; // split:
}
} }
return size + TSDB_EXTRA_PAYLOAD_SIZE; return size + TSDB_EXTRA_PAYLOAD_SIZE;
} }
char* fillCreateSTableMsg(SCreateTableMsg* pCreateMsg, SCreateTableSql* pTableSql, SSqlCmd* pCmd, SSqlInfo *pInfo) {
// SET
SSchema* pSchema = (SSchema *)pCreateMsg->schema;
SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd);
pCreateMsg->igExists = 0;
pCreateMsg->sqlLen = 0;
// FullName like acctID.dbName.tableName
tNameExtractFullName(&pInfo->pCreateTableInfo->toSName, pCreateMsg->tableName);
// copy columns
pCreateMsg->numOfColumns = htons(pCmd->numOfCols);
for (int i = 0; i < pCmd->numOfCols; ++i) {
TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i);
pSchema->type = pField->type;
strcpy(pSchema->name, pField->name);
pSchema->bytes = htons(pField->bytes);
pSchema++;
}
// append one tag
pCreateMsg->numOfTags = htons(1); // only one tag immutable
pSchema->type = TSDB_DATA_TYPE_INT;
pSchema->bytes = htons(INT_BYTES);
strcpy(pSchema->name, "tag1");
pSchema ++;
return (char *)pSchema;
}
int tscBuildCreateTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) { int tscBuildCreateTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
int msgLen = 0; int msgLen = 0;
int size = 0; int size = 0;
...@@ -1608,39 +1646,71 @@ int tscBuildCreateTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -1608,39 +1646,71 @@ int tscBuildCreateTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pCreate->len = htonl(len); pCreate->len = htonl(len);
} }
} else { // create (super) table } else { // create (super) table
// FIRST MSG
SCreateTableMsg* pCreate = pCreateMsg;
pCreateTableMsg->numOfTables = htonl(1); // only one table will be created pCreateTableMsg->numOfTables = htonl(1); // only one table will be created
int32_t code = tNameExtractFullName(&pTableMetaInfo->name, pCreate->tableName);
int32_t code = tNameExtractFullName(&pTableMetaInfo->name, pCreateMsg->tableName); bool to = pInfo->pCreateTableInfo->to.n > 0;
assert(code == 0); assert(code == 0);
SCreateTableSql *pCreateTable = pInfo->pCreateTableInfo; SCreateTableSql *pCreateTable = pInfo->pCreateTableInfo;
pCreateMsg->igExists = pCreateTable->existCheck ? 1 : 0; pCreate->igExists = pCreateTable->existCheck ? 1 : 0;
pCreateMsg->numOfColumns = htons(pCmd->numOfCols); pCreate->numOfColumns = htons(pCmd->numOfCols);
pCreateMsg->numOfTags = htons(pCmd->count); pCreate->numOfTags = htons(pCmd->count);
pCreate->sqlLen = 0;
pCreateMsg->sqlLen = 0; pSchema = (SSchema *)pCreate->schema;
pMsg = (char *)pCreateMsg->schema; //copy schema
pSchema = (SSchema *)pCreateMsg->schema;
for (int i = 0; i < pCmd->numOfCols + pCmd->count; ++i) { for (int i = 0; i < pCmd->numOfCols + pCmd->count; ++i) {
TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i); TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i);
pSchema->type = pField->type; pSchema->type = pField->type;
strcpy(pSchema->name, pField->name); strcpy(pSchema->name, pField->name);
pSchema->bytes = htons(pField->bytes); pSchema->bytes = htons(pField->bytes);
pSchema++; pSchema++;
} }
//copy stream sql if have
pMsg = (char *)pSchema; pMsg = (char *)pSchema;
if (type == TSQL_CREATE_STREAM) { // check if it is a stream sql if (type == TSQL_CREATE_STREAM) { // check if it is a stream sql
SSqlNode *pQuerySql = pInfo->pCreateTableInfo->pSelect; SSqlNode *pQuerySql = pInfo->pCreateTableInfo->pSelect;
int16_t len = 0;
if(to) {
//sql:
strcpy(pMsg, LABEL_SQL);
len += LABEL_SQL_LEN;
len += tStrNCpy(pMsg + len, &pQuerySql->sqlstr);
//to
strcpy(pMsg + len, LABEL_TO);
len += LABEL_TO_LEN;
len += tStrNCpy(pMsg + len, &pInfo->pCreateTableInfo->to);
//split if
if(pInfo->pCreateTableInfo->split.n > 0) {
strcpy(pMsg + len, LABEL_SPLIT);
len += LABEL_SPLIT_LEN;
len += tStrNCpy(pMsg + len, &pInfo->pCreateTableInfo->split);
}
// append string end flag
pMsg[len++] = 0;
pMsg += len;
pCreate->sqlLen = htons(len);
} else {
len = pQuerySql->sqlstr.n;
strncpy(pMsg, pQuerySql->sqlstr.z, len);
pMsg[len++] = 0; // string end
pMsg += len;
pCreate->sqlLen = htons(len);
}
}
// calc first msg length
int32_t len = (int32_t)(pMsg - (char*)pCreate);
pCreate->len = htonl(len);
strncpy(pMsg, pQuerySql->sqlstr.z, pQuerySql->sqlstr.n + 1); // filling second msg if to have value
pCreateMsg->sqlLen = htons(pQuerySql->sqlstr.n + 1); if(to) {
pMsg += pQuerySql->sqlstr.n + 1; pCreate = (SCreateTableMsg *)pMsg;
pMsg = fillCreateSTableMsg(pCreate, pCreateTable, pCmd, pInfo);
len = (int32_t)(pMsg - (char*)pCreate);
pCreate->len = htonl(len);
pCreateTableMsg->numOfTables = htonl(2);
} }
} }
......
...@@ -28,6 +28,7 @@ ...@@ -28,6 +28,7 @@
#include "tutil.h" #include "tutil.h"
#include "ttimer.h" #include "ttimer.h"
#include "tscProfile.h" #include "tscProfile.h"
#include "tidpool.h"
static char clusterDefaultId[] = "clusterDefaultId"; static char clusterDefaultId[] = "clusterDefaultId";
static bool validImpl(const char* str, size_t maxsize) { static bool validImpl(const char* str, size_t maxsize) {
...@@ -307,6 +308,25 @@ void taos_close(TAOS *taos) { ...@@ -307,6 +308,25 @@ void taos_close(TAOS *taos) {
taosRemoveRef(tscRefId, pObj->rid); taosRemoveRef(tscRefId, pObj->rid);
} }
// get taos connection unused session number
int32_t taos_unused_session(TAOS* taos) {
// param valid check
STscObj *pObj = (STscObj *)taos;
if (pObj == NULL || pObj->signature != pObj) {
tscError("pObj:%p is NULL or freed", pObj);
terrno = TSDB_CODE_TSC_DISCONNECTED;
return 0;
}
if(pObj->pRpcObj == NULL ) {
tscError("pObj:%p pRpcObj is NULL.", pObj);
terrno = TSDB_CODE_TSC_DISCONNECTED;
return 0;
}
// get number
return rpcUnusedSession(pObj->pRpcObj->pDnodeConn, false);
}
void waitForQueryRsp(void *param, TAOS_RES *tres, int code) { void waitForQueryRsp(void *param, TAOS_RES *tres, int code) {
assert(tres != NULL); assert(tres != NULL);
...@@ -807,13 +827,16 @@ bool taos_is_update_query(TAOS_RES *res) { ...@@ -807,13 +827,16 @@ bool taos_is_update_query(TAOS_RES *res) {
SSqlCmd* pCmd = &pSql->cmd; SSqlCmd* pCmd = &pSql->cmd;
return ((pCmd->command >= TSDB_SQL_INSERT && pCmd->command <= TSDB_SQL_DROP_DNODE) || TSDB_SQL_RESET_CACHE == pCmd->command || TSDB_SQL_USE_DB == pCmd->command); return ((pCmd->command >= TSDB_SQL_INSERT && pCmd->command <= TSDB_SQL_DROP_DNODE) || TSDB_SQL_RESET_CACHE == pCmd->command || TSDB_SQL_USE_DB == pCmd->command);
} }
int taos_print_row(char *str, TAOS_ROW row, TAOS_FIELD *fields, int num_fields) { int taos_print_row(char *str, TAOS_ROW row, TAOS_FIELD *fields, int num_fields) {
return taos_print_row_ex(str, row, fields, num_fields, ' ', false);
}
int taos_print_row_ex(char *str, TAOS_ROW row, TAOS_FIELD *fields, int num_fields, char split, bool addQuota) {
int len = 0; int len = 0;
for (int i = 0; i < num_fields; ++i) { for (int i = 0; i < num_fields; ++i) {
if (i > 0) { if (i > 0) {
str[len++] = ' '; str[len++] = split;
} }
if (row[i] == NULL) { if (row[i] == NULL) {
...@@ -874,9 +897,23 @@ int taos_print_row(char *str, TAOS_ROW row, TAOS_FIELD *fields, int num_fields) ...@@ -874,9 +897,23 @@ int taos_print_row(char *str, TAOS_ROW row, TAOS_FIELD *fields, int num_fields)
} else { } else {
assert(charLen <= fields[i].bytes * TSDB_NCHAR_SIZE && charLen >= 0); assert(charLen <= fields[i].bytes * TSDB_NCHAR_SIZE && charLen >= 0);
} }
// add pre quotaion if require
if(addQuota) {
*(str + len) = '\'';
len += 1;
}
// copy content
memcpy(str + len, row[i], charLen); memcpy(str + len, row[i], charLen);
len += charLen; len += charLen;
// add end quotaion if require
if(addQuota) {
*(str + len)= '\'';
len += 1;
}
} break; } break;
case TSDB_DATA_TYPE_TIMESTAMP: case TSDB_DATA_TYPE_TIMESTAMP:
...@@ -893,6 +930,89 @@ int taos_print_row(char *str, TAOS_ROW row, TAOS_FIELD *fields, int num_fields) ...@@ -893,6 +930,89 @@ int taos_print_row(char *str, TAOS_ROW row, TAOS_FIELD *fields, int num_fields)
return len; return len;
} }
// print field value to str
int taos_print_field(char *str, void* value, TAOS_FIELD *field) {
// check valid
if (str == NULL || value == NULL || field == NULL) {
return 0;
}
// get value
int len = 0;
switch (field->type) {
//
// fixed length
//
case TSDB_DATA_TYPE_TINYINT:
len = sprintf(str, "%d", *((int8_t *)value));
break;
case TSDB_DATA_TYPE_UTINYINT:
len = sprintf(str, "%u", *((uint8_t *)value));
break;
case TSDB_DATA_TYPE_SMALLINT:
len = sprintf(str, "%d", *((int16_t *)value));
break;
case TSDB_DATA_TYPE_USMALLINT:
len = sprintf(str, "%u", *((uint16_t *)value));
break;
case TSDB_DATA_TYPE_INT:
len = sprintf(str, "%d", *((int32_t *)value));
break;
case TSDB_DATA_TYPE_UINT:
len = sprintf(str, "%u", *((uint32_t *)value));
break;
case TSDB_DATA_TYPE_BIGINT:
len = sprintf(str, "%" PRId64, *((int64_t *)value));
break;
case TSDB_DATA_TYPE_UBIGINT:
len = sprintf(str, "%" PRIu64, *((uint64_t *)value));
break;
case TSDB_DATA_TYPE_FLOAT: {
float fv = 0;
fv = GET_FLOAT_VAL(value);
len = sprintf(str, "%f", fv);
} break;
case TSDB_DATA_TYPE_DOUBLE: {
double dv = 0;
dv = GET_DOUBLE_VAL(value);
len = sprintf(str, "%lf", dv);
} break;
case TSDB_DATA_TYPE_TIMESTAMP:
len = sprintf(str, "%" PRId64, *((int64_t *)value));
break;
case TSDB_DATA_TYPE_BOOL:
len = sprintf(str, "%d", *((int8_t *)value));
//
// variant length
//
case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_NCHAR: {
len = varDataLen((char*)value - VARSTR_HEADER_SIZE);
if (field->type == TSDB_DATA_TYPE_BINARY) {
assert(len <= field->bytes && len >= 0);
} else {
assert(len <= field->bytes * TSDB_NCHAR_SIZE && len >= 0);
}
memcpy(str, value, len);
} break;
default:
break;
}
return len;
}
static void asyncCallback(void *param, TAOS_RES *tres, int code) { static void asyncCallback(void *param, TAOS_RES *tres, int code) {
assert(param != NULL); assert(param != NULL);
SSqlObj *pSql = ((SSqlObj *)param); SSqlObj *pSql = ((SSqlObj *)param);
......
...@@ -30,6 +30,7 @@ static void tscProcessStreamQueryCallback(void *param, TAOS_RES *tres, int numOf ...@@ -30,6 +30,7 @@ static void tscProcessStreamQueryCallback(void *param, TAOS_RES *tres, int numOf
static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOfRows); static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOfRows);
static void tscSetNextLaunchTimer(SSqlStream *pStream, SSqlObj *pSql); static void tscSetNextLaunchTimer(SSqlStream *pStream, SSqlObj *pSql);
static void tscSetRetryTimer(SSqlStream *pStream, SSqlObj *pSql, int64_t timer); static void tscSetRetryTimer(SSqlStream *pStream, SSqlObj *pSql, int64_t timer);
static int64_t getLaunchTimeDelay(const SSqlStream* pStream);
static int64_t getDelayValueAfterTimewindowClosed(SSqlStream* pStream, int64_t launchDelay) { static int64_t getDelayValueAfterTimewindowClosed(SSqlStream* pStream, int64_t launchDelay) {
return taosGetTimestamp(pStream->precision) + launchDelay - pStream->stime - 1; return taosGetTimestamp(pStream->precision) + launchDelay - pStream->stime - 1;
...@@ -49,7 +50,7 @@ static bool isProjectStream(SQueryInfo* pQueryInfo) { ...@@ -49,7 +50,7 @@ static bool isProjectStream(SQueryInfo* pQueryInfo) {
static int64_t tscGetRetryDelayTime(SSqlStream* pStream, int64_t slidingTime, int16_t prec) { static int64_t tscGetRetryDelayTime(SSqlStream* pStream, int64_t slidingTime, int16_t prec) {
float retryRangeFactor = 0.3f; float retryRangeFactor = 0.3f;
int64_t retryDelta = (int64_t)(tsRetryStreamCompDelay * retryRangeFactor); int64_t retryDelta = (int64_t)(tsRetryStreamCompDelay * retryRangeFactor);
retryDelta = ((rand() % retryDelta) + tsRetryStreamCompDelay) * 1000L; retryDelta = (rand() % retryDelta) + tsRetryStreamCompDelay;
if (pStream->interval.intervalUnit != 'n' && pStream->interval.intervalUnit != 'y') { if (pStream->interval.intervalUnit != 'n' && pStream->interval.intervalUnit != 'y') {
// change to ms // change to ms
...@@ -113,7 +114,7 @@ static void doLaunchQuery(void* param, TAOS_RES* tres, int32_t code) { ...@@ -113,7 +114,7 @@ static void doLaunchQuery(void* param, TAOS_RES* tres, int32_t code) {
pQueryInfo->command = TSDB_SQL_SELECT; pQueryInfo->command = TSDB_SQL_SELECT;
pSql->fp = tscProcessStreamQueryCallback; pSql->fp = tscProcessStreamQueryCallback;
pSql->fetchFp = tscProcessStreamQueryCallback; pSql->fetchFp = tscProcessStreamQueryCallback;
executeQuery(pSql, pQueryInfo); executeQuery(pSql, pQueryInfo);
tscIncStreamExecutionCount(pStream); tscIncStreamExecutionCount(pStream);
...@@ -142,7 +143,6 @@ static void tscProcessStreamTimer(void *handle, void *tmrId) { ...@@ -142,7 +143,6 @@ static void tscProcessStreamTimer(void *handle, void *tmrId) {
if(pSql == NULL) { if(pSql == NULL) {
return ; return ;
} }
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd); SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd);
tscDebug("0x%"PRIx64" add into timer", pSql->self); tscDebug("0x%"PRIx64" add into timer", pSql->self);
...@@ -160,18 +160,19 @@ static void tscProcessStreamTimer(void *handle, void *tmrId) { ...@@ -160,18 +160,19 @@ static void tscProcessStreamTimer(void *handle, void *tmrId) {
} else { } else {
pQueryInfo->window.skey = pStream->stime; pQueryInfo->window.skey = pStream->stime;
int64_t etime = taosGetTimestamp(pStream->precision); int64_t etime = taosGetTimestamp(pStream->precision);
int64_t one = convertTimePrecision(1, TSDB_TIME_PRECISION_MILLI, pStream->precision);
// delay to wait all data in last time window // delay to wait all data in last time window
etime -= convertTimePrecision(tsMaxStreamComputDelay, TSDB_TIME_PRECISION_MILLI, pStream->precision); etime -= convertTimePrecision(tsMaxStreamComputDelay, TSDB_TIME_PRECISION_MILLI, pStream->precision);
if (etime > pStream->etime) { if (etime > pStream->etime) {
etime = pStream->etime; etime = pStream->etime;
} else if (pStream->interval.intervalUnit != 'y' && pStream->interval.intervalUnit != 'n') { } else if (pStream->interval.intervalUnit != 'y' && pStream->interval.intervalUnit != 'n') {
if(pStream->stime == INT64_MIN) { if(pStream->stime == INT64_MIN) {
etime = taosTimeTruncate(etime, &pStream->interval, pStream->precision); etime = taosTimeTruncate(etime, &pStream->interval, pStream->precision) - one;
} else { } else {
etime = pStream->stime + (etime - pStream->stime) / pStream->interval.interval * pStream->interval.interval; etime = pStream->stime + (etime - pStream->stime) / pStream->interval.interval * pStream->interval.interval - one;
} }
} else { } else {
etime = taosTimeTruncate(etime, &pStream->interval, pStream->precision); etime = taosTimeTruncate(etime, &pStream->interval, pStream->precision) - one;
} }
pQueryInfo->window.ekey = etime; pQueryInfo->window.ekey = etime;
if (pQueryInfo->window.skey >= pQueryInfo->window.ekey) { if (pQueryInfo->window.skey >= pQueryInfo->window.ekey) {
...@@ -179,24 +180,39 @@ static void tscProcessStreamTimer(void *handle, void *tmrId) { ...@@ -179,24 +180,39 @@ static void tscProcessStreamTimer(void *handle, void *tmrId) {
if (pStream->interval.intervalUnit == 'y' || pStream->interval.intervalUnit == 'n') { if (pStream->interval.intervalUnit == 'y' || pStream->interval.intervalUnit == 'n') {
timer = 86400 * 1000l; timer = 86400 * 1000l;
} else { } else {
timer = convertTimePrecision(timer, pStream->precision, TSDB_TIME_PRECISION_MILLI); int32_t loop = 10000;
int64_t next_time = pStream->stime;
while(1) {
// get next time
next_time = taosTimeAdd(next_time, pStream->interval.sliding, pStream->interval.intervalUnit, TSDB_TIME_PRECISION_MILLI);
timer = next_time - taosGetTimestamp(pStream->precision); // next time - now()
if(timer < 0 && --loop > 0 ) {
//tscDebug("CQ next time < now so loop add sliding. next_time=%" PRId64, next_time);
continue;
}
// calc launch delay time
int64_t delay = getLaunchTimeDelay((const SSqlStream*)pStream);
timer += delay;
tscDebug("CQ execute next query after %" PRId64 "ms (delay=%" PRId64 ")", timer, delay);
break;
}
} }
tscSetRetryTimer(pStream, pSql, timer); tscSetRetryTimer(pStream, pSql, timer);
return; return;
} }
} }
tscDebug("CQ ProcessStreamTimer skey=%" PRId64 " ekey=%" PRId64 " stime=%" PRId64 " etime=%" PRId64, pQueryInfo->window.skey, pQueryInfo->window.ekey, pStream->stime, pStream->etime);
// launch stream computing in a new thread // launch stream computing in a new thread
SSchedMsg schedMsg = {0}; SSchedMsg schedMsg = { 0 };
schedMsg.fp = tscProcessStreamLaunchQuery; schedMsg.fp = tscProcessStreamLaunchQuery;
schedMsg.ahandle = pStream; schedMsg.ahandle = pStream;
schedMsg.thandle = (void *)1; schedMsg.thandle = (void *)1;
schedMsg.msg = NULL; schedMsg.msg = NULL;
taosScheduleTask(tscQhandle, &schedMsg); taosScheduleTask(tscQhandle, &schedMsg);
} }
static void cbParseSql(void* param, TAOS_RES* res, int code);
static void tscProcessStreamQueryCallback(void *param, TAOS_RES *tres, int numOfRows) { static void tscProcessStreamQueryCallback(void *param, TAOS_RES *tres, int numOfRows) {
SSqlStream *pStream = (SSqlStream *)param; SSqlStream *pStream = (SSqlStream *)param;
if (tres == NULL || numOfRows < 0) { if (tres == NULL || numOfRows < 0) {
...@@ -204,28 +220,23 @@ static void tscProcessStreamQueryCallback(void *param, TAOS_RES *tres, int numOf ...@@ -204,28 +220,23 @@ static void tscProcessStreamQueryCallback(void *param, TAOS_RES *tres, int numOf
tscError("0x%"PRIx64" stream:%p, query data failed, code:0x%08x, retry in %" PRId64 "ms", pStream->pSql->self, tscError("0x%"PRIx64" stream:%p, query data failed, code:0x%08x, retry in %" PRId64 "ms", pStream->pSql->self,
pStream, numOfRows, retryDelay); pStream, numOfRows, retryDelay);
SSqlObj* pSql = pStream->pSql; STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pStream->pSql->cmd, 0);
tscFreeSqlResult(pSql); char name[TSDB_TABLE_FNAME_LEN] = {0};
tscFreeSubobj(pSql); tNameExtractFullName(&pTableMetaInfo->name, name);
tfree(pSql->pSubs);
pSql->subState.numOfSub = 0;
pSql->parseRetry = 0; taosHashRemove(UTIL_GET_TABLEMETA(pStream->pSql), name, strnlen(name, TSDB_TABLE_FNAME_LEN));
int32_t code = tsParseSql(pSql, true);
if (code == TSDB_CODE_SUCCESS) { tfree(pTableMetaInfo->pTableMeta);
cbParseSql(pStream, pSql, code);
} else if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
tscDebug("0x%"PRIx64" CQ taso_open_stream IN Process", pSql->self);
} else {
tscError("0x%"PRIx64" open stream failed, code:%s", pSql->self, tstrerror(code));
taosReleaseRef(tscObjRef, pSql->self);
free(pStream);
return;
}
// tscSetRetryTimer(pStream, pStream->pSql, retryDelay); tscFreeSqlResult(pStream->pSql);
// return; tscFreeSubobj(pStream->pSql);
tfree(pStream->pSql->pSubs);
pStream->pSql->subState.numOfSub = 0;
pTableMetaInfo->vgroupList = tscVgroupInfoClear(pTableMetaInfo->vgroupList);
tscSetRetryTimer(pStream, pStream->pSql, retryDelay);
return;
} }
taos_fetch_rows_a(tres, tscProcessStreamRetrieveResult, param); taos_fetch_rows_a(tres, tscProcessStreamRetrieveResult, param);
...@@ -272,14 +283,233 @@ static void tscStreamFillTimeGap(SSqlStream* pStream, TSKEY ts) { ...@@ -272,14 +283,233 @@ static void tscStreamFillTimeGap(SSqlStream* pStream, TSKEY ts) {
#endif #endif
} }
// callback send values
int32_t ok_cnt = 0;
int32_t err_cnt = 0;
void cbSendValues(void *param, TAOS_RES *res, int code) {
if(code < 0) {
err_cnt ++;
tscError("CQ Send Failed. code=0x%x ok_cnt=%d err_cnt=%d", code, ok_cnt,err_cnt);
} else {
ok_cnt ++;
tscInfo("CQ Send OK. row=%d ok_cnt=%d err_cnt=%d", code, ok_cnt, err_cnt);
}
}
// append values
size_t appendValues(TAOS_FIELD* fields, int32_t numCols, TAOS_ROW row, char* pBuf, size_t bufLen, size_t curLen, bool *full) {
// calc buf is full
size_t needLen = 0;
size_t rowLen = 0;
int i;
for(i = 0; i < numCols; i++) {
needLen += fields[i].bytes;
}
// estimate length for use
needLen += numCols * 5 + 20;
if(needLen >= bufLen - curLen) {
*full = true;
return 0;
}
// row append to values
char* strRow = tmalloc(needLen);
char *value = pBuf + curLen;
strcpy(value, "(");
rowLen += taos_print_row_ex(strRow, row, fields, numCols, ',', true);
strcat(value, strRow);
strcat(value, ")");
rowLen +=2;
tfree(strRow);
return rowLen;
}
bool sqlBufSend(TAOS *taos, char *sqlBuf) {
// if no enough free session, wait max 10s
int32_t sleepCnt = 0;
do {
int32_t session = taos_unused_session(taos);
if(session > 1000) {
break;
}
taosMsleep(500);
tscInfo("CQ session < 1000. session=%d Wait 0.5s cnt=%d", session, sleepCnt);
} while(++sleepCnt < 20);
strcat(sqlBuf, ";");
taos_query_ra(taos, sqlBuf, cbSendValues, NULL);
return true;
}
#define STR_SQL_INSERT "insert into "
// send one table all rows for once
bool sendChildTalbe(TAOS *taos, char *superName, char *tableName, TAOS_FIELD *fields, int32_t numCols,
SArray *arr, char* sqlBuf, int32_t bufLen) {
char dbName[TSDB_DB_NAME_LEN] = "";
char dbTable[TSDB_TABLE_FNAME_LEN];
size_t numRows = taosArrayGetSize(arr);
if(numRows == 0)
return false;
// obtain dbname
char * p = strstr(superName, ".");
if(p) { // if have db prefix , under this db create table
int32_t len = (int32_t)(p - superName);
strncpy(dbName, superName, len);
dbName[len] = 0; // append str end
sprintf(dbTable, "%s.%s", dbName, tableName);
} else {
// no db prefix
strcpy(dbTable, tableName);
}
// first enter
if(sqlBuf[0] == 0) {
strcpy(sqlBuf, STR_SQL_INSERT);
} else { // check need send
if( bufLen - strlen(sqlBuf) < 300) {
sqlBufSend(taos, sqlBuf);
strcpy(sqlBuf, STR_SQL_INSERT);
}
}
// init
int32_t preLen = (int32_t)strlen(sqlBuf);
char *subBuf = sqlBuf + preLen;
int32_t subLen = bufLen - preLen;
sprintf(subBuf, " %s using %s tags(0) values ", dbTable, superName);
size_t curLen = strlen(subBuf);
TAOS_ROW row ;
bool full = false;
for(size_t i = 0; i < numRows; i++) {
row = (TAOS_ROW)taosArrayGetP(arr, i);
if(row == NULL)
continue;
if(subLen > 200)
curLen += appendValues(fields, numCols, row, subBuf, subLen - 100, curLen, &full);
else
full = true;
if(full) { // need send
// send current
sqlBufSend(taos, sqlBuf);
// init reset
strcpy(sqlBuf, STR_SQL_INSERT);
preLen = (int32_t)strlen(sqlBuf);
subBuf = sqlBuf + preLen;
subLen = bufLen - preLen;
sprintf(subBuf, " %s using %s tags(0) values ", dbTable, superName);
curLen = strlen(subBuf);
// retry append. if full is true again, ignore this row
curLen += appendValues(fields, numCols, row, subBuf, subLen - 100, curLen, &full);
full = false; // reset to false
}
tfree(row);
}
return true;
}
// write cq result to another table
bool toAnotherTable(STscObj *pTscObj, char *superName, TAOS_FIELD *fields, int32_t numCols, SHashObj *tbHash, int32_t numRows) {
int32_t bufLen = TSDB_MAX_SQL_LEN/2 - 128;
char * sqlBuf = tmalloc(bufLen);
sqlBuf[0] = 0; // init
ok_cnt = 0;
err_cnt = 0;
int cnt_table = 0;
void *pIter = taosHashIterate(tbHash, NULL);
while(pIter) {
SArray *arr = *(SArray**)pIter;
if(arr) {
// get key as tableName
SHashNode *pNode = (SHashNode *)GET_HASH_PNODE(pIter);
char *data = (char *)GET_HASH_NODE_KEY(pNode);
uint32_t len = pNode->keyLen;
char *key = tmalloc(len + 1);
memcpy(key, data, len);
key[len] = 0; // string end '\0'
// send all this table rows
sendChildTalbe(pTscObj, superName, key, fields, numCols, arr, sqlBuf, bufLen);
// release SArray
taosArrayDestroy(&arr);
tfree(key);
cnt_table ++;
}
pIter = taosHashIterate(tbHash, pIter);
}
if(sqlBuf[0]) {
sqlBufSend(pTscObj, sqlBuf);
}
tscInfo("CQ ===== stream %d rows write to %d tables ===== \n", numRows, cnt_table);
tfree(sqlBuf);
return true;
}
// add row to hash to group by tbname
bool tbHashAdd(SHashObj *tbHash, TAOS_ROW row, TAOS_FIELD* fields, int32_t idx, int32_t numCols) {
void *v = row[idx];
TAOS_FIELD *field = &fields[idx];
VarDataLenT len = 0;
char str[128];
memset(str, 0, sizeof(str));
char *key = str;
// get key and len
if(field->type == TSDB_DATA_TYPE_BINARY || field->type == TSDB_DATA_TYPE_NCHAR) {
key = v;
len = varDataLen((char*)v - VARSTR_HEADER_SIZE);
} else {
len = taos_print_field(str, v, field);
}
if(len == 0) {
return false;
}
// append key with len
SArray *arr = NULL;
void* pdata = taosHashGet(tbHash, key, len);
if(pdata) {
arr = *(SArray **)pdata;
}
// if group is null create new
if(arr == NULL) {
arr = (SArray *)taosArrayInit(10, sizeof(TAOS_ROW));
if(arr == NULL) {
tscError("tbHashAdd tbHash:%p, taosArrayInit(10,sizeof(TAOS_ROW) return NULL.", tbHash);
return false;
}
taosHashPut(tbHash, key, len, &arr, sizeof(SArray *));
}
// append to group
int32_t new_len = sizeof(void*) * numCols;
TAOS_ROW new_row = (TAOS_ROW)tmalloc(new_len);
memcpy(new_row, row, new_len);
taosArrayPush(arr, &new_row);
return true;
}
static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOfRows) { static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOfRows) {
SSqlStream * pStream = (SSqlStream *)param; SSqlStream * pStream = (SSqlStream *)param;
SSqlObj * pSql = (SSqlObj *)res; SSqlObj * pSql = (SSqlObj *)res;
bool toAnother = pStream->to != NULL;
if (pSql == NULL || numOfRows < 0) { if (pSql == NULL || numOfRows < 0) {
int64_t retryDelayTime = tscGetRetryDelayTime(pStream, pStream->interval.sliding, pStream->precision); int64_t retryDelayTime = tscGetRetryDelayTime(pStream, pStream->interval.sliding, pStream->precision);
tscError("stream:%p, retrieve data failed, code:0x%08x, retry in %" PRId64 " ms", pStream, numOfRows, retryDelayTime); tscError("stream:%p, retrieve data failed, code:0x%08x, retry in %" PRId64 " ms", pStream, numOfRows, retryDelayTime);
tscSetRetryTimer(pStream, pStream->pSql, retryDelayTime); tscSetRetryTimer(pStream, pStream->pSql, retryDelayTime);
return; return;
} }
...@@ -288,23 +518,72 @@ static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOf ...@@ -288,23 +518,72 @@ static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOf
STableMetaInfo *pTableMetaInfo = pQueryInfo->pTableMetaInfo[0]; STableMetaInfo *pTableMetaInfo = pQueryInfo->pTableMetaInfo[0];
if (numOfRows > 0) { // when reaching here the first execution of stream computing is successful. if (numOfRows > 0) { // when reaching here the first execution of stream computing is successful.
// init hash
SHashObj* tbHash = NULL;
int32_t colIdx = -1;
TAOS_FIELD *fields = NULL;
int32_t dstColsNum = pStream->dstCols;
int32_t fieldsNum = 0;
if(toAnother) {
//init hash
tbHash = taosHashInit(100, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
fields = taos_fetch_fields(res);
fieldsNum = tscNumOfFields(pQueryInfo);
if(dstColsNum == -1)
dstColsNum = fieldsNum;
//search split column
char *split = "tbname"; // default
if(pStream->split)
split = pStream->split;
for(int32_t i = 1; i < fieldsNum; i++ ) {
if(strcasecmp(fields[i].name, split) == 0 ) {
colIdx = i;
break;
}
}
// set default with last fields if
if(colIdx == -1) {
colIdx = fieldsNum - 1;
}
}
// save rows
for(int32_t i = 0; i < numOfRows; ++i) { for(int32_t i = 0; i < numOfRows; ++i) {
TAOS_ROW row = taos_fetch_row(res); TAOS_ROW row = taos_fetch_row(res);
if (row != NULL) { if (row != NULL) {
tscDebug("0x%"PRIx64" stream:%p fetch result", pSql->self, pStream); tscDebug("0x%"PRIx64" stream:%p fetch result row=%d", pSql->self, pStream, i);
tscStreamFillTimeGap(pStream, *(TSKEY*)row[0]); tscStreamFillTimeGap(pStream, *(TSKEY*)row[0]);
pStream->stime = *(TSKEY *)row[0]; pStream->stime = *(TSKEY *)row[0];
// user callback function // write to another table if true
(*pStream->fp)(pStream->param, res, row); if(toAnother) {
tbHashAdd(tbHash, row, fields, colIdx, dstColsNum);
if(i == numOfRows - 1) //write last row to record last query time avoid query from begin for each
(*pStream->fp)(pStream->param, res, row);
} else {
(*pStream->fp)(pStream->param, res, row);
}
pStream->numOfRes++; pStream->numOfRes++;
} }
} }
// write Another
if(toAnother) {
toAnotherTable(pSql->pTscObj, pStream->to, fields, dstColsNum, tbHash, numOfRows);
taosHashCleanup(tbHash);
}
if (!pStream->isProject) { if (!pStream->isProject) {
pStream->stime = taosTimeAdd(pStream->stime, pStream->interval.sliding, pStream->interval.slidingUnit, pStream->precision); pStream->stime = taosTimeAdd(pStream->stime, pStream->interval.sliding, pStream->interval.slidingUnit, pStream->precision);
} }
int32_t code = TSDB_CODE_SUCCESS;
if(pQueryInfo && pQueryInfo->pQInfo)
code = pQueryInfo->pQInfo->code;
// actually only one row is returned. this following is not necessary // actually only one row is returned. this following is not necessary
taos_fetch_rows_a(res, tscProcessStreamRetrieveResult, pStream); if(code == TSDB_CODE_SUCCESS) {
taos_fetch_rows_a(res, tscProcessStreamRetrieveResult, pStream);
}
} else { // numOfRows == 0, all data has been retrieved } else { // numOfRows == 0, all data has been retrieved
pStream->useconds += pSql->res.useconds; pStream->useconds += pSql->res.useconds;
if (pStream->numOfRes == 0) { if (pStream->numOfRes == 0) {
...@@ -313,7 +592,6 @@ static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOf ...@@ -313,7 +592,6 @@ static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOf
// todo set retry dynamic time // todo set retry dynamic time
int32_t retry = tsProjectExecInterval; int32_t retry = tsProjectExecInterval;
tscError("0x%"PRIx64" stream:%p, retrieve no data, code:0x%08x, retry in %" PRId32 "ms", pSql->self, pStream, numOfRows, retry); tscError("0x%"PRIx64" stream:%p, retrieve no data, code:0x%08x, retry in %" PRId32 "ms", pSql->self, pStream, numOfRows, retry);
tscSetRetryTimer(pStream, pStream->pSql, retry); tscSetRetryTimer(pStream, pStream->pSql, retry);
return; return;
} }
...@@ -378,28 +656,33 @@ static void tscSetRetryTimer(SSqlStream *pStream, SSqlObj *pSql, int64_t timer) ...@@ -378,28 +656,33 @@ static void tscSetRetryTimer(SSqlStream *pStream, SSqlObj *pSql, int64_t timer)
taosTmrReset(tscProcessStreamTimer, (int32_t)timer, pStream, tscTmr, &pStream->pTimer); taosTmrReset(tscProcessStreamTimer, (int32_t)timer, pStream, tscTmr, &pStream->pTimer);
} }
// get need delay time for every launch to exeucte query, include first and next launch
static int64_t getLaunchTimeDelay(const SSqlStream* pStream) { static int64_t getLaunchTimeDelay(const SSqlStream* pStream) {
// step 1 read setting delay time in taos.cfg
int64_t maxDelay = convertTimePrecision(tsMaxStreamComputDelay, TSDB_TIME_PRECISION_MILLI, pStream->precision); int64_t maxDelay = convertTimePrecision(tsMaxStreamComputDelay, TSDB_TIME_PRECISION_MILLI, pStream->precision);
int64_t ratioDelay = maxDelay;
int64_t delayDelta = maxDelay;
if (pStream->interval.intervalUnit != 'n' && pStream->interval.intervalUnit != 'y') { if (pStream->interval.intervalUnit != 'n' && pStream->interval.intervalUnit != 'y') {
delayDelta = (int64_t)(pStream->interval.sliding * tsStreamComputDelayRatio); ratioDelay= (int64_t)(pStream->interval.sliding * tsStreamComputDelayRatio);
if (delayDelta > maxDelay) { if (ratioDelay > maxDelay) {
delayDelta = maxDelay; ratioDelay = maxDelay;
} }
int64_t remainTimeWindow = pStream->interval.sliding - delayDelta; int64_t remainTimeWindow = pStream->interval.sliding - ratioDelay;
if (maxDelay > remainTimeWindow) { if (maxDelay > remainTimeWindow) {
maxDelay = (int64_t)(remainTimeWindow / 1.5f); maxDelay = (int64_t)(remainTimeWindow / 1.5f);
} }
} }
int64_t currentDelay = (rand() % maxDelay); // a random number // PART 2 calc allDelay = rand delay + fixed delay
currentDelay += delayDelta; int64_t allDelay = (rand() % maxDelay) + ratioDelay;
if (pStream->interval.intervalUnit != 'n' && pStream->interval.intervalUnit != 'y') { if (pStream->interval.intervalUnit != 'n' && pStream->interval.intervalUnit != 'y') {
assert(currentDelay < pStream->interval.sliding); if(allDelay >= pStream->interval.sliding) {
tscWarn("CQ delay >= sliding error. delay=%" PRId64 " sliding=%" PRId64 ". so set delay=sliding/2.", allDelay, pStream->interval.sliding);
allDelay = pStream->interval.sliding / 2;
}
} }
return currentDelay; tscDebug("getLanchDelay allDelay=%" PRId64 "(ratioDelay=%" PRId64 ")", allDelay, ratioDelay);
return allDelay;
} }
...@@ -424,8 +707,8 @@ static void tscSetNextLaunchTimer(SSqlStream *pStream, SSqlObj *pSql) { ...@@ -424,8 +707,8 @@ static void tscSetNextLaunchTimer(SSqlStream *pStream, SSqlObj *pSql) {
return; return;
} }
} else { } else {
int64_t stime = taosTimeTruncate(pStream->stime - 1, &pStream->interval, pStream->precision); int64_t tsc_stime = taosTimeTruncate(pStream->stime - 1, &pStream->interval, pStream->precision);
if (stime >= pStream->etime) { if (tsc_stime >= pStream->etime) {
tscDebug("0x%"PRIx64" stream:%p, stime:%" PRId64 " is larger than end time: %" PRId64 ", stop the stream", pStream->pSql->self, pStream, tscDebug("0x%"PRIx64" stream:%p, stime:%" PRId64 " is larger than end time: %" PRId64 ", stop the stream", pStream->pSql->self, pStream,
pStream->stime, pStream->etime); pStream->stime, pStream->etime);
// TODO : How to terminate stream here // TODO : How to terminate stream here
...@@ -448,7 +731,6 @@ static void tscSetNextLaunchTimer(SSqlStream *pStream, SSqlObj *pSql) { ...@@ -448,7 +731,6 @@ static void tscSetNextLaunchTimer(SSqlStream *pStream, SSqlObj *pSql) {
timer += getLaunchTimeDelay(pStream); timer += getLaunchTimeDelay(pStream);
timer = convertTimePrecision(timer, pStream->precision, TSDB_TIME_PRECISION_MILLI); timer = convertTimePrecision(timer, pStream->precision, TSDB_TIME_PRECISION_MILLI);
tscSetRetryTimer(pStream, pSql, timer); tscSetRetryTimer(pStream, pSql, timer);
} }
...@@ -505,7 +787,7 @@ static int32_t tscSetSlidingWindowInfo(SSqlObj *pSql, SSqlStream *pStream) { ...@@ -505,7 +787,7 @@ static int32_t tscSetSlidingWindowInfo(SSqlObj *pSql, SSqlStream *pStream) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int64_t tscGetStreamStartTimestamp(SSqlObj *pSql, SSqlStream *pStream, int64_t stime) { static int64_t tscGetStreamStartTimestamp(SSqlObj *pSql, SSqlStream *pStream, int64_t tsc_stime) {
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd); SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd);
if (pStream->isProject) { if (pStream->isProject) {
...@@ -513,43 +795,45 @@ static int64_t tscGetStreamStartTimestamp(SSqlObj *pSql, SSqlStream *pStream, in ...@@ -513,43 +795,45 @@ static int64_t tscGetStreamStartTimestamp(SSqlObj *pSql, SSqlStream *pStream, in
pStream->interval.interval = tsProjectExecInterval; pStream->interval.interval = tsProjectExecInterval;
pStream->interval.sliding = tsProjectExecInterval; pStream->interval.sliding = tsProjectExecInterval;
if (stime != INT64_MIN) { // first projection start from the latest event timestamp if (tsc_stime != INT64_MIN) { // first projection start from the latest event timestamp
assert(stime >= pQueryInfo->window.skey); assert(tsc_stime >= pQueryInfo->window.skey);
stime += 1; // exclude the last records from table tsc_stime += 1; // exclude the last records from table
} else { } else {
stime = pQueryInfo->window.skey; tsc_stime = pQueryInfo->window.skey;
} }
} else { // timewindow based aggregation stream } else { // timewindow based aggregation stream
if (stime == INT64_MIN) { // no data in meter till now if (tsc_stime == INT64_MIN) { // no data in meter till now
if (pQueryInfo->window.skey != INT64_MIN) { if (pQueryInfo->window.skey != INT64_MIN) {
stime = pQueryInfo->window.skey; tsc_stime = pQueryInfo->window.skey;
} else { } else {
return stime; return tsc_stime;
} }
stime = taosTimeTruncate(stime, &pStream->interval, pStream->precision); tsc_stime = taosTimeTruncate(tsc_stime, &pStream->interval, pStream->precision);
} else { } else {
int64_t newStime = taosTimeTruncate(stime, &pStream->interval, pStream->precision); int64_t newStime = taosTimeTruncate(tsc_stime, &pStream->interval, pStream->precision);
if (newStime != stime) { if (newStime != tsc_stime) {
tscWarn("0x%"PRIx64" stream:%p, last timestamp:%" PRId64 ", reset to:%" PRId64, pSql->self, pStream, stime, newStime); tscWarn("0x%"PRIx64" stream:%p, last timestamp:%" PRId64 ", reset to:%" PRId64, pSql->self, pStream, tsc_stime, newStime);
stime = newStime; tsc_stime = newStime;
} }
} }
} }
return stime; return tsc_stime;
} }
static int64_t tscGetLaunchTimestamp(const SSqlStream *pStream) { static int64_t tscGetFirstLaunchTime(const SSqlStream *pStream) {
// PART 1 now to stime span
int64_t timer = 0, now = taosGetTimestamp(pStream->precision); int64_t timer = 0, now = taosGetTimestamp(pStream->precision);
if (pStream->stime > now) { if (pStream->stime > now) {
timer = pStream->stime - now; timer = pStream->stime - now;
} }
int64_t startDelay = convertTimePrecision(tsStreamCompStartDelay, TSDB_TIME_PRECISION_MILLI, pStream->precision); // PART 2 stream first Launch need delay, setting with taos.cfg
timer += convertTimePrecision(tsFirstLaunchDelay, TSDB_TIME_PRECISION_MILLI, pStream->precision);
// PART 3 every launch need delay, include first and next launch
timer += getLaunchTimeDelay(pStream); timer += getLaunchTimeDelay(pStream);
timer += startDelay;
return convertTimePrecision(timer, pStream->precision, TSDB_TIME_PRECISION_MILLI); return convertTimePrecision(timer, pStream->precision, TSDB_TIME_PRECISION_MILLI);
} }
...@@ -562,6 +846,7 @@ static void tscCreateStream(void *param, TAOS_RES *res, int code) { ...@@ -562,6 +846,7 @@ static void tscCreateStream(void *param, TAOS_RES *res, int code) {
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
pSql->res.code = code; pSql->res.code = code;
tscError("0x%"PRIx64" open stream failed, sql:%s, reason:%s, code:%s", pSql->self, pSql->sqlstr, pCmd->payload, tstrerror(code)); tscError("0x%"PRIx64" open stream failed, sql:%s, reason:%s, code:%s", pSql->self, pSql->sqlstr, pCmd->payload, tstrerror(code));
pStream->fp(pStream->param, NULL, NULL); pStream->fp(pStream->param, NULL, NULL);
return; return;
} }
...@@ -588,17 +873,17 @@ static void tscCreateStream(void *param, TAOS_RES *res, int code) { ...@@ -588,17 +873,17 @@ static void tscCreateStream(void *param, TAOS_RES *res, int code) {
// set stime with ltime if ltime > stime // set stime with ltime if ltime > stime
const char* dstTable = pStream->dstTable? pStream->dstTable: ""; const char* dstTable = pStream->dstTable? pStream->dstTable: "";
tscDebug("0x%"PRIx64" CQ table %s ltime is %"PRId64, pSql->self, dstTable, pStream->ltime); tscDebug(" CQ table=%s ltime is %"PRId64, dstTable, pStream->ltime);
if(pStream->ltime != INT64_MIN && pStream->ltime > pStream->stime) { if(pStream->ltime != INT64_MIN && pStream->ltime > pStream->stime) {
tscWarn("0x%"PRIx64" CQ set stream %s stime=%"PRId64" replace with ltime=%"PRId64" if ltime > 0", pSql->self, dstTable, pStream->stime, pStream->ltime); tscWarn(" CQ set stream %s stime=%"PRId64" replace with ltime=%"PRId64" if ltime>0 ", dstTable, pStream->stime, pStream->ltime);
pStream->stime = pStream->ltime; pStream->stime = pStream->ltime;
} }
int64_t starttime = tscGetLaunchTimestamp(pStream); int64_t starttime = tscGetFirstLaunchTime(pStream);
pCmd->command = TSDB_SQL_SELECT; pCmd->command = TSDB_SQL_SELECT;
tscAddIntoStreamList(pStream); tscAddIntoStreamList(pStream);
taosTmrReset(tscProcessStreamTimer, (int32_t)starttime, pStream, tscTmr, &pStream->pTimer); taosTmrReset(tscProcessStreamTimer, (int32_t)starttime, pStream, tscTmr, &pStream->pTimer);
tscDebug("0x%"PRIx64" stream:%p is opened, query on:%s, interval:%" PRId64 ", sliding:%" PRId64 ", first launched in:%" PRId64 ", sql:%s", pSql->self, tscDebug("0x%"PRIx64" stream:%p is opened, query on:%s, interval:%" PRId64 ", sliding:%" PRId64 ", first launched in:%" PRId64 ", sql:%s", pSql->self,
...@@ -665,10 +950,54 @@ void cbParseSql(void* param, TAOS_RES* res, int code) { ...@@ -665,10 +950,54 @@ void cbParseSql(void* param, TAOS_RES* res, int code) {
char sql[128] = ""; char sql[128] = "";
sprintf(sql, "select last_row(*) from %s;", pStream->dstTable); sprintf(sql, "select last_row(*) from %s;", pStream->dstTable);
taos_query_a(pSql->pTscObj, sql, fpStreamLastRow, param); taos_query_a(pSql->pTscObj, sql, fpStreamLastRow, param);
return ;
}
void splitStreamSql(const char *str, char **sql, char **to, char **split) {
// OLD FORMAT only sql
if(strncmp(str, LABEL_SQL, LABEL_SQL_LEN) != 0) {
*sql = tmalloc(strlen(str) + 1);
if(*sql == NULL)
return ;
strcpy(*sql, str);
return ;
}
// NEW FORMAT sql:...to:...split:...
char *p1 = strstr(str + LABEL_SQL_LEN, LABEL_TO);
if(p1 == NULL) {
char *p = (char *)str + LABEL_SQL_LEN;
*sql = (char *)tmalloc(strlen(p) + 1);
strcpy(*sql, p);
return ;
}
// SQL value
int32_t len = (int32_t)(p1 - str - LABEL_SQL_LEN);
*sql = (char *)tmalloc(len + 1);
strncpy(*sql, str + LABEL_SQL_LEN, len);
(*sql)[len] = 0; // str end
// TO value
char *p2 = strstr(p1 + LABEL_TO_LEN, LABEL_SPLIT);
if(p2 == NULL) {
char *p = p1 + LABEL_TO_LEN;
*to = (char *)tmalloc(strlen(p) + 1);
strcpy(*to, p);
return ;
}
len = (int32_t)(p2 - p1 - LABEL_TO_LEN);
*to = (char *)tmalloc(len + 1);
strncpy(*to, p1 + LABEL_TO_LEN, len);
(*to)[len] = 0; // str end
// SPLIT value
char *p = p2 + LABEL_SPLIT_LEN;
*split = (char *)tmalloc(strlen(p) + 1);
strcpy(*split, p);
} }
TAOS_STREAM *taos_open_stream_withname(TAOS *taos, const char* dstTable, const char *sqlstr, void (*fp)(void *, TAOS_RES *, TAOS_ROW), TAOS_STREAM *taos_open_stream_withname(TAOS *taos, const char* dstTable, int32_t dstCols, const char *sqlstr, void (*fp)(void *param, TAOS_RES *, TAOS_ROW row),
int64_t stime, void *param, void (*callback)(void *), void* cqhandle) { int64_t tsc_stime, void *param, void (*callback)(void *), void* cqhandle) {
STscObj *pObj = (STscObj *)taos; STscObj *pObj = (STscObj *)taos;
if (pObj == NULL || pObj->signature != pObj) return NULL; if (pObj == NULL || pObj->signature != pObj) return NULL;
...@@ -697,19 +1026,26 @@ TAOS_STREAM *taos_open_stream_withname(TAOS *taos, const char* dstTable, const c ...@@ -697,19 +1026,26 @@ TAOS_STREAM *taos_open_stream_withname(TAOS *taos, const char* dstTable, const c
} }
pStream->ltime = INT64_MIN; pStream->ltime = INT64_MIN;
pStream->stime = stime; pStream->stime = tsc_stime;
pStream->fp = fp; pStream->fp = fp;
pStream->callback = callback; pStream->callback = callback;
pStream->param = param; pStream->param = param;
pStream->pSql = pSql; pStream->pSql = pSql;
pStream->cqhandle = cqhandle; pStream->cqhandle = cqhandle;
pStream->dstCols = dstCols;
pStream->to = NULL;
pStream->split = NULL;
pSql->pStream = pStream;
pSql->param = pStream;
pSql->maxRetry = TSDB_MAX_REPLICA;
tscSetStreamDestTable(pStream, dstTable); tscSetStreamDestTable(pStream, dstTable);
pSql->pStream = pStream; pSql->pStream = pStream;
pSql->param = pStream; pSql->param = pStream;
pSql->maxRetry = TSDB_MAX_REPLICA; pSql->maxRetry = TSDB_MAX_REPLICA;
pSql->sqlstr = calloc(1, strlen(sqlstr) + 1); // split stream sqlstr to sql,to,split
splitStreamSql(sqlstr, &pSql->sqlstr, &pStream->to, &pStream->split);
if (pSql->sqlstr == NULL) { if (pSql->sqlstr == NULL) {
tscError("0x%"PRIx64" failed to malloc sql string buffer", pSql->self); tscError("0x%"PRIx64" failed to malloc sql string buffer", pSql->self);
tscFreeSqlObj(pSql); tscFreeSqlObj(pSql);
...@@ -717,7 +1053,7 @@ TAOS_STREAM *taos_open_stream_withname(TAOS *taos, const char* dstTable, const c ...@@ -717,7 +1053,7 @@ TAOS_STREAM *taos_open_stream_withname(TAOS *taos, const char* dstTable, const c
return NULL; return NULL;
} }
strtolower(pSql->sqlstr, sqlstr); strtolower(pSql->sqlstr, pSql->sqlstr);
pSql->fp = tscCreateStream; pSql->fp = tscCreateStream;
pSql->fetchFp = tscCreateStream; pSql->fetchFp = tscCreateStream;
pSql->cmd.resColumnId = TSDB_RES_COL_ID; pSql->cmd.resColumnId = TSDB_RES_COL_ID;
...@@ -729,13 +1065,14 @@ TAOS_STREAM *taos_open_stream_withname(TAOS *taos, const char* dstTable, const c ...@@ -729,13 +1065,14 @@ TAOS_STREAM *taos_open_stream_withname(TAOS *taos, const char* dstTable, const c
pSql->fp = cbParseSql; pSql->fp = cbParseSql;
pSql->fetchFp = cbParseSql; pSql->fetchFp = cbParseSql;
registerSqlObj(pSql); registerSqlObj(pSql);
int32_t code = tsParseSql(pSql, true); int32_t code = tsParseSql(pSql, true);
if (code == TSDB_CODE_SUCCESS) { if (code == TSDB_CODE_SUCCESS) {
cbParseSql(pStream, pSql, code); cbParseSql(pStream, pSql, code);
} else if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) { } else if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
tscDebug("0x%"PRIx64" CQ taso_open_stream IN Process", pSql->self); tscDebug(" CQ taos_open_stream IN Process. sql=%s", sqlstr);
} else { } else {
tscError("0x%"PRIx64" open stream failed, sql:%s, code:%s", pSql->self, sqlstr, tstrerror(code)); tscError("0x%"PRIx64" open stream failed, sql:%s, code:%s", pSql->self, sqlstr, tstrerror(code));
taosReleaseRef(tscObjRef, pSql->self); taosReleaseRef(tscObjRef, pSql->self);
...@@ -746,9 +1083,9 @@ TAOS_STREAM *taos_open_stream_withname(TAOS *taos, const char* dstTable, const c ...@@ -746,9 +1083,9 @@ TAOS_STREAM *taos_open_stream_withname(TAOS *taos, const char* dstTable, const c
return pStream; return pStream;
} }
TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *, TAOS_RES *, TAOS_ROW), TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *param, TAOS_RES *, TAOS_ROW row),
int64_t stime, void *param, void (*callback)(void *)) { int64_t tsc_stime, void *param, void (*callback)(void *)) {
return taos_open_stream_withname(taos, "", sqlstr, fp, stime, param, callback, NULL); return taos_open_stream_withname(taos, "", -1, sqlstr, fp, tsc_stime, param, callback, NULL);
} }
void taos_close_stream(TAOS_STREAM *handle) { void taos_close_stream(TAOS_STREAM *handle) {
...@@ -774,6 +1111,16 @@ void taos_close_stream(TAOS_STREAM *handle) { ...@@ -774,6 +1111,16 @@ void taos_close_stream(TAOS_STREAM *handle) {
pStream->fp(pStream->param, NULL, NULL); pStream->fp(pStream->param, NULL, NULL);
taos_free_result(pSql); taos_free_result(pSql);
// free malloc
if(pStream->to) {
tfree(pStream->to);
pStream->to = NULL;
}
if(pStream->split) {
tfree(pStream->split);
pStream->split = NULL;
}
tfree(pStream); tfree(pStream);
} }
} }
...@@ -83,7 +83,7 @@ extern int32_t tsMaxNumOfOrderedResults; ...@@ -83,7 +83,7 @@ extern int32_t tsMaxNumOfOrderedResults;
extern int32_t tsMinSlidingTime; extern int32_t tsMinSlidingTime;
extern int32_t tsMinIntervalTime; extern int32_t tsMinIntervalTime;
extern int32_t tsMaxStreamComputDelay; extern int32_t tsMaxStreamComputDelay;
extern int32_t tsStreamCompStartDelay; extern int32_t tsFirstLaunchDelay;
extern int32_t tsRetryStreamCompDelay; extern int32_t tsRetryStreamCompDelay;
extern float tsStreamComputDelayRatio; // the delayed computing ration of the whole time window extern float tsStreamComputDelayRatio; // the delayed computing ration of the whole time window
extern int32_t tsProjectExecInterval; extern int32_t tsProjectExecInterval;
......
...@@ -107,11 +107,11 @@ int32_t tsMinIntervalTime = 1; ...@@ -107,11 +107,11 @@ int32_t tsMinIntervalTime = 1;
// 20sec, the maximum value of stream computing delay, changed accordingly // 20sec, the maximum value of stream computing delay, changed accordingly
int32_t tsMaxStreamComputDelay = 20000; int32_t tsMaxStreamComputDelay = 20000;
// 10sec, the first stream computing delay time after system launched successfully, changed accordingly // 10sec, the stream first launched to execute delay time after system launched successfully, changed accordingly
int32_t tsStreamCompStartDelay = 10000; int32_t tsFirstLaunchDelay = 10000;
// the stream computing delay time after executing failed, change accordingly // the stream computing delay time after executing failed, change accordingly
int32_t tsRetryStreamCompDelay = 10 * 1000; int32_t tsRetryStreamCompDelay = 30 * 60 * 1000;
// The delayed computing ration. 10% of the whole computing time window by default. // The delayed computing ration. 10% of the whole computing time window by default.
float tsStreamComputDelayRatio = 0.1f; float tsStreamComputDelayRatio = 0.1f;
...@@ -798,7 +798,7 @@ static void doInitGlobalConfig(void) { ...@@ -798,7 +798,7 @@ static void doInitGlobalConfig(void) {
taosInitConfigOption(cfg); taosInitConfigOption(cfg);
cfg.option = "maxFirstStreamCompDelay"; cfg.option = "maxFirstStreamCompDelay";
cfg.ptr = &tsStreamCompStartDelay; cfg.ptr = &tsFirstLaunchDelay;
cfg.valType = TAOS_CFG_VTYPE_INT32; cfg.valType = TAOS_CFG_VTYPE_INT32;
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW; cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW;
cfg.minValue = 1000; cfg.minValue = 1000;
......
...@@ -423,7 +423,7 @@ static void cqProcessCreateTimer(void *param, void *tmrId) { ...@@ -423,7 +423,7 @@ static void cqProcessCreateTimer(void *param, void *tmrId) {
} }
// inner implement in tscStream.c // inner implement in tscStream.c
TAOS_STREAM *taos_open_stream_withname(TAOS *taos, const char* desName, const char *sqlstr, void (*fp)(void *param, TAOS_RES *, TAOS_ROW row), TAOS_STREAM *taos_open_stream_withname(TAOS *taos, const char* desName, int32_t dstCols, const char *sqlstr, void (*fp)(void *param, TAOS_RES *, TAOS_ROW row),
int64_t stime, void *param, void (*callback)(void *), void* cqhandle); int64_t stime, void *param, void (*callback)(void *), void* cqhandle);
static void cqCreateStream(SCqContext *pContext, SCqObj *pObj) { static void cqCreateStream(SCqContext *pContext, SCqObj *pObj) {
...@@ -436,9 +436,11 @@ static void cqCreateStream(SCqContext *pContext, SCqObj *pObj) { ...@@ -436,9 +436,11 @@ static void cqCreateStream(SCqContext *pContext, SCqObj *pObj) {
} }
pObj->tmrId = 0; pObj->tmrId = 0;
int32_t dstCols = -1;
if(pObj->pSchema)
dstCols = pObj->pSchema->numOfCols;
if (pObj->pStream == NULL) { if (pObj->pStream == NULL) {
pObj->pStream = taos_open_stream_withname(pContext->dbConn, pObj->dstTable, pObj->sqlStr, cqProcessStreamRes, \ pObj->pStream = taos_open_stream_withname(pContext->dbConn, pObj->dstTable, dstCols, pObj->sqlStr, cqProcessStreamRes, \
INT64_MIN, (void *)pObj->rid, NULL, pContext); INT64_MIN, (void *)pObj->rid, NULL, pContext);
// TODO the pObj->pStream may be released if error happens // TODO the pObj->pStream may be released if error happens
......
...@@ -173,7 +173,11 @@ DLL_EXPORT int taos_num_fields(TAOS_RES *res); ...@@ -173,7 +173,11 @@ DLL_EXPORT int taos_num_fields(TAOS_RES *res);
DLL_EXPORT int taos_affected_rows(TAOS_RES *res); DLL_EXPORT int taos_affected_rows(TAOS_RES *res);
DLL_EXPORT TAOS_FIELD *taos_fetch_fields(TAOS_RES *res); DLL_EXPORT TAOS_FIELD *taos_fetch_fields(TAOS_RES *res);
DLL_EXPORT int taos_select_db(TAOS *taos, const char *db); DLL_EXPORT int taos_select_db(TAOS *taos, const char *db);
// row to string
DLL_EXPORT int taos_print_row(char *str, TAOS_ROW row, TAOS_FIELD *fields, int num_fields); DLL_EXPORT int taos_print_row(char *str, TAOS_ROW row, TAOS_FIELD *fields, int num_fields);
DLL_EXPORT int taos_print_row_ex(char *str, TAOS_ROW row, TAOS_FIELD *fields, int num_fields, char split, bool addQuota);
// one field to string
DLL_EXPORT int taos_print_field(char *str, void* value, TAOS_FIELD *field);
DLL_EXPORT void taos_stop_query(TAOS_RES *res); DLL_EXPORT void taos_stop_query(TAOS_RES *res);
DLL_EXPORT bool taos_is_null(TAOS_RES *res, int32_t row, int32_t col); DLL_EXPORT bool taos_is_null(TAOS_RES *res, int32_t row, int32_t col);
DLL_EXPORT bool taos_is_update_query(TAOS_RES *res); DLL_EXPORT bool taos_is_update_query(TAOS_RES *res);
......
...@@ -92,6 +92,7 @@ int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo); ...@@ -92,6 +92,7 @@ int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo);
void rpcSendRecv(void *shandle, SRpcEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp); void rpcSendRecv(void *shandle, SRpcEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp);
int rpcReportProgress(void *pConn, char *pCont, int contLen); int rpcReportProgress(void *pConn, char *pCont, int contLen);
void rpcCancelRequest(int64_t rid); void rpcCancelRequest(int64_t rid);
int32_t rpcUnusedSession(void * rpcInfo, bool bLock);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -136,86 +136,88 @@ ...@@ -136,86 +136,88 @@
#define TK_UNSIGNED 118 #define TK_UNSIGNED 118
#define TK_TAGS 119 #define TK_TAGS 119
#define TK_USING 120 #define TK_USING 120
#define TK_NULL 121 #define TK_TO 121
#define TK_NOW 122 #define TK_SPLIT 122
#define TK_VARIABLE 123 #define TK_NULL 123
#define TK_SELECT 124 #define TK_NOW 124
#define TK_UNION 125 #define TK_VARIABLE 125
#define TK_ALL 126 #define TK_SELECT 126
#define TK_DISTINCT 127 #define TK_UNION 127
#define TK_FROM 128 #define TK_ALL 128
#define TK_RANGE 129 #define TK_DISTINCT 129
#define TK_INTERVAL 130 #define TK_FROM 130
#define TK_EVERY 131 #define TK_RANGE 131
#define TK_SESSION 132 #define TK_INTERVAL 132
#define TK_STATE_WINDOW 133 #define TK_EVERY 133
#define TK_FILL 134 #define TK_SESSION 134
#define TK_SLIDING 135 #define TK_STATE_WINDOW 135
#define TK_ORDER 136 #define TK_FILL 136
#define TK_BY 137 #define TK_SLIDING 137
#define TK_ASC 138 #define TK_ORDER 138
#define TK_GROUP 139 #define TK_BY 139
#define TK_HAVING 140 #define TK_ASC 140
#define TK_LIMIT 141 #define TK_GROUP 141
#define TK_OFFSET 142 #define TK_HAVING 142
#define TK_SLIMIT 143 #define TK_LIMIT 143
#define TK_SOFFSET 144 #define TK_OFFSET 144
#define TK_WHERE 145 #define TK_SLIMIT 145
#define TK_RESET 146 #define TK_SOFFSET 146
#define TK_QUERY 147 #define TK_WHERE 147
#define TK_SYNCDB 148 #define TK_RESET 148
#define TK_ADD 149 #define TK_QUERY 149
#define TK_COLUMN 150 #define TK_SYNCDB 150
#define TK_MODIFY 151 #define TK_ADD 151
#define TK_TAG 152 #define TK_COLUMN 152
#define TK_CHANGE 153 #define TK_MODIFY 153
#define TK_SET 154 #define TK_TAG 154
#define TK_KILL 155 #define TK_CHANGE 155
#define TK_CONNECTION 156 #define TK_SET 156
#define TK_STREAM 157 #define TK_KILL 157
#define TK_COLON 158 #define TK_CONNECTION 158
#define TK_ABORT 159 #define TK_STREAM 159
#define TK_AFTER 160 #define TK_COLON 160
#define TK_ATTACH 161 #define TK_ABORT 161
#define TK_BEFORE 162 #define TK_AFTER 162
#define TK_BEGIN 163 #define TK_ATTACH 163
#define TK_CASCADE 164 #define TK_BEFORE 164
#define TK_CLUSTER 165 #define TK_BEGIN 165
#define TK_CONFLICT 166 #define TK_CASCADE 166
#define TK_COPY 167 #define TK_CLUSTER 167
#define TK_DEFERRED 168 #define TK_CONFLICT 168
#define TK_DELIMITERS 169 #define TK_COPY 169
#define TK_DETACH 170 #define TK_DEFERRED 170
#define TK_EACH 171 #define TK_DELIMITERS 171
#define TK_END 172 #define TK_DETACH 172
#define TK_EXPLAIN 173 #define TK_EACH 173
#define TK_FAIL 174 #define TK_END 174
#define TK_FOR 175 #define TK_EXPLAIN 175
#define TK_IGNORE 176 #define TK_FAIL 176
#define TK_IMMEDIATE 177 #define TK_FOR 177
#define TK_INITIALLY 178 #define TK_IGNORE 178
#define TK_INSTEAD 179 #define TK_IMMEDIATE 179
#define TK_KEY 180 #define TK_INITIALLY 180
#define TK_OF 181 #define TK_INSTEAD 181
#define TK_RAISE 182 #define TK_KEY 182
#define TK_REPLACE 183 #define TK_OF 183
#define TK_RESTRICT 184 #define TK_RAISE 184
#define TK_ROW 185 #define TK_REPLACE 185
#define TK_STATEMENT 186 #define TK_RESTRICT 186
#define TK_TRIGGER 187 #define TK_ROW 187
#define TK_VIEW 188 #define TK_STATEMENT 188
#define TK_IPTOKEN 189 #define TK_TRIGGER 189
#define TK_SEMI 190 #define TK_VIEW 190
#define TK_NONE 191 #define TK_IPTOKEN 191
#define TK_PREV 192 #define TK_SEMI 192
#define TK_LINEAR 193 #define TK_NONE 193
#define TK_IMPORT 194 #define TK_PREV 194
#define TK_TBNAME 195 #define TK_LINEAR 195
#define TK_JOIN 196 #define TK_IMPORT 196
#define TK_INSERT 197 #define TK_TBNAME 197
#define TK_INTO 198 #define TK_JOIN 198
#define TK_VALUES 199 #define TK_INSERT 199
#define TK_FILE 200 #define TK_INTO 200
#define TK_VALUES 201
#define TK_FILE 202
#define TK_SPACE 300 #define TK_SPACE 300
......
...@@ -851,15 +851,15 @@ static int32_t mnodeProcessBatchCreateTableMsg(SMnodeMsg *pMsg) { ...@@ -851,15 +851,15 @@ static int32_t mnodeProcessBatchCreateTableMsg(SMnodeMsg *pMsg) {
memcpy(pSubMsg->pCont + sizeof(SCMCreateTableMsg), p, htonl(p->len)); memcpy(pSubMsg->pCont + sizeof(SCMCreateTableMsg), p, htonl(p->len));
code = mnodeValidateCreateTableMsg(p, pSubMsg); code = mnodeValidateCreateTableMsg(p, pSubMsg);
if (code == TSDB_CODE_SUCCESS || code == TSDB_CODE_MND_TABLE_ALREADY_EXIST) { if (code == TSDB_CODE_SUCCESS || ( p->igExists == 1 && code == TSDB_CODE_MND_TABLE_ALREADY_EXIST )) {
++pSubMsg->pBatchMasterMsg->successed; ++pSubMsg->pBatchMasterMsg->successed;
mnodeDestroySubMsg(pSubMsg); mnodeDestroySubMsg(pSubMsg);
continue; continue;
} }
if (code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { if (code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
mnodeDestroySubMsg(pSubMsg); mnodeDestroySubMsg(pSubMsg);
return code; return code;
} }
} }
...@@ -1046,11 +1046,21 @@ static int32_t mnodeCreateSuperTableCb(SMnodeMsg *pMsg, int32_t code) { ...@@ -1046,11 +1046,21 @@ static int32_t mnodeCreateSuperTableCb(SMnodeMsg *pMsg, int32_t code) {
if (code == TSDB_CODE_SUCCESS) { if (code == TSDB_CODE_SUCCESS) {
mLInfo("stable:%s, is created in sdb, uid:%" PRIu64, pTable->info.tableId, pTable->uid); mLInfo("stable:%s, is created in sdb, uid:%" PRIu64, pTable->info.tableId, pTable->uid);
if(pMsg->pBatchMasterMsg)
pMsg->pBatchMasterMsg->successed ++;
} else { } else {
mError("msg:%p, app:%p stable:%s, failed to create in sdb, reason:%s", pMsg, pMsg->rpcMsg.ahandle, pTable->info.tableId, mError("msg:%p, app:%p stable:%s, failed to create in sdb, reason:%s", pMsg, pMsg->rpcMsg.ahandle, pTable->info.tableId,
tstrerror(code)); tstrerror(code));
SSdbRow desc = {.type = SDB_OPER_GLOBAL, .pObj = pTable, .pTable = tsSuperTableSdb}; SSdbRow desc = {.type = SDB_OPER_GLOBAL, .pObj = pTable, .pTable = tsSuperTableSdb};
sdbDeleteRow(&desc); sdbDeleteRow(&desc);
if(pMsg->pBatchMasterMsg)
pMsg->pBatchMasterMsg->received ++;
}
// if super table create by batch msg, check done and send finished to client
if(pMsg->pBatchMasterMsg) {
if (pMsg->pBatchMasterMsg->successed + pMsg->pBatchMasterMsg->received >= pMsg->pBatchMasterMsg->expected)
dnodeSendRpcMWriteRsp(pMsg->pBatchMasterMsg, code);
} }
return code; return code;
......
...@@ -25,6 +25,7 @@ extern "C" { ...@@ -25,6 +25,7 @@ extern "C" {
#include "tstrbuild.h" #include "tstrbuild.h"
#include "ttoken.h" #include "ttoken.h"
#include "tvariant.h" #include "tvariant.h"
#include "tname.h"
#define ParseTOKENTYPE SStrToken #define ParseTOKENTYPE SStrToken
...@@ -156,8 +157,11 @@ typedef struct SCreatedTableInfo { ...@@ -156,8 +157,11 @@ typedef struct SCreatedTableInfo {
typedef struct SCreateTableSql { typedef struct SCreateTableSql {
SStrToken name; // table name, create table [name] xxx SStrToken name; // table name, create table [name] xxx
SStrToken to; // create stream to anohter table
SStrToken split; // split columns
int8_t type; // create normal table/from super table/ stream int8_t type; // create normal table/from super table/ stream
bool existCheck; bool existCheck;
SName toSName;
struct { struct {
SArray *pTagColumns; // SArray<TAOS_FIELD> SArray *pTagColumns; // SArray<TAOS_FIELD>
...@@ -334,6 +338,7 @@ SArray *setSubclause(SArray *pList, void *pSqlNode); ...@@ -334,6 +338,7 @@ SArray *setSubclause(SArray *pList, void *pSqlNode);
SArray *appendSelectClause(SArray *pList, void *pSubclause); SArray *appendSelectClause(SArray *pList, void *pSubclause);
void setCreatedTableName(SSqlInfo *pInfo, SStrToken *pTableNameToken, SStrToken *pIfNotExists); void setCreatedTableName(SSqlInfo *pInfo, SStrToken *pTableNameToken, SStrToken *pIfNotExists);
void setCreatedStreamOpt(SSqlInfo *pInfo, SStrToken *pTo, SStrToken *pSplit);
void SqlInfoDestroy(SSqlInfo *pInfo); void SqlInfoDestroy(SSqlInfo *pInfo);
......
...@@ -411,14 +411,28 @@ tagNamelist(A) ::= ids(X). {A = taosArrayInit(4, sizeof(SSt ...@@ -411,14 +411,28 @@ tagNamelist(A) ::= ids(X). {A = taosArrayInit(4, sizeof(SSt
// create stream // create stream
// create table table_name as select count(*) from super_table_name interval(time) // create table table_name as select count(*) from super_table_name interval(time)
create_table_args(A) ::= ifnotexists(U) ids(V) cpxName(Z) AS select(S). { create_table_args(A) ::= ifnotexists(U) ids(V) cpxName(Z) to_opt(E) split_opt(F) AS select(S). {
A = tSetCreateTableInfo(NULL, NULL, S, TSQL_CREATE_STREAM); A = tSetCreateTableInfo(NULL, NULL, S, TSQL_CREATE_STREAM);
setSqlInfo(pInfo, A, NULL, TSDB_SQL_CREATE_TABLE); setSqlInfo(pInfo, A, NULL, TSDB_SQL_CREATE_TABLE);
setCreatedStreamOpt(pInfo, &E, &F);
V.n += Z.n; V.n += Z.n;
setCreatedTableName(pInfo, &V, &U); setCreatedTableName(pInfo, &V, &U);
} }
// to_opt
%type to_opt {SStrToken}
to_opt(A) ::= . {A.n = 0;}
to_opt(A) ::= TO ids(X) cpxName(Y). {
A = X;
A.n += Y.n;
}
// split_opt
%type to_split {SStrToken}
split_opt(A) ::= . {A.n = 0;}
split_opt(A) ::= SPLIT ids(X). { A = X;}
%type column{TAOS_FIELD} %type column{TAOS_FIELD}
%type columnlist{SArray*} %type columnlist{SArray*}
%destructor columnlist {taosArrayDestroy(&$$);} %destructor columnlist {taosArrayDestroy(&$$);}
......
...@@ -1244,6 +1244,11 @@ void setCreatedTableName(SSqlInfo *pInfo, SStrToken *pTableNameToken, SStrToken ...@@ -1244,6 +1244,11 @@ void setCreatedTableName(SSqlInfo *pInfo, SStrToken *pTableNameToken, SStrToken
pInfo->pCreateTableInfo->existCheck = (pIfNotExists->n != 0); pInfo->pCreateTableInfo->existCheck = (pIfNotExists->n != 0);
} }
void setCreatedStreamOpt(SSqlInfo *pInfo, SStrToken *pTo, SStrToken *pSplit) {
pInfo->pCreateTableInfo->to = *pTo;
pInfo->pCreateTableInfo->split = *pSplit;
}
void setDCLSqlElems(SSqlInfo *pInfo, int32_t type, int32_t nParam, ...) { void setDCLSqlElems(SSqlInfo *pInfo, int32_t type, int32_t nParam, ...) {
pInfo->type = type; pInfo->type = type;
if (nParam == 0) { if (nParam == 0) {
......
...@@ -225,51 +225,6 @@ int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryMsg, qi ...@@ -225,51 +225,6 @@ int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryMsg, qi
return code; return code;
} }
#ifdef TEST_IMPL
// wait moment
int waitMoment(SQInfo* pQInfo){
if(pQInfo->sql) {
int ms = 0;
char* pcnt = strstr(pQInfo->sql, " count(*)");
if(pcnt) return 0;
char* pos = strstr(pQInfo->sql, " t_");
if(pos){
pos += 3;
ms = atoi(pos);
while(*pos >= '0' && *pos <= '9'){
pos ++;
}
char unit_char = *pos;
if(unit_char == 'h'){
ms *= 3600*1000;
} else if(unit_char == 'm'){
ms *= 60*1000;
} else if(unit_char == 's'){
ms *= 1000;
}
}
if(ms == 0) return 0;
printf("test wait sleep %dms. sql=%s ...\n", ms, pQInfo->sql);
if(ms < 1000) {
taosMsleep(ms);
} else {
int used_ms = 0;
while(used_ms < ms) {
taosMsleep(1000);
used_ms += 1000;
if(isQueryKilled(pQInfo)){
printf("test check query is canceled, sleep break.%s\n", pQInfo->sql);
break;
}
}
}
}
return 1;
}
#endif
bool qTableQuery(qinfo_t qinfo, uint64_t *qId) { bool qTableQuery(qinfo_t qinfo, uint64_t *qId) {
SQInfo *pQInfo = (SQInfo *)qinfo; SQInfo *pQInfo = (SQInfo *)qinfo;
assert(pQInfo && pQInfo->signature == pQInfo); assert(pQInfo && pQInfo->signature == pQInfo);
......
因为 它太大了无法显示 source diff 。你可以改为 查看blob
...@@ -1677,3 +1677,9 @@ static void rpcDecRef(SRpcInfo *pRpc) ...@@ -1677,3 +1677,9 @@ static void rpcDecRef(SRpcInfo *pRpc)
} }
} }
int32_t rpcUnusedSession(void * rpcInfo, bool bLock) {
SRpcInfo *info = (SRpcInfo *)rpcInfo;
if(info == NULL)
return 0;
return taosIdPoolNumOfFree(info->idPool, bLock);
}
\ No newline at end of file
...@@ -36,6 +36,9 @@ int taosIdPoolNumOfUsed(void *handle); ...@@ -36,6 +36,9 @@ int taosIdPoolNumOfUsed(void *handle);
bool taosIdPoolMarkStatus(void *handle, int id); bool taosIdPoolMarkStatus(void *handle, int id);
// get free count from pool , if bLock is true, locked pool than get free count, accuracy but slowly
int taosIdPoolNumOfFree(void *handle, bool bLock);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -63,6 +63,15 @@ uint32_t tGetToken(char *z, uint32_t *tokenType); ...@@ -63,6 +63,15 @@ uint32_t tGetToken(char *z, uint32_t *tokenType);
*/ */
SStrToken tStrGetToken(char *str, int32_t *i, bool isPrevOptr); SStrToken tStrGetToken(char *str, int32_t *i, bool isPrevOptr);
/**
* strcpy implement source from SStrToken
*
* @param dst copy to
* @param srcToken copy from
* @return size of copy successful bytes, not include '\0'
*/
int32_t tStrNCpy(char *dst, SStrToken* srcToken);
/** /**
* check if it is a keyword or not * check if it is a keyword or not
* @param z * @param z
......
...@@ -163,4 +163,16 @@ int taosIdPoolMaxSize(void *handle) { ...@@ -163,4 +163,16 @@ int taosIdPoolMaxSize(void *handle) {
pthread_mutex_unlock(&pIdPool->mutex); pthread_mutex_unlock(&pIdPool->mutex);
return ret; return ret;
}
// get free count from pool , if bLock is true, locked pool than get free count, accuracy but slowly
int taosIdPoolNumOfFree(void *handle, bool bLock) {
id_pool_t *pIdPool = handle;
if(bLock)
pthread_mutex_lock(&pIdPool->mutex);
int ret = pIdPool->numOfFree;
if(bLock)
pthread_mutex_unlock(&pIdPool->mutex);
return ret;
} }
\ No newline at end of file
...@@ -231,7 +231,9 @@ static SKeyword keywordTable[] = { ...@@ -231,7 +231,9 @@ static SKeyword keywordTable[] = {
{"AGGREGATE", TK_AGGREGATE}, {"AGGREGATE", TK_AGGREGATE},
{"BUFSIZE", TK_BUFSIZE}, {"BUFSIZE", TK_BUFSIZE},
{"RANGE", TK_RANGE}, {"RANGE", TK_RANGE},
{"CONTAINS", TK_CONTAINS} {"CONTAINS", TK_CONTAINS},
{"TO", TK_TO},
{"SPLIT", TK_SPLIT}
}; };
static const char isIdChar[] = { static const char isIdChar[] = {
...@@ -704,6 +706,18 @@ SStrToken tStrGetToken(char* str, int32_t* i, bool isPrevOptr) { ...@@ -704,6 +706,18 @@ SStrToken tStrGetToken(char* str, int32_t* i, bool isPrevOptr) {
return t0; return t0;
} }
/**
* strcpy implement source from SStrToken
*
* @param dst copy to
* @param srcToken copy from
* @return size of copy successful bytes
*/
int32_t tStrNCpy(char *dst, SStrToken* srcToken) {
strncpy(dst, srcToken->z, srcToken->n);
return srcToken->n;
}
bool taosIsKeyWordToken(const char* z, int32_t len) { bool taosIsKeyWordToken(const char* z, int32_t len) {
return (tKeywordCode((char*)z, len) != TK_ID); return (tKeywordCode((char*)z, len) != TK_ID);
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册