提交 7822d070 编写于 作者: A Alex Duan

[TS-1164]<fix>(query): develop->CQ Support write to super table

上级 0efc916e
......@@ -29,6 +29,14 @@ extern "C" {
#include "tsched.h"
#include "tsclient.h"
#define LABEL_SQL "sql:"
#define LABEL_TO " to:"
#define LABEL_SPLIT " split:"
#define LABEL_SQL_LEN (sizeof(LABEL_SQL) - 1)
#define LABEL_TO_LEN (sizeof(LABEL_TO) - 1)
#define LABEL_SPLIT_LEN (sizeof(LABEL_SPLIT) - 1)
#define UTIL_TABLE_IS_SUPER_TABLE(metaInfo) \
(((metaInfo)->pTableMeta != NULL) && ((metaInfo)->pTableMeta->tableType == TSDB_SUPER_TABLE))
......
......@@ -405,6 +405,10 @@ typedef struct SSqlStream {
int16_t precision;
int64_t num; // number of computing count
int32_t dstCols; // dstTable has number of columns
char* to;
char* split;
/*
* keep the number of current result in computing,
* the value will be set to 0 before set timer for next computing
......@@ -484,6 +488,8 @@ TAOS *taos_connect_a(char *ip, char *user, char *pass, char *db, uint16_t port,
void *param, TAOS **taos);
TAOS_RES* taos_query_h(TAOS* taos, const char *sqlstr, int64_t* res);
TAOS_RES * taos_query_ra(TAOS *taos, const char *sqlstr, __async_cb_func_t fp, void *param);
// get taos connection unused session number
int32_t taos_unused_session(TAOS* taos);
void waitForQueryRsp(void *param, TAOS_RES *tres, int code);
......
......@@ -53,3 +53,4 @@ taos_is_null
taos_insert_lines
taos_schemaless_insert
taos_result_block
taos_print_row_ex
\ No newline at end of file
......@@ -1443,6 +1443,7 @@ int32_t tscSetTableFullName(SName* pName, SStrToken* pTableName, SSqlObj* pSql,
const char* msg3 = "no acctId";
const char* msg4 = "db name too long";
const char* msg5 = "table name too long";
const char* msg6 = "table name empty";
SSqlCmd* pCmd = &pSql->cmd;
int32_t code = TSDB_CODE_SUCCESS;
......@@ -1494,6 +1495,8 @@ int32_t tscSetTableFullName(SName* pName, SStrToken* pTableName, SSqlObj* pSql,
if (pTableName->n >= TSDB_TABLE_NAME_LEN) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg1);
} else if(pTableName->n == 0) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg6);
}
char name[TSDB_TABLE_FNAME_LEN] = {0};
......@@ -8840,6 +8843,18 @@ int32_t doCheckForStream(SSqlObj* pSql, SSqlInfo* pInfo) {
if (tscValidateName(pName, true, &dbIncluded1) != TSDB_CODE_SUCCESS) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg1);
}
// check to valid and create to name
if(pInfo->pCreateTableInfo->to.n > 0) {
bool dbInclude = false;
if (tscValidateName(&pInfo->pCreateTableInfo->to, false, &dbInclude) != TSDB_CODE_SUCCESS) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg1);
}
int32_t code = tscSetTableFullName(&pInfo->pCreateTableInfo->toSName, &pInfo->pCreateTableInfo->to, pSql, dbInclude);
if(code != TSDB_CODE_SUCCESS) {
return code;
}
}
SRelationInfo* pFromInfo = pInfo->pCreateTableInfo->pSelect->from;
if (pFromInfo == NULL || taosArrayGetSize(pFromInfo->list) == 0) {
......
......@@ -1546,11 +1546,49 @@ int tscEstimateCreateTableMsgLength(SSqlObj *pSql, SSqlInfo *pInfo) {
if (pCreateTableInfo->pSelect != NULL) {
size += (pCreateTableInfo->pSelect->sqlstr.n + 1);
// add size = create super table with same columns and 1 tags
if(pCreateTableInfo->to.n > 0) {
size += sizeof(SCreateTableMsg);
size += sizeof(SSchema) * (pCmd->numOfCols + 1);
size += pCreateTableInfo->to.n + 4; // to:
if(pCreateTableInfo->split.n > 0)
size += pCreateTableInfo->split.n + 7; // split:
}
}
return size + TSDB_EXTRA_PAYLOAD_SIZE;
}
char* fillCreateSTableMsg(SCreateTableMsg* pCreateMsg, SCreateTableSql* pTableSql, SSqlCmd* pCmd, SSqlInfo *pInfo) {
// SET
SSchema* pSchema = (SSchema *)pCreateMsg->schema;
SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd);
pCreateMsg->igExists = 0;
pCreateMsg->sqlLen = 0;
// FullName like acctID.dbName.tableName
tNameExtractFullName(&pInfo->pCreateTableInfo->toSName, pCreateMsg->tableName);
// copy columns
pCreateMsg->numOfColumns = htons(pCmd->numOfCols);
for (int i = 0; i < pCmd->numOfCols; ++i) {
TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i);
pSchema->type = pField->type;
strcpy(pSchema->name, pField->name);
pSchema->bytes = htons(pField->bytes);
pSchema++;
}
// append one tag
pCreateMsg->numOfTags = htons(1); // only one tag immutable
pSchema->type = TSDB_DATA_TYPE_INT;
pSchema->bytes = htons(INT_BYTES);
strcpy(pSchema->name, "tag1");
pSchema ++;
return (char *)pSchema;
}
int tscBuildCreateTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
int msgLen = 0;
int size = 0;
......@@ -1608,39 +1646,71 @@ int tscBuildCreateTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pCreate->len = htonl(len);
}
} else { // create (super) table
// FIRST MSG
SCreateTableMsg* pCreate = pCreateMsg;
pCreateTableMsg->numOfTables = htonl(1); // only one table will be created
int32_t code = tNameExtractFullName(&pTableMetaInfo->name, pCreateMsg->tableName);
int32_t code = tNameExtractFullName(&pTableMetaInfo->name, pCreate->tableName);
bool to = pInfo->pCreateTableInfo->to.n > 0;
assert(code == 0);
SCreateTableSql *pCreateTable = pInfo->pCreateTableInfo;
pCreateMsg->igExists = pCreateTable->existCheck ? 1 : 0;
pCreateMsg->numOfColumns = htons(pCmd->numOfCols);
pCreateMsg->numOfTags = htons(pCmd->count);
pCreateMsg->sqlLen = 0;
pMsg = (char *)pCreateMsg->schema;
pSchema = (SSchema *)pCreateMsg->schema;
pCreate->igExists = pCreateTable->existCheck ? 1 : 0;
pCreate->numOfColumns = htons(pCmd->numOfCols);
pCreate->numOfTags = htons(pCmd->count);
pCreate->sqlLen = 0;
pSchema = (SSchema *)pCreate->schema;
//copy schema
for (int i = 0; i < pCmd->numOfCols + pCmd->count; ++i) {
TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i);
pSchema->type = pField->type;
strcpy(pSchema->name, pField->name);
pSchema->bytes = htons(pField->bytes);
pSchema++;
}
//copy stream sql if have
pMsg = (char *)pSchema;
if (type == TSQL_CREATE_STREAM) { // check if it is a stream sql
SSqlNode *pQuerySql = pInfo->pCreateTableInfo->pSelect;
int16_t len = 0;
if(to) {
//sql:
strcpy(pMsg, LABEL_SQL);
len += LABEL_SQL_LEN;
len += tStrNCpy(pMsg + len, &pQuerySql->sqlstr);
//to
strcpy(pMsg + len, LABEL_TO);
len += LABEL_TO_LEN;
len += tStrNCpy(pMsg + len, &pInfo->pCreateTableInfo->to);
//split if
if(pInfo->pCreateTableInfo->split.n > 0) {
strcpy(pMsg + len, LABEL_SPLIT);
len += LABEL_SPLIT_LEN;
len += tStrNCpy(pMsg + len, &pInfo->pCreateTableInfo->split);
}
// append string end flag
pMsg[len++] = 0;
pMsg += len;
pCreate->sqlLen = htons(len);
} else {
len = pQuerySql->sqlstr.n;
strncpy(pMsg, pQuerySql->sqlstr.z, len);
pMsg[len++] = 0; // string end
pMsg += len;
pCreate->sqlLen = htons(len);
}
}
// calc first msg length
int32_t len = (int32_t)(pMsg - (char*)pCreate);
pCreate->len = htonl(len);
strncpy(pMsg, pQuerySql->sqlstr.z, pQuerySql->sqlstr.n + 1);
pCreateMsg->sqlLen = htons(pQuerySql->sqlstr.n + 1);
pMsg += pQuerySql->sqlstr.n + 1;
// filling second msg if to have value
if(to) {
pCreate = (SCreateTableMsg *)pMsg;
pMsg = fillCreateSTableMsg(pCreate, pCreateTable, pCmd, pInfo);
len = (int32_t)(pMsg - (char*)pCreate);
pCreate->len = htonl(len);
pCreateTableMsg->numOfTables = htonl(2);
}
}
......
......@@ -28,6 +28,7 @@
#include "tutil.h"
#include "ttimer.h"
#include "tscProfile.h"
#include "tidpool.h"
static char clusterDefaultId[] = "clusterDefaultId";
static bool validImpl(const char* str, size_t maxsize) {
......@@ -307,6 +308,25 @@ void taos_close(TAOS *taos) {
taosRemoveRef(tscRefId, pObj->rid);
}
// get taos connection unused session number
int32_t taos_unused_session(TAOS* taos) {
// param valid check
STscObj *pObj = (STscObj *)taos;
if (pObj == NULL || pObj->signature != pObj) {
tscError("pObj:%p is NULL or freed", pObj);
terrno = TSDB_CODE_TSC_DISCONNECTED;
return 0;
}
if(pObj->pRpcObj == NULL ) {
tscError("pObj:%p pRpcObj is NULL.", pObj);
terrno = TSDB_CODE_TSC_DISCONNECTED;
return 0;
}
// get number
return rpcUnusedSession(pObj->pRpcObj->pDnodeConn, false);
}
void waitForQueryRsp(void *param, TAOS_RES *tres, int code) {
assert(tres != NULL);
......@@ -807,13 +827,16 @@ bool taos_is_update_query(TAOS_RES *res) {
SSqlCmd* pCmd = &pSql->cmd;
return ((pCmd->command >= TSDB_SQL_INSERT && pCmd->command <= TSDB_SQL_DROP_DNODE) || TSDB_SQL_RESET_CACHE == pCmd->command || TSDB_SQL_USE_DB == pCmd->command);
}
int taos_print_row(char *str, TAOS_ROW row, TAOS_FIELD *fields, int num_fields) {
return taos_print_row_ex(str, row, fields, num_fields, ' ', false);
}
int taos_print_row_ex(char *str, TAOS_ROW row, TAOS_FIELD *fields, int num_fields, char split, bool addQuota) {
int len = 0;
for (int i = 0; i < num_fields; ++i) {
if (i > 0) {
str[len++] = ' ';
str[len++] = split;
}
if (row[i] == NULL) {
......@@ -874,9 +897,23 @@ int taos_print_row(char *str, TAOS_ROW row, TAOS_FIELD *fields, int num_fields)
} else {
assert(charLen <= fields[i].bytes * TSDB_NCHAR_SIZE && charLen >= 0);
}
// add pre quotaion if require
if(addQuota) {
*(str + len) = '\'';
len += 1;
}
// copy content
memcpy(str + len, row[i], charLen);
len += charLen;
// add end quotaion if require
if(addQuota) {
*(str + len)= '\'';
len += 1;
}
} break;
case TSDB_DATA_TYPE_TIMESTAMP:
......@@ -893,6 +930,89 @@ int taos_print_row(char *str, TAOS_ROW row, TAOS_FIELD *fields, int num_fields)
return len;
}
// print field value to str
int taos_print_field(char *str, void* value, TAOS_FIELD *field) {
// check valid
if (str == NULL || value == NULL || field == NULL) {
return 0;
}
// get value
int len = 0;
switch (field->type) {
//
// fixed length
//
case TSDB_DATA_TYPE_TINYINT:
len = sprintf(str, "%d", *((int8_t *)value));
break;
case TSDB_DATA_TYPE_UTINYINT:
len = sprintf(str, "%u", *((uint8_t *)value));
break;
case TSDB_DATA_TYPE_SMALLINT:
len = sprintf(str, "%d", *((int16_t *)value));
break;
case TSDB_DATA_TYPE_USMALLINT:
len = sprintf(str, "%u", *((uint16_t *)value));
break;
case TSDB_DATA_TYPE_INT:
len = sprintf(str, "%d", *((int32_t *)value));
break;
case TSDB_DATA_TYPE_UINT:
len = sprintf(str, "%u", *((uint32_t *)value));
break;
case TSDB_DATA_TYPE_BIGINT:
len = sprintf(str, "%" PRId64, *((int64_t *)value));
break;
case TSDB_DATA_TYPE_UBIGINT:
len = sprintf(str, "%" PRIu64, *((uint64_t *)value));
break;
case TSDB_DATA_TYPE_FLOAT: {
float fv = 0;
fv = GET_FLOAT_VAL(value);
len = sprintf(str, "%f", fv);
} break;
case TSDB_DATA_TYPE_DOUBLE: {
double dv = 0;
dv = GET_DOUBLE_VAL(value);
len = sprintf(str, "%lf", dv);
} break;
case TSDB_DATA_TYPE_TIMESTAMP:
len = sprintf(str, "%" PRId64, *((int64_t *)value));
break;
case TSDB_DATA_TYPE_BOOL:
len = sprintf(str, "%d", *((int8_t *)value));
//
// variant length
//
case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_NCHAR: {
len = varDataLen((char*)value - VARSTR_HEADER_SIZE);
if (field->type == TSDB_DATA_TYPE_BINARY) {
assert(len <= field->bytes && len >= 0);
} else {
assert(len <= field->bytes * TSDB_NCHAR_SIZE && len >= 0);
}
memcpy(str, value, len);
} break;
default:
break;
}
return len;
}
static void asyncCallback(void *param, TAOS_RES *tres, int code) {
assert(param != NULL);
SSqlObj *pSql = ((SSqlObj *)param);
......
此差异已折叠。
......@@ -83,7 +83,7 @@ extern int32_t tsMaxNumOfOrderedResults;
extern int32_t tsMinSlidingTime;
extern int32_t tsMinIntervalTime;
extern int32_t tsMaxStreamComputDelay;
extern int32_t tsStreamCompStartDelay;
extern int32_t tsFirstLaunchDelay;
extern int32_t tsRetryStreamCompDelay;
extern float tsStreamComputDelayRatio; // the delayed computing ration of the whole time window
extern int32_t tsProjectExecInterval;
......
......@@ -107,11 +107,11 @@ int32_t tsMinIntervalTime = 1;
// 20sec, the maximum value of stream computing delay, changed accordingly
int32_t tsMaxStreamComputDelay = 20000;
// 10sec, the first stream computing delay time after system launched successfully, changed accordingly
int32_t tsStreamCompStartDelay = 10000;
// 10sec, the stream first launched to execute delay time after system launched successfully, changed accordingly
int32_t tsFirstLaunchDelay = 10000;
// the stream computing delay time after executing failed, change accordingly
int32_t tsRetryStreamCompDelay = 10 * 1000;
int32_t tsRetryStreamCompDelay = 30 * 60 * 1000;
// The delayed computing ration. 10% of the whole computing time window by default.
float tsStreamComputDelayRatio = 0.1f;
......@@ -798,7 +798,7 @@ static void doInitGlobalConfig(void) {
taosInitConfigOption(cfg);
cfg.option = "maxFirstStreamCompDelay";
cfg.ptr = &tsStreamCompStartDelay;
cfg.ptr = &tsFirstLaunchDelay;
cfg.valType = TAOS_CFG_VTYPE_INT32;
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW;
cfg.minValue = 1000;
......
......@@ -423,7 +423,7 @@ static void cqProcessCreateTimer(void *param, void *tmrId) {
}
// inner implement in tscStream.c
TAOS_STREAM *taos_open_stream_withname(TAOS *taos, const char* desName, const char *sqlstr, void (*fp)(void *param, TAOS_RES *, TAOS_ROW row),
TAOS_STREAM *taos_open_stream_withname(TAOS *taos, const char* desName, int32_t dstCols, const char *sqlstr, void (*fp)(void *param, TAOS_RES *, TAOS_ROW row),
int64_t stime, void *param, void (*callback)(void *), void* cqhandle);
static void cqCreateStream(SCqContext *pContext, SCqObj *pObj) {
......@@ -436,9 +436,11 @@ static void cqCreateStream(SCqContext *pContext, SCqObj *pObj) {
}
pObj->tmrId = 0;
int32_t dstCols = -1;
if(pObj->pSchema)
dstCols = pObj->pSchema->numOfCols;
if (pObj->pStream == NULL) {
pObj->pStream = taos_open_stream_withname(pContext->dbConn, pObj->dstTable, pObj->sqlStr, cqProcessStreamRes, \
pObj->pStream = taos_open_stream_withname(pContext->dbConn, pObj->dstTable, dstCols, pObj->sqlStr, cqProcessStreamRes, \
INT64_MIN, (void *)pObj->rid, NULL, pContext);
// TODO the pObj->pStream may be released if error happens
......
......@@ -173,7 +173,11 @@ DLL_EXPORT int taos_num_fields(TAOS_RES *res);
DLL_EXPORT int taos_affected_rows(TAOS_RES *res);
DLL_EXPORT TAOS_FIELD *taos_fetch_fields(TAOS_RES *res);
DLL_EXPORT int taos_select_db(TAOS *taos, const char *db);
// row to string
DLL_EXPORT int taos_print_row(char *str, TAOS_ROW row, TAOS_FIELD *fields, int num_fields);
DLL_EXPORT int taos_print_row_ex(char *str, TAOS_ROW row, TAOS_FIELD *fields, int num_fields, char split, bool addQuota);
// one field to string
DLL_EXPORT int taos_print_field(char *str, void* value, TAOS_FIELD *field);
DLL_EXPORT void taos_stop_query(TAOS_RES *res);
DLL_EXPORT bool taos_is_null(TAOS_RES *res, int32_t row, int32_t col);
DLL_EXPORT bool taos_is_update_query(TAOS_RES *res);
......
......@@ -92,6 +92,7 @@ int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo);
void rpcSendRecv(void *shandle, SRpcEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp);
int rpcReportProgress(void *pConn, char *pCont, int contLen);
void rpcCancelRequest(int64_t rid);
int32_t rpcUnusedSession(void * rpcInfo, bool bLock);
#ifdef __cplusplus
}
......
......@@ -136,86 +136,88 @@
#define TK_UNSIGNED 118
#define TK_TAGS 119
#define TK_USING 120
#define TK_NULL 121
#define TK_NOW 122
#define TK_VARIABLE 123
#define TK_SELECT 124
#define TK_UNION 125
#define TK_ALL 126
#define TK_DISTINCT 127
#define TK_FROM 128
#define TK_RANGE 129
#define TK_INTERVAL 130
#define TK_EVERY 131
#define TK_SESSION 132
#define TK_STATE_WINDOW 133
#define TK_FILL 134
#define TK_SLIDING 135
#define TK_ORDER 136
#define TK_BY 137
#define TK_ASC 138
#define TK_GROUP 139
#define TK_HAVING 140
#define TK_LIMIT 141
#define TK_OFFSET 142
#define TK_SLIMIT 143
#define TK_SOFFSET 144
#define TK_WHERE 145
#define TK_RESET 146
#define TK_QUERY 147
#define TK_SYNCDB 148
#define TK_ADD 149
#define TK_COLUMN 150
#define TK_MODIFY 151
#define TK_TAG 152
#define TK_CHANGE 153
#define TK_SET 154
#define TK_KILL 155
#define TK_CONNECTION 156
#define TK_STREAM 157
#define TK_COLON 158
#define TK_ABORT 159
#define TK_AFTER 160
#define TK_ATTACH 161
#define TK_BEFORE 162
#define TK_BEGIN 163
#define TK_CASCADE 164
#define TK_CLUSTER 165
#define TK_CONFLICT 166
#define TK_COPY 167
#define TK_DEFERRED 168
#define TK_DELIMITERS 169
#define TK_DETACH 170
#define TK_EACH 171
#define TK_END 172
#define TK_EXPLAIN 173
#define TK_FAIL 174
#define TK_FOR 175
#define TK_IGNORE 176
#define TK_IMMEDIATE 177
#define TK_INITIALLY 178
#define TK_INSTEAD 179
#define TK_KEY 180
#define TK_OF 181
#define TK_RAISE 182
#define TK_REPLACE 183
#define TK_RESTRICT 184
#define TK_ROW 185
#define TK_STATEMENT 186
#define TK_TRIGGER 187
#define TK_VIEW 188
#define TK_IPTOKEN 189
#define TK_SEMI 190
#define TK_NONE 191
#define TK_PREV 192
#define TK_LINEAR 193
#define TK_IMPORT 194
#define TK_TBNAME 195
#define TK_JOIN 196
#define TK_INSERT 197
#define TK_INTO 198
#define TK_VALUES 199
#define TK_FILE 200
#define TK_TO 121
#define TK_SPLIT 122
#define TK_NULL 123
#define TK_NOW 124
#define TK_VARIABLE 125
#define TK_SELECT 126
#define TK_UNION 127
#define TK_ALL 128
#define TK_DISTINCT 129
#define TK_FROM 130
#define TK_RANGE 131
#define TK_INTERVAL 132
#define TK_EVERY 133
#define TK_SESSION 134
#define TK_STATE_WINDOW 135
#define TK_FILL 136
#define TK_SLIDING 137
#define TK_ORDER 138
#define TK_BY 139
#define TK_ASC 140
#define TK_GROUP 141
#define TK_HAVING 142
#define TK_LIMIT 143
#define TK_OFFSET 144
#define TK_SLIMIT 145
#define TK_SOFFSET 146
#define TK_WHERE 147
#define TK_RESET 148
#define TK_QUERY 149
#define TK_SYNCDB 150
#define TK_ADD 151
#define TK_COLUMN 152
#define TK_MODIFY 153
#define TK_TAG 154
#define TK_CHANGE 155
#define TK_SET 156
#define TK_KILL 157
#define TK_CONNECTION 158
#define TK_STREAM 159
#define TK_COLON 160
#define TK_ABORT 161
#define TK_AFTER 162
#define TK_ATTACH 163
#define TK_BEFORE 164
#define TK_BEGIN 165
#define TK_CASCADE 166
#define TK_CLUSTER 167
#define TK_CONFLICT 168
#define TK_COPY 169
#define TK_DEFERRED 170
#define TK_DELIMITERS 171
#define TK_DETACH 172
#define TK_EACH 173
#define TK_END 174
#define TK_EXPLAIN 175
#define TK_FAIL 176
#define TK_FOR 177
#define TK_IGNORE 178
#define TK_IMMEDIATE 179
#define TK_INITIALLY 180
#define TK_INSTEAD 181
#define TK_KEY 182
#define TK_OF 183
#define TK_RAISE 184
#define TK_REPLACE 185
#define TK_RESTRICT 186
#define TK_ROW 187
#define TK_STATEMENT 188
#define TK_TRIGGER 189
#define TK_VIEW 190
#define TK_IPTOKEN 191
#define TK_SEMI 192
#define TK_NONE 193
#define TK_PREV 194
#define TK_LINEAR 195
#define TK_IMPORT 196
#define TK_TBNAME 197
#define TK_JOIN 198
#define TK_INSERT 199
#define TK_INTO 200
#define TK_VALUES 201
#define TK_FILE 202
#define TK_SPACE 300
......
......@@ -851,15 +851,15 @@ static int32_t mnodeProcessBatchCreateTableMsg(SMnodeMsg *pMsg) {
memcpy(pSubMsg->pCont + sizeof(SCMCreateTableMsg), p, htonl(p->len));
code = mnodeValidateCreateTableMsg(p, pSubMsg);
if (code == TSDB_CODE_SUCCESS || code == TSDB_CODE_MND_TABLE_ALREADY_EXIST) {
++pSubMsg->pBatchMasterMsg->successed;
mnodeDestroySubMsg(pSubMsg);
continue;
if (code == TSDB_CODE_SUCCESS || ( p->igExists == 1 && code == TSDB_CODE_MND_TABLE_ALREADY_EXIST )) {
++pSubMsg->pBatchMasterMsg->successed;
mnodeDestroySubMsg(pSubMsg);
continue;
}
if (code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
mnodeDestroySubMsg(pSubMsg);
return code;
mnodeDestroySubMsg(pSubMsg);
return code;
}
}
......@@ -1046,11 +1046,21 @@ static int32_t mnodeCreateSuperTableCb(SMnodeMsg *pMsg, int32_t code) {
if (code == TSDB_CODE_SUCCESS) {
mLInfo("stable:%s, is created in sdb, uid:%" PRIu64, pTable->info.tableId, pTable->uid);
if(pMsg->pBatchMasterMsg)
pMsg->pBatchMasterMsg->successed ++;
} else {
mError("msg:%p, app:%p stable:%s, failed to create in sdb, reason:%s", pMsg, pMsg->rpcMsg.ahandle, pTable->info.tableId,
tstrerror(code));
SSdbRow desc = {.type = SDB_OPER_GLOBAL, .pObj = pTable, .pTable = tsSuperTableSdb};
sdbDeleteRow(&desc);
if(pMsg->pBatchMasterMsg)
pMsg->pBatchMasterMsg->received ++;
}
// if super table create by batch msg, check done and send finished to client
if(pMsg->pBatchMasterMsg) {
if (pMsg->pBatchMasterMsg->successed + pMsg->pBatchMasterMsg->received >= pMsg->pBatchMasterMsg->expected)
dnodeSendRpcMWriteRsp(pMsg->pBatchMasterMsg, code);
}
return code;
......
......@@ -25,6 +25,7 @@ extern "C" {
#include "tstrbuild.h"
#include "ttoken.h"
#include "tvariant.h"
#include "tname.h"
#define ParseTOKENTYPE SStrToken
......@@ -156,8 +157,11 @@ typedef struct SCreatedTableInfo {
typedef struct SCreateTableSql {
SStrToken name; // table name, create table [name] xxx
SStrToken to; // create stream to anohter table
SStrToken split; // split columns
int8_t type; // create normal table/from super table/ stream
bool existCheck;
SName toSName;
struct {
SArray *pTagColumns; // SArray<TAOS_FIELD>
......@@ -334,6 +338,7 @@ SArray *setSubclause(SArray *pList, void *pSqlNode);
SArray *appendSelectClause(SArray *pList, void *pSubclause);
void setCreatedTableName(SSqlInfo *pInfo, SStrToken *pTableNameToken, SStrToken *pIfNotExists);
void setCreatedStreamOpt(SSqlInfo *pInfo, SStrToken *pTo, SStrToken *pSplit);
void SqlInfoDestroy(SSqlInfo *pInfo);
......
......@@ -411,14 +411,28 @@ tagNamelist(A) ::= ids(X). {A = taosArrayInit(4, sizeof(SSt
// create stream
// create table table_name as select count(*) from super_table_name interval(time)
create_table_args(A) ::= ifnotexists(U) ids(V) cpxName(Z) AS select(S). {
create_table_args(A) ::= ifnotexists(U) ids(V) cpxName(Z) to_opt(E) split_opt(F) AS select(S). {
A = tSetCreateTableInfo(NULL, NULL, S, TSQL_CREATE_STREAM);
setSqlInfo(pInfo, A, NULL, TSDB_SQL_CREATE_TABLE);
setCreatedStreamOpt(pInfo, &E, &F);
V.n += Z.n;
setCreatedTableName(pInfo, &V, &U);
}
// to_opt
%type to_opt {SStrToken}
to_opt(A) ::= . {A.n = 0;}
to_opt(A) ::= TO ids(X) cpxName(Y). {
A = X;
A.n += Y.n;
}
// split_opt
%type to_split {SStrToken}
split_opt(A) ::= . {A.n = 0;}
split_opt(A) ::= SPLIT ids(X). { A = X;}
%type column{TAOS_FIELD}
%type columnlist{SArray*}
%destructor columnlist {taosArrayDestroy(&$$);}
......
......@@ -1244,6 +1244,11 @@ void setCreatedTableName(SSqlInfo *pInfo, SStrToken *pTableNameToken, SStrToken
pInfo->pCreateTableInfo->existCheck = (pIfNotExists->n != 0);
}
void setCreatedStreamOpt(SSqlInfo *pInfo, SStrToken *pTo, SStrToken *pSplit) {
pInfo->pCreateTableInfo->to = *pTo;
pInfo->pCreateTableInfo->split = *pSplit;
}
void setDCLSqlElems(SSqlInfo *pInfo, int32_t type, int32_t nParam, ...) {
pInfo->type = type;
if (nParam == 0) {
......
......@@ -225,51 +225,6 @@ int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryMsg, qi
return code;
}
#ifdef TEST_IMPL
// wait moment
int waitMoment(SQInfo* pQInfo){
if(pQInfo->sql) {
int ms = 0;
char* pcnt = strstr(pQInfo->sql, " count(*)");
if(pcnt) return 0;
char* pos = strstr(pQInfo->sql, " t_");
if(pos){
pos += 3;
ms = atoi(pos);
while(*pos >= '0' && *pos <= '9'){
pos ++;
}
char unit_char = *pos;
if(unit_char == 'h'){
ms *= 3600*1000;
} else if(unit_char == 'm'){
ms *= 60*1000;
} else if(unit_char == 's'){
ms *= 1000;
}
}
if(ms == 0) return 0;
printf("test wait sleep %dms. sql=%s ...\n", ms, pQInfo->sql);
if(ms < 1000) {
taosMsleep(ms);
} else {
int used_ms = 0;
while(used_ms < ms) {
taosMsleep(1000);
used_ms += 1000;
if(isQueryKilled(pQInfo)){
printf("test check query is canceled, sleep break.%s\n", pQInfo->sql);
break;
}
}
}
}
return 1;
}
#endif
bool qTableQuery(qinfo_t qinfo, uint64_t *qId) {
SQInfo *pQInfo = (SQInfo *)qinfo;
assert(pQInfo && pQInfo->signature == pQInfo);
......
此差异已折叠。
......@@ -1677,3 +1677,9 @@ static void rpcDecRef(SRpcInfo *pRpc)
}
}
int32_t rpcUnusedSession(void * rpcInfo, bool bLock) {
SRpcInfo *info = (SRpcInfo *)rpcInfo;
if(info == NULL)
return 0;
return taosIdPoolNumOfFree(info->idPool, bLock);
}
\ No newline at end of file
......@@ -36,6 +36,9 @@ int taosIdPoolNumOfUsed(void *handle);
bool taosIdPoolMarkStatus(void *handle, int id);
// get free count from pool , if bLock is true, locked pool than get free count, accuracy but slowly
int taosIdPoolNumOfFree(void *handle, bool bLock);
#ifdef __cplusplus
}
#endif
......
......@@ -63,6 +63,15 @@ uint32_t tGetToken(char *z, uint32_t *tokenType);
*/
SStrToken tStrGetToken(char *str, int32_t *i, bool isPrevOptr);
/**
* strcpy implement source from SStrToken
*
* @param dst copy to
* @param srcToken copy from
* @return size of copy successful bytes, not include '\0'
*/
int32_t tStrNCpy(char *dst, SStrToken* srcToken);
/**
* check if it is a keyword or not
* @param z
......
......@@ -163,4 +163,16 @@ int taosIdPoolMaxSize(void *handle) {
pthread_mutex_unlock(&pIdPool->mutex);
return ret;
}
// get free count from pool , if bLock is true, locked pool than get free count, accuracy but slowly
int taosIdPoolNumOfFree(void *handle, bool bLock) {
id_pool_t *pIdPool = handle;
if(bLock)
pthread_mutex_lock(&pIdPool->mutex);
int ret = pIdPool->numOfFree;
if(bLock)
pthread_mutex_unlock(&pIdPool->mutex);
return ret;
}
\ No newline at end of file
......@@ -231,7 +231,9 @@ static SKeyword keywordTable[] = {
{"AGGREGATE", TK_AGGREGATE},
{"BUFSIZE", TK_BUFSIZE},
{"RANGE", TK_RANGE},
{"CONTAINS", TK_CONTAINS}
{"CONTAINS", TK_CONTAINS},
{"TO", TK_TO},
{"SPLIT", TK_SPLIT}
};
static const char isIdChar[] = {
......@@ -704,6 +706,18 @@ SStrToken tStrGetToken(char* str, int32_t* i, bool isPrevOptr) {
return t0;
}
/**
* strcpy implement source from SStrToken
*
* @param dst copy to
* @param srcToken copy from
* @return size of copy successful bytes
*/
int32_t tStrNCpy(char *dst, SStrToken* srcToken) {
strncpy(dst, srcToken->z, srcToken->n);
return srcToken->n;
}
bool taosIsKeyWordToken(const char* z, int32_t len) {
return (tKeywordCode((char*)z, len) != TK_ID);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册