提交 187aca4d 编写于 作者: wmmhello's avatar wmmhello

fix:conflict from 3.0

......@@ -204,65 +204,65 @@
#define TK_SPLIT 186
#define TK_SYNCDB 187
#define TK_DELETE 188
#define TK_NULL 189
#define TK_NK_QUESTION 190
#define TK_NK_ARROW 191
#define TK_ROWTS 192
#define TK_TBNAME 193
#define TK_QSTARTTS 194
#define TK_QENDTS 195
#define TK_WSTARTTS 196
#define TK_WENDTS 197
#define TK_WDURATION 198
#define TK_CAST 199
#define TK_NOW 200
#define TK_TODAY 201
#define TK_TIMEZONE 202
#define TK_CLIENT_VERSION 203
#define TK_SERVER_VERSION 204
#define TK_SERVER_STATUS 205
#define TK_CURRENT_USER 206
#define TK_COUNT 207
#define TK_LAST_ROW 208
#define TK_BETWEEN 209
#define TK_IS 210
#define TK_NK_LT 211
#define TK_NK_GT 212
#define TK_NK_LE 213
#define TK_NK_GE 214
#define TK_NK_NE 215
#define TK_MATCH 216
#define TK_NMATCH 217
#define TK_CONTAINS 218
#define TK_JOIN 219
#define TK_INNER 220
#define TK_SELECT 221
#define TK_DISTINCT 222
#define TK_WHERE 223
#define TK_PARTITION 224
#define TK_BY 225
#define TK_SESSION 226
#define TK_STATE_WINDOW 227
#define TK_SLIDING 228
#define TK_FILL 229
#define TK_VALUE 230
#define TK_NONE 231
#define TK_PREV 232
#define TK_LINEAR 233
#define TK_NEXT 234
#define TK_HAVING 235
#define TK_RANGE 236
#define TK_EVERY 237
#define TK_ORDER 238
#define TK_SLIMIT 239
#define TK_SOFFSET 240
#define TK_LIMIT 241
#define TK_OFFSET 242
#define TK_ASC 243
#define TK_NULLS 244
#define TK_ID 245
#define TK_NK_BITNOT 246
#define TK_INSERT 247
#define TK_INSERT 189
#define TK_NULL 190
#define TK_NK_QUESTION 191
#define TK_NK_ARROW 192
#define TK_ROWTS 193
#define TK_TBNAME 194
#define TK_QSTARTTS 195
#define TK_QENDTS 196
#define TK_WSTARTTS 197
#define TK_WENDTS 198
#define TK_WDURATION 199
#define TK_CAST 200
#define TK_NOW 201
#define TK_TODAY 202
#define TK_TIMEZONE 203
#define TK_CLIENT_VERSION 204
#define TK_SERVER_VERSION 205
#define TK_SERVER_STATUS 206
#define TK_CURRENT_USER 207
#define TK_COUNT 208
#define TK_LAST_ROW 209
#define TK_BETWEEN 210
#define TK_IS 211
#define TK_NK_LT 212
#define TK_NK_GT 213
#define TK_NK_LE 214
#define TK_NK_GE 215
#define TK_NK_NE 216
#define TK_MATCH 217
#define TK_NMATCH 218
#define TK_CONTAINS 219
#define TK_JOIN 220
#define TK_INNER 221
#define TK_SELECT 222
#define TK_DISTINCT 223
#define TK_WHERE 224
#define TK_PARTITION 225
#define TK_BY 226
#define TK_SESSION 227
#define TK_STATE_WINDOW 228
#define TK_SLIDING 229
#define TK_FILL 230
#define TK_VALUE 231
#define TK_NONE 232
#define TK_PREV 233
#define TK_LINEAR 234
#define TK_NEXT 235
#define TK_HAVING 236
#define TK_RANGE 237
#define TK_EVERY 238
#define TK_ORDER 239
#define TK_SLIMIT 240
#define TK_SOFFSET 241
#define TK_LIMIT 242
#define TK_OFFSET 243
#define TK_ASC 244
#define TK_NULLS 245
#define TK_ID 246
#define TK_NK_BITNOT 247
#define TK_VALUES 248
#define TK_IMPORT 249
#define TK_NK_SEMI 250
......
......@@ -208,7 +208,7 @@ int32_t doFilterTag(const SNode* pFilterNode, SIndexMetaArg* metaArg, SArray* re
* destory index env
*
*/
void indexCleanUp();
void indexCleanup();
#ifdef __cplusplus
}
......
......@@ -194,6 +194,7 @@ typedef enum ENodeType {
QUERY_NODE_KILL_QUERY_STMT,
QUERY_NODE_KILL_TRANSACTION_STMT,
QUERY_NODE_DELETE_STMT,
QUERY_NODE_INSERT_STMT,
QUERY_NODE_QUERY,
// logic plan node
......@@ -247,6 +248,7 @@ typedef enum ENodeType {
QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC,
QUERY_NODE_PHYSICAL_PLAN_DISPATCH,
QUERY_NODE_PHYSICAL_PLAN_INSERT,
QUERY_NODE_PHYSICAL_PLAN_QUERY_INSERT,
QUERY_NODE_PHYSICAL_PLAN_DELETE,
QUERY_NODE_PHYSICAL_SUBPLAN,
QUERY_NODE_PHYSICAL_PLAN
......
......@@ -131,6 +131,7 @@ typedef struct SVnodeModifyLogicNode {
int8_t tableType; // table type
char tableFName[TSDB_TABLE_FNAME_LEN];
STimeWindow deleteTimeRange;
SVgroupsInfo* pVgroupList;
} SVnodeModifyLogicNode;
typedef struct SExchangeLogicNode {
......@@ -456,6 +457,15 @@ typedef struct SDataInserterNode {
char* pData;
} SDataInserterNode;
typedef struct SQueryInserterNode {
SDataSinkNode sink;
uint64_t tableId;
int8_t tableType; // table type
char tableFName[TSDB_TABLE_FNAME_LEN];
int32_t vgId;
SEpSet epSet;
} SQueryInserterNode;
typedef struct SDataDeleterNode {
SDataSinkNode sink;
uint64_t tableId;
......
......@@ -302,6 +302,14 @@ typedef struct SDeleteStmt {
bool deleteZeroRows;
} SDeleteStmt;
typedef struct SInsertStmt {
ENodeType type; // QUERY_NODE_INSERT_STMT
SNode* pTable;
SNodeList* pCols;
SNode* pQuery;
uint8_t precision;
} SInsertStmt;
typedef enum {
PAYLOAD_TYPE_KV = 0,
PAYLOAD_TYPE_RAW = 1,
......
......@@ -56,7 +56,7 @@ typedef struct SParseContext {
} SParseContext;
int32_t qParseSql(SParseContext* pCxt, SQuery** pQuery);
bool qIsInsertSql(const char* pStr, size_t length);
bool qIsInsertValuesSql(const char* pStr, size_t length);
// for async mode
int32_t qParseSqlSyntax(SParseContext* pCxt, SQuery** pQuery, struct SCatalogReq* pCatalogReq);
......
......@@ -26,7 +26,7 @@ extern "C" {
extern bool gRaftDetailLog;
#define SYNC_MAX_BATCH_SIZE 100
#define SYNC_MAX_BATCH_SIZE 500
#define SYNC_INDEX_BEGIN 0
#define SYNC_INDEX_INVALID -1
#define SYNC_TERM_INVALID 0xFFFFFFFFFFFFFFFF
......
......@@ -324,9 +324,9 @@ int32_t stmtCleanSQLInfo(STscStmt* pStmt) {
}
int32_t stmtRebuildDataBlock(STscStmt* pStmt, STableDataBlocks* pDataBlock, STableDataBlocks** newBlock, uint64_t uid) {
SEpSet ep = getEpSet_s(&pStmt->taos->pAppInfo->mgmtEp);
SVgroupInfo vgInfo = {0};
SRequestConnInfo conn = {.pTrans = pStmt->taos->pAppInfo->pTransporter,
SEpSet ep = getEpSet_s(&pStmt->taos->pAppInfo->mgmtEp);
SVgroupInfo vgInfo = {0};
SRequestConnInfo conn = {.pTrans = pStmt->taos->pAppInfo->pTransporter,
.requestId = pStmt->exec.pRequest->requestId,
.requestObjRefId = pStmt->exec.pRequest->self,
.mgmtEps = getEpSet_s(&pStmt->taos->pAppInfo->mgmtEp)};
......@@ -391,13 +391,12 @@ int32_t stmtGetFromCache(STscStmt* pStmt) {
STMT_RET(stmtCleanBindInfo(pStmt));
}
STableMeta* pTableMeta = NULL;
SRequestConnInfo conn = {.pTrans = pStmt->taos->pAppInfo->pTransporter,
STableMeta* pTableMeta = NULL;
SRequestConnInfo conn = {.pTrans = pStmt->taos->pAppInfo->pTransporter,
.requestId = pStmt->exec.pRequest->requestId,
.requestObjRefId = pStmt->exec.pRequest->self,
.mgmtEps = getEpSet_s(&pStmt->taos->pAppInfo->mgmtEp)};
int32_t code =
catalogGetTableMeta(pStmt->pCatalog, &conn, &pStmt->bInfo.sname, &pTableMeta);
int32_t code = catalogGetTableMeta(pStmt->pCatalog, &conn, &pStmt->bInfo.sname, &pTableMeta);
if (TSDB_CODE_PAR_TABLE_NOT_EXIST == code) {
STMT_ERR_RET(stmtCleanBindInfo(pStmt));
......@@ -849,7 +848,7 @@ int stmtIsInsert(TAOS_STMT* stmt, int* insert) {
if (pStmt->sql.type) {
*insert = (STMT_TYPE_INSERT == pStmt->sql.type || STMT_TYPE_MULTI_INSERT == pStmt->sql.type);
} else {
*insert = qIsInsertSql(pStmt->sql.sqlStr, 0);
*insert = qIsInsertValuesSql(pStmt->sql.sqlStr, pStmt->sql.sqlLen);
}
return TSDB_CODE_SUCCESS;
......@@ -861,7 +860,7 @@ int stmtGetTagFields(TAOS_STMT* stmt, int* nums, TAOS_FIELD_E** fields) {
if (STMT_TYPE_QUERY == pStmt->sql.type) {
STMT_RET(TSDB_CODE_TSC_STMT_API_ERROR);
}
STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_FETCH_FIELDS));
if (pStmt->bInfo.needParse && pStmt->sql.runTimes && pStmt->sql.type > 0 &&
......@@ -893,7 +892,7 @@ int stmtGetColFields(TAOS_STMT* stmt, int* nums, TAOS_FIELD_E** fields) {
if (STMT_TYPE_QUERY == pStmt->sql.type) {
STMT_RET(TSDB_CODE_TSC_STMT_API_ERROR);
}
STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_FETCH_FIELDS));
if (pStmt->bInfo.needParse && pStmt->sql.runTimes && pStmt->sql.type > 0 &&
......@@ -919,7 +918,6 @@ int stmtGetColFields(TAOS_STMT* stmt, int* nums, TAOS_FIELD_E** fields) {
return TSDB_CODE_SUCCESS;
}
int stmtGetParamNum(TAOS_STMT* stmt, int* nums) {
STscStmt* pStmt = (STscStmt*)stmt;
......@@ -952,13 +950,13 @@ int stmtGetParamNum(TAOS_STMT* stmt, int* nums) {
return TSDB_CODE_SUCCESS;
}
int stmtGetParam(TAOS_STMT *stmt, int idx, int *type, int *bytes) {
int stmtGetParam(TAOS_STMT* stmt, int idx, int* type, int* bytes) {
STscStmt* pStmt = (STscStmt*)stmt;
if (STMT_TYPE_QUERY == pStmt->sql.type) {
STMT_RET(TSDB_CODE_TSC_STMT_API_ERROR);
}
STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_FETCH_FIELDS));
if (pStmt->bInfo.needParse && pStmt->sql.runTimes && pStmt->sql.type > 0 &&
......@@ -979,8 +977,8 @@ int stmtGetParam(TAOS_STMT *stmt, int idx, int *type, int *bytes) {
STMT_ERR_RET(stmtParseSql(pStmt));
}
int32_t nums = 0;
TAOS_FIELD_E *pField = NULL;
int32_t nums = 0;
TAOS_FIELD_E* pField = NULL;
STMT_ERR_RET(stmtFetchColFields(stmt, &nums, &pField));
if (idx >= nums) {
tscError("idx %d is too big", idx);
......
......@@ -16,6 +16,7 @@
#define _DEFAULT_SOURCE
#include "dmMgmt.h"
#include "dmNodes.h"
#include "index.h"
#include "qworker.h"
static bool dmRequireNode(SDnode *pDnode, SMgmtWrapper *pWrapper) {
......@@ -213,6 +214,7 @@ void dmCleanupDnode(SDnode *pDnode) {
dmCleanupServer(pDnode);
dmClearVars(pDnode);
rpcCleanup();
indexCleanup();
dDebug("dnode is closed, ptr:%p", pDnode);
}
......
......@@ -1373,9 +1373,9 @@ char *buildRetension(SArray *pRetension) {
static void dumpDbInfoData(SSDataBlock *pBlock, SDbObj *pDb, SShowObj *pShow, int32_t rows, int64_t numOfTables,
bool sysDb, ESdbStatus objStatus, bool sysinfo) {
int32_t cols = 0;
int32_t bytes = pShow->pMeta->pSchemas[cols].bytes;
char *buf = taosMemoryMalloc(bytes);
int32_t bytes = pShow->pMeta->pSchemas[cols].bytes;
char *buf = taosMemoryMalloc(bytes);
const char *name = mndGetDbStr(pDb->name);
if (name != NULL) {
STR_WITH_MAXSIZE_TO_VARSTR(buf, name, bytes);
......@@ -1383,11 +1383,11 @@ static void dumpDbInfoData(SSDataBlock *pBlock, SDbObj *pDb, SShowObj *pShow, in
STR_WITH_MAXSIZE_TO_VARSTR(buf, "NULL", bytes);
}
char *status = "ready";
if (objStatus == SDB_STATUS_CREATING) status = "creating";
if (objStatus == SDB_STATUS_DROPPING) status = "dropping";
char statusB[24] = {0};
STR_WITH_SIZE_TO_VARSTR(statusB, status, strlen(status));
char *statusStr = "ready";
if (objStatus == SDB_STATUS_CREATING) statusStr = "creating";
if (objStatus == SDB_STATUS_DROPPING) statusStr = "dropping";
char statusVstr[24] = {0};
STR_WITH_SIZE_TO_VARSTR(statusVstr, statusStr, strlen(statusStr));
if (sysDb || !sysinfo) {
for (int32_t i = 0; i < pShow->numOfColumns; ++i) {
......@@ -1397,7 +1397,7 @@ static void dumpDbInfoData(SSDataBlock *pBlock, SDbObj *pDb, SShowObj *pShow, in
} else if (i == 3) {
colDataAppend(pColInfo, rows, (const char *)&numOfTables, false);
} else if (i == 20) {
colDataAppend(pColInfo, rows, statusB, false);
colDataAppend(pColInfo, rows, statusVstr, false);
} else {
colDataAppendNULL(pColInfo, rows);
}
......@@ -1405,7 +1405,6 @@ static void dumpDbInfoData(SSDataBlock *pBlock, SDbObj *pDb, SShowObj *pShow, in
} else {
SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, rows, buf, false);
taosMemoryFree(buf);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, rows, (const char *)&pDb->createdTime, false);
......@@ -1419,30 +1418,29 @@ static void dumpDbInfoData(SSDataBlock *pBlock, SDbObj *pDb, SShowObj *pShow, in
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, rows, (const char *)&pDb->cfg.replications, false);
const char *src = pDb->cfg.strict ? "strict" : "no_strict";
char strict[24] = {0};
STR_WITH_SIZE_TO_VARSTR(strict, src, strlen(src));
const char *strictStr = pDb->cfg.strict ? "strict" : "no_strict";
char strictVstr[24] = {0};
STR_WITH_SIZE_TO_VARSTR(strictVstr, strictStr, strlen(strictStr));
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, rows, (const char *)strict, false);
colDataAppend(pColInfo, rows, (const char *)strictVstr, false);
char tmp[128] = {0};
int32_t len = 0;
len = sprintf(&tmp[VARSTR_HEADER_SIZE], "%dm", pDb->cfg.daysPerFile);
varDataSetLen(tmp, len);
char durationVstr[128] = {0};
int32_t len = sprintf(&durationVstr[VARSTR_HEADER_SIZE], "%dm", pDb->cfg.daysPerFile);
varDataSetLen(durationVstr, len);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, rows, (const char *)tmp, false);
colDataAppend(pColInfo, rows, (const char *)durationVstr, false);
char keepVstr[128] = {0};
if (pDb->cfg.daysToKeep0 > pDb->cfg.daysToKeep1 || pDb->cfg.daysToKeep0 > pDb->cfg.daysToKeep2) {
len = sprintf(&tmp[VARSTR_HEADER_SIZE], "%dm,%dm,%dm", pDb->cfg.daysToKeep1, pDb->cfg.daysToKeep2,
len = sprintf(&keepVstr[VARSTR_HEADER_SIZE], "%dm,%dm,%dm", pDb->cfg.daysToKeep1, pDb->cfg.daysToKeep2,
pDb->cfg.daysToKeep0);
} else {
len = sprintf(&tmp[VARSTR_HEADER_SIZE], "%dm,%dm,%dm", pDb->cfg.daysToKeep0, pDb->cfg.daysToKeep1,
len = sprintf(&keepVstr[VARSTR_HEADER_SIZE], "%dm,%dm,%dm", pDb->cfg.daysToKeep0, pDb->cfg.daysToKeep1,
pDb->cfg.daysToKeep2);
}
varDataSetLen(tmp, len);
varDataSetLen(keepVstr, len);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, rows, (const char *)tmp, false);
colDataAppend(pColInfo, rows, (const char *)keepVstr, false);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, rows, (const char *)&pDb->cfg.buffer, false);
......@@ -1469,68 +1467,49 @@ static void dumpDbInfoData(SSDataBlock *pBlock, SDbObj *pDb, SShowObj *pShow, in
colDataAppend(pColInfo, rows, (const char *)&pDb->cfg.compression, false);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
STR_WITH_SIZE_TO_VARSTR(strict, src, strlen(src));
#if 0
char cacheModel[24] = {0};
bool null = false;
if (pDb->cfg.cacheLastRow == 0) {
STR_TO_VARSTR(cacheModel, "no_cache");
} else if (pDb->cfg.cacheLastRow == 1) {
STR_TO_VARSTR(cacheModel, "last_row_cache")
} else {
null = true;
}
colDataAppend(pColInfo, rows, cacheModel, null);
#endif
colDataAppend(pColInfo, rows, (const char *)&pDb->cfg.cacheLastRow, false);
char *prec = NULL;
const char *precStr = NULL;
switch (pDb->cfg.precision) {
case TSDB_TIME_PRECISION_MILLI:
prec = TSDB_TIME_PRECISION_MILLI_STR;
precStr = TSDB_TIME_PRECISION_MILLI_STR;
break;
case TSDB_TIME_PRECISION_MICRO:
prec = TSDB_TIME_PRECISION_MICRO_STR;
precStr = TSDB_TIME_PRECISION_MICRO_STR;
break;
case TSDB_TIME_PRECISION_NANO:
prec = TSDB_TIME_PRECISION_NANO_STR;
precStr = TSDB_TIME_PRECISION_NANO_STR;
break;
default:
prec = "none";
precStr = "none";
break;
}
char t[10] = {0};
STR_WITH_SIZE_TO_VARSTR(t, prec, 2);
char precVstr[10] = {0};
STR_WITH_SIZE_TO_VARSTR(precVstr, precStr, 2);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, rows, (const char *)t, false);
colDataAppend(pColInfo, rows, (const char *)precVstr, false);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, rows, (const char *)&pDb->cfg.numOfStables, false);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, rows, (const char *)statusB, false);
// pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
// colDataAppend(pColInfo, rows, (const char *)&pDb->cfg.schemaless, false);
char *p = buildRetension(pDb->cfg.pRetensions);
colDataAppend(pColInfo, rows, (const char *)statusVstr, false);
char *rentensionVstr = buildRetension(pDb->cfg.pRetensions);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols);
if (p == NULL) {
if (rentensionVstr == NULL) {
colDataAppendNULL(pColInfo, rows);
} else {
colDataAppend(pColInfo, rows, (const char *)p, false);
taosMemoryFree(p);
colDataAppend(pColInfo, rows, (const char *)rentensionVstr, false);
taosMemoryFree(rentensionVstr);
}
}
taosMemoryFree(buf);
}
static void setInformationSchemaDbCfg(SDbObj *pDbObj) {
ASSERT(pDbObj != NULL);
strncpy(pDbObj->name, TSDB_INFORMATION_SCHEMA_DB, tListLen(pDbObj->name));
tstrncpy(pDbObj->name, TSDB_INFORMATION_SCHEMA_DB, tListLen(pDbObj->name));
pDbObj->createdTime = 0;
pDbObj->cfg.numOfVgroups = 0;
pDbObj->cfg.strict = 1;
......@@ -1539,9 +1518,7 @@ static void setInformationSchemaDbCfg(SDbObj *pDbObj) {
}
static void setPerfSchemaDbCfg(SDbObj *pDbObj) {
ASSERT(pDbObj != NULL);
strncpy(pDbObj->name, TSDB_PERFORMANCE_SCHEMA_DB, tListLen(pDbObj->name));
tstrncpy(pDbObj->name, TSDB_PERFORMANCE_SCHEMA_DB, tListLen(pDbObj->name));
pDbObj->createdTime = 0;
pDbObj->cfg.numOfVgroups = 0;
pDbObj->cfg.strict = 1;
......@@ -1585,14 +1562,11 @@ static int32_t mndRetrieveDbs(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBloc
while (numOfRows < rowsCapacity) {
pShow->pIter = sdbFetchAll(pSdb, SDB_DB, pShow->pIter, (void **)&pDb, &objStatus);
if (pShow->pIter == NULL) {
break;
}
if (pShow->pIter == NULL) break;
if (mndCheckDbPrivilege(pMnode, pReq->info.conn.user, MND_OPER_READ_OR_WRITE_DB, pDb) == 0) {
int32_t numOfTables = 0;
sdbTraverse(pSdb, SDB_VGROUP, mndGetTablesOfDbFp, &numOfTables, NULL, NULL);
dumpDbInfoData(pBlock, pDb, pShow, numOfRows, numOfTables, false, objStatus, sysinfo);
numOfRows++;
}
......
......@@ -240,7 +240,7 @@ typedef struct SColMatchInfo {
int32_t srcSlotId; // source slot id
int32_t colId;
int32_t targetSlotId;
bool output;
bool output; // todo remove this?
bool reserved;
int32_t matchType; // determinate the source according to col id or slot id
} SColMatchInfo;
......@@ -479,6 +479,8 @@ typedef struct SAggOperatorInfo {
uint64_t groupId;
SGroupResInfo groupResInfo;
SExprSupp scalarExprSup;
SNode *pCondition;
} SAggOperatorInfo;
typedef struct SProjectOperatorInfo {
......@@ -680,6 +682,8 @@ typedef struct SSortOperatorInfo {
int64_t startTs; // sort start time
uint64_t sortElapsed; // sort elapsed time, time to flush to disk not included.
SNode* pCondition;
} SSortOperatorInfo;
typedef struct STagFilterOperatorInfo {
......@@ -758,7 +762,7 @@ SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysi
STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScanPhysiNode *pScanPhyNode, const char* pUser, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SExprInfo* pScalarExprInfo,
SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SNode* pCondition, SExprInfo* pScalarExprInfo,
int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhysiNode *pNode, SExecTaskInfo* pTaskInfo);
......
......@@ -700,7 +700,7 @@ void relocateColumnData(SSDataBlock* pBlock, const SArray* pColMatchInfo, SArray
while (i < numOfSrcCols && j < taosArrayGetSize(pColMatchInfo)) {
SColumnInfoData* p = taosArrayGet(pCols, i);
SColMatchInfo* pmInfo = taosArrayGet(pColMatchInfo, j);
if (!outputEveryColumn && !pmInfo->output) {
if (!outputEveryColumn && pmInfo->reserved) {
j++;
continue;
}
......
......@@ -3012,11 +3012,19 @@ static SSDataBlock* getAggregateResult(SOperatorInfo* pOperator) {
}
blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
doBuildResultDatablock(pOperator, pInfo, &pAggInfo->groupResInfo, pAggInfo->aggSup.pResultBuf);
if (pInfo->pRes->info.rows == 0 || !hasDataInGroupInfo(&pAggInfo->groupResInfo)) {
doSetOperatorCompleted(pOperator);
}
while (1) {
doBuildResultDatablock(pOperator, pInfo, &pAggInfo->groupResInfo, pAggInfo->aggSup.pResultBuf);
doFilter(pAggInfo->pCondition, pInfo->pRes);
if (!hasDataInGroupInfo(&pAggInfo->groupResInfo)) {
doSetOperatorCompleted(pOperator);
break;
}
if (pInfo->pRes->info.rows > 0) {
break;
}
}
size_t rows = blockDataGetNumOfRows(pInfo->pRes);
pOperator->resultInfo.totalRows += rows;
......@@ -3557,7 +3565,7 @@ int32_t initExprSupp(SExprSupp* pSup, SExprInfo* pExprInfo, int32_t numOfExpr) {
}
SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResultBlock, SExprInfo* pScalarExprInfo,
SSDataBlock* pResultBlock, SNode* pCondition, SExprInfo* pScalarExprInfo,
int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo) {
SAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SAggOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
......@@ -3581,6 +3589,7 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo*
}
pInfo->groupId = INT32_MIN;
pInfo->pCondition = pCondition;
pOperator->name = "TableAggregate";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_HASH_AGG;
pOperator->blocking = true;
......@@ -4328,7 +4337,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
pScalarExprInfo, numOfScalarExpr, pTaskInfo);
} else {
pOptr =
createAggregateOperatorInfo(ops[0], pExprInfo, num, pResBlock, pScalarExprInfo, numOfScalarExpr, pTaskInfo);
createAggregateOperatorInfo(ops[0], pExprInfo, num, pResBlock, pAggNode->node.pConditions, pScalarExprInfo, numOfScalarExpr, pTaskInfo);
}
} else if (QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL == type || QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL == type) {
SIntervalPhysiNode* pIntervalPhyNode = (SIntervalPhysiNode*)pPhyNode;
......
......@@ -46,7 +46,7 @@ SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode*
initResultSizeInfo(pOperator, 1024);
pInfo->pSortInfo = createSortInfo(pSortPhyNode->pSortKeys);
;
pInfo->pCondition = pSortPhyNode->node.pConditions;
pInfo->pColMatchInfo = pColMatchColInfo;
pOperator->name = "SortOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_SORT;
......@@ -205,14 +205,27 @@ SSDataBlock* doSort(SOperatorInfo* pOperator) {
longjmp(pTaskInfo->env, code);
}
SSDataBlock* pBlock = getSortedBlockData(pInfo->pSortHandle, pInfo->binfo.pRes, pOperator->resultInfo.capacity,
pInfo->pColMatchInfo, pInfo);
SSDataBlock* pBlock = NULL;
while (1) {
pBlock = getSortedBlockData(pInfo->pSortHandle, pInfo->binfo.pRes, pOperator->resultInfo.capacity,
pInfo->pColMatchInfo, pInfo);
if (pBlock != NULL) {
doFilter(pInfo->pCondition, pBlock);
}
if (pBlock == NULL) {
doSetOperatorCompleted(pOperator);
break;
}
if (blockDataGetNumOfRows(pBlock) > 0) {
break;
}
}
if (pBlock != NULL) {
pOperator->resultInfo.totalRows += pBlock->info.rows;
} else {
doSetOperatorCompleted(pOperator);
}
return pBlock;
}
......
......@@ -605,7 +605,7 @@ static int32_t translateTopBot(SFunctionNode* pFunc, char* pErrBuf, int32_t len)
}
SValueNode* pValue = (SValueNode*)pParamNode1;
if (pValue->node.resType.type != TSDB_DATA_TYPE_BIGINT) {
if (!IS_INTEGER_TYPE(pValue->node.resType.type)) {
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
}
......@@ -1615,26 +1615,27 @@ static int32_t translateSubstr(SFunctionNode* pFunc, char* pErrBuf, int32_t len)
}
SExprNode* pPara0 = (SExprNode*)nodesListGetNode(pFunc->pParameterList, 0);
SExprNode* p1 = (SExprNode*)nodesListGetNode(pFunc->pParameterList, 1);
SExprNode* pPara1 = (SExprNode*)nodesListGetNode(pFunc->pParameterList, 1);
uint8_t para1Type = p1->resType.type;
if (!IS_VAR_DATA_TYPE(pPara0->resType.type) || !IS_INTEGER_TYPE(para1Type)) {
uint8_t para0Type = pPara0->resType.type;
uint8_t para1Type = pPara1->resType.type;
if (!IS_VAR_DATA_TYPE(para0Type) || !IS_INTEGER_TYPE(para1Type)) {
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
}
if (((SValueNode*)p1)->datum.i < 1) {
if (((SValueNode*)pPara1)->datum.i == 0) {
return invaildFuncParaValueErrMsg(pErrBuf, len, pFunc->functionName);
}
if (3 == numOfParams) {
SExprNode* p2 = (SExprNode*)nodesListGetNode(pFunc->pParameterList, 2);
uint8_t para2Type = p2->resType.type;
SExprNode* pPara2 = (SExprNode*)nodesListGetNode(pFunc->pParameterList, 2);
uint8_t para2Type = pPara2->resType.type;
if (!IS_INTEGER_TYPE(para2Type)) {
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
}
int64_t v = ((SValueNode*)p1)->datum.i;
if (v < 0 || v > INT16_MAX) {
int64_t v = ((SValueNode*)pPara2)->datum.i;
if (v < 0) {
return invaildFuncParaValueErrMsg(pErrBuf, len, pFunc->functionName);
}
}
......
......@@ -90,12 +90,14 @@ typedef struct SStddevRes {
double result;
int64_t count;
union {
double quadraticDSum;
int64_t quadraticISum;
double quadraticDSum;
int64_t quadraticISum;
uint64_t quadraticUSum;
};
union {
double dsum;
int64_t isum;
double dsum;
int64_t isum;
uint64_t usum;
};
int16_t type;
} SStddevRes;
......@@ -1729,6 +1731,68 @@ int32_t stddevFunction(SqlFunctionCtx* pCtx) {
break;
}
case TSDB_DATA_TYPE_UTINYINT: {
uint8_t* plist = (uint8_t*)pCol->pData;
for (int32_t i = start; i < numOfRows + start; ++i) {
if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) {
continue;
}
numOfElem += 1;
pStddevRes->count += 1;
pStddevRes->usum += plist[i];
pStddevRes->quadraticISum += plist[i] * plist[i];
}
break;
}
case TSDB_DATA_TYPE_USMALLINT: {
uint16_t* plist = (uint16_t*)pCol->pData;
for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) {
if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) {
continue;
}
numOfElem += 1;
pStddevRes->count += 1;
pStddevRes->usum += plist[i];
pStddevRes->quadraticISum += plist[i] * plist[i];
}
break;
}
case TSDB_DATA_TYPE_UINT: {
uint32_t* plist = (uint32_t*)pCol->pData;
for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) {
if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) {
continue;
}
numOfElem += 1;
pStddevRes->count += 1;
pStddevRes->usum += plist[i];
pStddevRes->quadraticISum += plist[i] * plist[i];
}
break;
}
case TSDB_DATA_TYPE_UBIGINT: {
uint64_t* plist = (uint64_t*)pCol->pData;
for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) {
if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) {
continue;
}
numOfElem += 1;
pStddevRes->count += 1;
pStddevRes->usum += plist[i];
pStddevRes->quadraticISum += plist[i] * plist[i];
}
break;
}
case TSDB_DATA_TYPE_FLOAT: {
float* plist = (float*)pCol->pData;
for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) {
......@@ -1771,9 +1835,12 @@ _stddev_over:
static void stddevTransferInfo(SStddevRes* pInput, SStddevRes* pOutput) {
pOutput->type = pInput->type;
if (IS_INTEGER_TYPE(pOutput->type)) {
if (IS_SIGNED_NUMERIC_TYPE(pOutput->type)) {
pOutput->quadraticISum += pInput->quadraticISum;
pOutput->isum += pInput->isum;
} else if (IS_UNSIGNED_NUMERIC_TYPE(pOutput->type)) {
pOutput->quadraticUSum += pInput->quadraticUSum;
pOutput->usum += pInput->usum;
} else {
pOutput->quadraticDSum += pInput->quadraticDSum;
pOutput->dsum += pInput->dsum;
......@@ -1848,6 +1915,22 @@ int32_t stddevInvertFunction(SqlFunctionCtx* pCtx) {
LIST_STDDEV_SUB_N(pStddevRes->isum, int64_t);
break;
}
case TSDB_DATA_TYPE_UTINYINT: {
LIST_STDDEV_SUB_N(pStddevRes->isum, uint8_t);
break;
}
case TSDB_DATA_TYPE_USMALLINT: {
LIST_STDDEV_SUB_N(pStddevRes->isum, uint16_t);
break;
}
case TSDB_DATA_TYPE_UINT: {
LIST_STDDEV_SUB_N(pStddevRes->isum, uint32_t);
break;
}
case TSDB_DATA_TYPE_UBIGINT: {
LIST_STDDEV_SUB_N(pStddevRes->isum, uint64_t);
break;
}
case TSDB_DATA_TYPE_FLOAT: {
LIST_STDDEV_SUB_N(pStddevRes->dsum, float);
break;
......@@ -1871,9 +1954,12 @@ int32_t stddevFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
int32_t type = pStddevRes->type;
double avg;
if (IS_INTEGER_TYPE(type)) {
if (IS_SIGNED_NUMERIC_TYPE(type)) {
avg = pStddevRes->isum / ((double)pStddevRes->count);
pStddevRes->result = sqrt(fabs(pStddevRes->quadraticISum / ((double)pStddevRes->count) - avg * avg));
} else if (IS_UNSIGNED_NUMERIC_TYPE(type)) {
avg = pStddevRes->usum / ((double)pStddevRes->count);
pStddevRes->result = sqrt(fabs(pStddevRes->quadraticUSum / ((double)pStddevRes->count) - avg * avg));
} else {
avg = pStddevRes->dsum / ((double)pStddevRes->count);
pStddevRes->result = sqrt(fabs(pStddevRes->quadraticDSum / ((double)pStddevRes->count) - avg * avg));
......@@ -1913,9 +1999,12 @@ int32_t stddevCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) {
SResultRowEntryInfo* pSResInfo = GET_RES_INFO(pSourceCtx);
SStddevRes* pSBuf = GET_ROWCELL_INTERBUF(pSResInfo);
if (IS_INTEGER_TYPE(type)) {
if (IS_SIGNED_NUMERIC_TYPE(type)) {
pDBuf->isum += pSBuf->isum;
pDBuf->quadraticISum += pSBuf->quadraticISum;
} else if (IS_UNSIGNED_NUMERIC_TYPE(type)) {
pDBuf->usum += pSBuf->usum;
pDBuf->quadraticUSum += pSBuf->quadraticUSum;
} else {
pDBuf->dsum += pSBuf->dsum;
pDBuf->quadraticDSum += pSBuf->quadraticDSum;
......
......@@ -65,9 +65,10 @@ void indexInit() {
indexQhandle = taosInitScheduler(INDEX_QUEUE_SIZE, INDEX_NUM_OF_THREADS, "index");
indexRefMgt = taosOpenRef(10, indexDestroy);
}
void indexCleanUp() {
void indexCleanup() {
// refacto later
taosCleanUpScheduler(indexQhandle);
taosCloseRef(indexRefMgt);
}
typedef struct SIdxColInfo {
......
......@@ -165,7 +165,7 @@ static int32_t valueNodeCopy(const SValueNode* pSrc, SValueNode* pDst) {
memcpy(pDst->datum.p, pSrc->datum.p, len);
break;
}
case TSDB_DATA_TYPE_JSON:{
case TSDB_DATA_TYPE_JSON: {
int32_t len = getJsonValueLen(pSrc->datum.p);
pDst->datum.p = taosMemoryCalloc(1, len);
if (NULL == pDst->datum.p) {
......@@ -397,6 +397,7 @@ static int32_t logicVnodeModifCopy(const SVnodeModifyLogicNode* pSrc, SVnodeModi
COPY_SCALAR_FIELD(tableType);
COPY_CHAR_ARRAY_FIELD(tableFName);
COPY_OBJECT_FIELD(deleteTimeRange, sizeof(STimeWindow));
CLONE_OBJECT_FIELD(pVgroupList, vgroupsInfoClone);
return TSDB_CODE_SUCCESS;
}
......
......@@ -19,8 +19,8 @@
#include "query.h"
#include "querynodes.h"
#include "taoserror.h"
#include "tjson.h"
#include "tdatablock.h"
#include "tjson.h"
static int32_t nodeToJson(const void* pObj, SJson* pJson);
static int32_t jsonToNode(const SJson* pJson, void* pObj);
......@@ -179,6 +179,8 @@ const char* nodesNodeName(ENodeType type) {
return "ShowVnodeStmt";
case QUERY_NODE_DELETE_STMT:
return "DeleteStmt";
case QUERY_NODE_INSERT_STMT:
return "InsertStmt";
case QUERY_NODE_LOGIC_PLAN_SCAN:
return "LogicScan";
case QUERY_NODE_LOGIC_PLAN_JOIN:
......@@ -271,6 +273,8 @@ const char* nodesNodeName(ENodeType type) {
return "PhysiDispatch";
case QUERY_NODE_PHYSICAL_PLAN_INSERT:
return "PhysiInsert";
case QUERY_NODE_PHYSICAL_PLAN_QUERY_INSERT:
return "PhysiQueryInsert";
case QUERY_NODE_PHYSICAL_PLAN_DELETE:
return "PhysiDelete";
case QUERY_NODE_PHYSICAL_SUBPLAN:
......@@ -2210,6 +2214,58 @@ static int32_t physiDispatchNodeToJson(const void* pObj, SJson* pJson) { return
static int32_t jsonToPhysiDispatchNode(const SJson* pJson, void* pObj) { return jsonToPhysicDataSinkNode(pJson, pObj); }
static const char* jkQueryInsertPhysiPlanTableId = "TableId";
static const char* jkQueryInsertPhysiPlanTableType = "TableType";
static const char* jkQueryInsertPhysiPlanTableFName = "TableFName";
static const char* jkQueryInsertPhysiPlanVgId = "VgId";
static const char* jkQueryInsertPhysiPlanEpSet = "EpSet";
static int32_t physiQueryInsertNodeToJson(const void* pObj, SJson* pJson) {
const SQueryInserterNode* pNode = (const SQueryInserterNode*)pObj;
int32_t code = physicDataSinkNodeToJson(pObj, pJson);
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkQueryInsertPhysiPlanTableId, pNode->tableId);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkQueryInsertPhysiPlanTableType, pNode->tableType);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddStringToObject(pJson, jkQueryInsertPhysiPlanTableFName, pNode->tableFName);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkQueryInsertPhysiPlanVgId, pNode->vgId);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddObject(pJson, jkQueryInsertPhysiPlanEpSet, epSetToJson, &pNode->epSet);
}
return code;
}
static int32_t jsonToPhysiQueryInsertNode(const SJson* pJson, void* pObj) {
SQueryInserterNode* pNode = (SQueryInserterNode*)pObj;
int32_t code = jsonToPhysicDataSinkNode(pJson, pObj);
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetUBigIntValue(pJson, jkQueryInsertPhysiPlanTableId, &pNode->tableId);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetTinyIntValue(pJson, jkQueryInsertPhysiPlanTableType, &pNode->tableType);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetStringValue(pJson, jkQueryInsertPhysiPlanTableFName, pNode->tableFName);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetIntValue(pJson, jkQueryInsertPhysiPlanVgId, &pNode->vgId);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonToObject(pJson, jkQueryInsertPhysiPlanEpSet, jsonToEpSet, &pNode->epSet);
}
return code;
}
static const char* jkDeletePhysiPlanTableId = "TableId";
static const char* jkDeletePhysiPlanTableType = "TableType";
static const char* jkDeletePhysiPlanTableFName = "TableFName";
......@@ -2641,9 +2697,9 @@ static int32_t datumToJson(const void* pObj, SJson* pJson) {
case TSDB_DATA_TYPE_VARBINARY:
code = tjsonAddStringToObject(pJson, jkValueDatum, varDataVal(pNode->datum.p));
break;
case TSDB_DATA_TYPE_JSON:{
case TSDB_DATA_TYPE_JSON: {
int32_t len = getJsonValueLen(pNode->datum.p);
char* buf = taosMemoryCalloc( len * 2 + 1, sizeof(char));
char* buf = taosMemoryCalloc(len * 2 + 1, sizeof(char));
code = taosHexEncode(pNode->datum.p, buf, len);
if (code != TSDB_CODE_SUCCESS) {
taosMemoryFree(buf);
......@@ -2775,7 +2831,7 @@ static int32_t jsonToDatum(const SJson* pJson, void* pObj) {
}
break;
}
case TSDB_DATA_TYPE_JSON:{
case TSDB_DATA_TYPE_JSON: {
pNode->datum.p = taosMemoryCalloc(1, pNode->node.resType.bytes);
if (NULL == pNode->datum.p) {
code = TSDB_CODE_OUT_OF_MEMORY;
......@@ -4232,6 +4288,8 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) {
return physiDispatchNodeToJson(pObj, pJson);
case QUERY_NODE_PHYSICAL_PLAN_INSERT:
break;
case QUERY_NODE_PHYSICAL_PLAN_QUERY_INSERT:
return physiQueryInsertNodeToJson(pObj, pJson);
case QUERY_NODE_PHYSICAL_PLAN_DELETE:
return physiDeleteNodeToJson(pObj, pJson);
case QUERY_NODE_PHYSICAL_SUBPLAN:
......@@ -4374,6 +4432,8 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) {
return jsonToPhysiInterpFuncNode(pJson, pObj);
case QUERY_NODE_PHYSICAL_PLAN_DISPATCH:
return jsonToPhysiDispatchNode(pJson, pObj);
case QUERY_NODE_PHYSICAL_PLAN_QUERY_INSERT:
return jsonToPhysiQueryInsertNode(pJson, pObj);
case QUERY_NODE_PHYSICAL_PLAN_DELETE:
return jsonToPhysiDeleteNode(pJson, pObj);
case QUERY_NODE_PHYSICAL_SUBPLAN:
......
......@@ -229,6 +229,8 @@ SNode* nodesMakeNode(ENodeType type) {
return makeNode(type, sizeof(SKillStmt));
case QUERY_NODE_DELETE_STMT:
return makeNode(type, sizeof(SDeleteStmt));
case QUERY_NODE_INSERT_STMT:
return makeNode(type, sizeof(SInsertStmt));
case QUERY_NODE_QUERY:
return makeNode(type, sizeof(SQuery));
case QUERY_NODE_LOGIC_PLAN_SCAN:
......@@ -325,6 +327,8 @@ SNode* nodesMakeNode(ENodeType type) {
return makeNode(type, sizeof(SDataDispatcherNode));
case QUERY_NODE_PHYSICAL_PLAN_INSERT:
return makeNode(type, sizeof(SDataInserterNode));
case QUERY_NODE_PHYSICAL_PLAN_QUERY_INSERT:
return makeNode(type, sizeof(SQueryInserterNode));
case QUERY_NODE_PHYSICAL_PLAN_DELETE:
return makeNode(type, sizeof(SDataDeleterNode));
case QUERY_NODE_PHYSICAL_SUBPLAN:
......@@ -690,6 +694,13 @@ void nodesDestroyNode(SNode* pNode) {
nodesDestroyNode(pStmt->pTagCond);
break;
}
case QUERY_NODE_INSERT_STMT: {
SInsertStmt* pStmt = (SInsertStmt*)pNode;
nodesDestroyNode(pStmt->pTable);
nodesDestroyList(pStmt->pCols);
nodesDestroyNode(pStmt->pQuery);
break;
}
case QUERY_NODE_QUERY: {
SQuery* pQuery = (SQuery*)pNode;
nodesDestroyNode(pQuery->pRoot);
......@@ -925,6 +936,11 @@ void nodesDestroyNode(SNode* pNode) {
taosMemoryFreeClear(pSink->pData);
break;
}
case QUERY_NODE_PHYSICAL_PLAN_QUERY_INSERT: {
SQueryInserterNode* pSink = (SQueryInserterNode*)pNode;
destroyDataSinkNode((SDataSinkNode*)pSink);
break;
}
case QUERY_NODE_PHYSICAL_PLAN_DELETE: {
SDataDeleterNode* pSink = (SDataDeleterNode*)pNode;
destroyDataSinkNode((SDataSinkNode*)pSink);
......@@ -1524,7 +1540,6 @@ int32_t nodesCollectColumnsFromNode(SNode* node, const char* pTableAlias, EColle
}
return TSDB_CODE_SUCCESS;
}
typedef struct SCollectFuncsCxt {
......
......@@ -210,6 +210,7 @@ SNode* createSyncdbStmt(SAstCreateContext* pCxt, const SToken* pDbName);
SNode* createGrantStmt(SAstCreateContext* pCxt, int64_t privileges, SToken* pDbName, SToken* pUserName);
SNode* createRevokeStmt(SAstCreateContext* pCxt, int64_t privileges, SToken* pDbName, SToken* pUserName);
SNode* createDeleteStmt(SAstCreateContext* pCxt, SNode* pTable, SNode* pWhere);
SNode* createInsertStmt(SAstCreateContext* pCxt, SNode* pTable, SNodeList* pCols, SNode* pQuery);
#ifdef __cplusplus
}
......
......@@ -259,7 +259,7 @@ multi_create_clause(A) ::= multi_create_clause(B) create_subtable_clause(C).
create_subtable_clause(A) ::=
not_exists_opt(B) full_table_name(C) USING full_table_name(D)
specific_tags_opt(E) TAGS NK_LP expression_list(F) NK_RP table_options(G). { A = createCreateSubTableClause(pCxt, B, C, D, E, F, G); }
specific_cols_opt(E) TAGS NK_LP expression_list(F) NK_RP table_options(G). { A = createCreateSubTableClause(pCxt, B, C, D, E, F, G); }
%type multi_drop_clause { SNodeList* }
%destructor multi_drop_clause { nodesDestroyList($$); }
......@@ -268,10 +268,10 @@ multi_drop_clause(A) ::= multi_drop_clause(B) drop_table_clause(C).
drop_table_clause(A) ::= exists_opt(B) full_table_name(C). { A = createDropTableClause(pCxt, B, C); }
%type specific_tags_opt { SNodeList* }
%destructor specific_tags_opt { nodesDestroyList($$); }
specific_tags_opt(A) ::= . { A = NULL; }
specific_tags_opt(A) ::= NK_LP col_name_list(B) NK_RP. { A = B; }
%type specific_cols_opt { SNodeList* }
%destructor specific_cols_opt { nodesDestroyList($$); }
specific_cols_opt(A) ::= . { A = NULL; }
specific_cols_opt(A) ::= NK_LP col_name_list(B) NK_RP. { A = B; }
full_table_name(A) ::= table_name(B). { A = createRealTableNode(pCxt, NULL, &B, NULL); }
full_table_name(A) ::= db_name(B) NK_DOT table_name(C). { A = createRealTableNode(pCxt, &B, &C, NULL); }
......@@ -515,6 +515,9 @@ cmd ::= DELETE FROM full_table_name(A) where_clause_opt(B).
/************************************************ select **************************************************************/
cmd ::= query_expression(A). { pCxt->pRootNode = A; }
/************************************************ insert **************************************************************/
cmd ::= INSERT INTO full_table_name(A) specific_cols_opt(B) query_expression(C). { pCxt->pRootNode = createInsertStmt(pCxt, A, B, C); }
/************************************************ literal *************************************************************/
literal(A) ::= NK_INTEGER(B). { A = createRawExprNode(pCxt, &B, createValueNode(pCxt, TSDB_DATA_TYPE_BIGINT, &B)); }
literal(A) ::= NK_FLOAT(B). { A = createRawExprNode(pCxt, &B, createValueNode(pCxt, TSDB_DATA_TYPE_DOUBLE, &B)); }
......@@ -973,4 +976,4 @@ null_ordering_opt(A) ::= .
null_ordering_opt(A) ::= NULLS FIRST. { A = NULL_ORDER_FIRST; }
null_ordering_opt(A) ::= NULLS LAST. { A = NULL_ORDER_LAST; }
%fallback ID NK_BITNOT INSERT VALUES IMPORT NK_SEMI FILE.
%fallback ID NK_BITNOT VALUES IMPORT NK_SEMI FILE.
......@@ -1662,3 +1662,13 @@ SNode* createDeleteStmt(SAstCreateContext* pCxt, SNode* pTable, SNode* pWhere) {
}
return (SNode*)pStmt;
}
SNode* createInsertStmt(SAstCreateContext* pCxt, SNode* pTable, SNodeList* pCols, SNode* pQuery) {
CHECK_PARSER_STATUS(pCxt);
SInsertStmt* pStmt = (SInsertStmt*)nodesMakeNode(QUERY_NODE_INSERT_STMT);
CHECK_OUT_OF_MEM(pStmt);
pStmt->pTable = pTable;
pStmt->pCols = pCols;
pStmt->pQuery = pQuery;
return (SNode*)pStmt;
}
......@@ -447,6 +447,14 @@ static int32_t collectMetaKeyFromDelete(SCollectMetaKeyCxt* pCxt, SDeleteStmt* p
return collectMetaKeyFromRealTableImpl(pCxt, (SRealTableNode*)pStmt->pFromTable, AUTH_TYPE_WRITE);
}
static int32_t collectMetaKeyFromInsert(SCollectMetaKeyCxt* pCxt, SInsertStmt* pStmt) {
int32_t code = collectMetaKeyFromRealTableImpl(pCxt, (SRealTableNode*)pStmt->pTable, AUTH_TYPE_WRITE);
if (TSDB_CODE_SUCCESS == code) {
code = collectMetaKeyFromQuery(pCxt, pStmt->pQuery);
}
return code;
}
static int32_t collectMetaKeyFromShowBlockDist(SCollectMetaKeyCxt* pCxt, SShowTableDistributedStmt* pStmt) {
SName name = {.type = TSDB_TABLE_NAME_T, .acctId = pCxt->pParseCxt->acctId};
strcpy(name.dbname, pStmt->dbName);
......@@ -554,6 +562,8 @@ static int32_t collectMetaKeyFromQuery(SCollectMetaKeyCxt* pCxt, SNode* pStmt) {
return collectMetaKeyFromShowTransactions(pCxt, (SShowStmt*)pStmt);
case QUERY_NODE_DELETE_STMT:
return collectMetaKeyFromDelete(pCxt, (SDeleteStmt*)pStmt);
case QUERY_NODE_INSERT_STMT:
return collectMetaKeyFromInsert(pCxt, (SInsertStmt*)pStmt);
case QUERY_NODE_SHOW_TABLE_DISTRIBUTED_STMT:
return collectMetaKeyFromShowBlockDist(pCxt, (SShowTableDistributedStmt*)pStmt);
case QUERY_NODE_SHOW_SUBSCRIPTIONS_STMT:
......
......@@ -39,7 +39,7 @@ static int32_t checkAuth(SAuthCxt* pCxt, const char* pDbName, AUTH_TYPE type) {
if (NULL != pCxt->pMetaCache) {
code = getUserAuthFromCache(pCxt->pMetaCache, pParseCxt->pUser, dbFname, type, &pass);
} else {
SRequestConnInfo conn = {.pTrans = pParseCxt->pTransporter,
SRequestConnInfo conn = {.pTrans = pParseCxt->pTransporter,
.requestId = pParseCxt->requestId,
.requestObjRefId = pParseCxt->requestRid,
.mgmtEps = pParseCxt->mgmtEpSet};
......@@ -88,6 +88,14 @@ static int32_t authDelete(SAuthCxt* pCxt, SDeleteStmt* pDelete) {
return checkAuth(pCxt, ((SRealTableNode*)pDelete->pFromTable)->table.dbName, AUTH_TYPE_WRITE);
}
static int32_t authInsert(SAuthCxt* pCxt, SInsertStmt* pInsert) {
int32_t code = checkAuth(pCxt, ((SRealTableNode*)pInsert->pTable)->table.dbName, AUTH_TYPE_WRITE);
if (TSDB_CODE_SUCCESS == code) {
code = authQuery(pCxt, pInsert->pQuery);
}
return code;
}
static int32_t authQuery(SAuthCxt* pCxt, SNode* pStmt) {
switch (nodeType(pStmt)) {
case QUERY_NODE_SET_OPERATOR:
......@@ -98,6 +106,8 @@ static int32_t authQuery(SAuthCxt* pCxt, SNode* pStmt) {
return authDropUser(pCxt, (SDropUserStmt*)pStmt);
case QUERY_NODE_DELETE_STMT:
return authDelete(pCxt, (SDeleteStmt*)pStmt);
case QUERY_NODE_INSERT_STMT:
return authInsert(pCxt, (SInsertStmt*)pStmt);
default:
break;
}
......
......@@ -300,6 +300,14 @@ static int32_t calcConstDelete(SCalcConstContext* pCxt, SDeleteStmt* pDelete) {
return code;
}
static int32_t calcConstInsert(SCalcConstContext* pCxt, SInsertStmt* pInsert) {
int32_t code = calcConstFromTable(pCxt, pInsert->pTable);
if (TSDB_CODE_SUCCESS == code) {
code = calcConstQuery(pCxt, pInsert->pQuery, false);
}
return code;
}
static int32_t calcConstQuery(SCalcConstContext* pCxt, SNode* pStmt, bool subquery) {
int32_t code = TSDB_CODE_SUCCESS;
switch (nodeType(pStmt)) {
......@@ -320,6 +328,9 @@ static int32_t calcConstQuery(SCalcConstContext* pCxt, SNode* pStmt, bool subque
case QUERY_NODE_DELETE_STMT:
code = calcConstDelete(pCxt, (SDeleteStmt*)pStmt);
break;
case QUERY_NODE_INSERT_STMT:
code = calcConstInsert(pCxt, (SInsertStmt*)pStmt);
break;
default:
break;
}
......
......@@ -110,7 +110,7 @@ typedef struct SMemParam {
static int32_t skipInsertInto(char** pSql, SMsgBuf* pMsg) {
SToken sToken;
NEXT_TOKEN(*pSql, sToken);
if (TK_INSERT != sToken.type) {
if (TK_INSERT != sToken.type && TK_IMPORT != sToken.type) {
return buildSyntaxErrMsg(pMsg, "keyword INSERT is expected", sToken.z);
}
NEXT_TOKEN(*pSql, sToken);
......
......@@ -2839,6 +2839,21 @@ static int32_t translateDelete(STranslateContext* pCxt, SDeleteStmt* pDelete) {
return code;
}
static int32_t translateInsert(STranslateContext* pCxt, SInsertStmt* pInsert) {
pCxt->pCurrStmt = (SNode*)pInsert;
int32_t code = translateFrom(pCxt, pInsert->pTable);
if (TSDB_CODE_SUCCESS == code) {
code = translateExprList(pCxt, pInsert->pCols);
}
if (TSDB_CODE_SUCCESS == code) {
code = resetTranslateNamespace(pCxt);
}
if (TSDB_CODE_SUCCESS == code) {
code = translateQuery(pCxt, pInsert->pQuery);
}
return code;
}
static int64_t getUnitPerMinute(uint8_t precision) {
switch (precision) {
case TSDB_TIME_PRECISION_MILLI:
......@@ -4608,6 +4623,9 @@ static int32_t translateQuery(STranslateContext* pCxt, SNode* pNode) {
case QUERY_NODE_DELETE_STMT:
code = translateDelete(pCxt, (SDeleteStmt*)pNode);
break;
case QUERY_NODE_INSERT_STMT:
code = translateInsert(pCxt, (SInsertStmt*)pNode);
break;
case QUERY_NODE_CREATE_DATABASE_STMT:
code = translateCreateDatabase(pCxt, (SCreateDatabaseStmt*)pNode);
break;
......@@ -6224,6 +6242,10 @@ static int32_t setQuery(STranslateContext* pCxt, SQuery* pQuery) {
pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE;
pQuery->msgType = TDMT_VND_DELETE;
break;
case QUERY_NODE_INSERT_STMT:
pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE;
pQuery->msgType = TDMT_VND_SUBMIT;
break;
case QUERY_NODE_VNODE_MODIF_STMT:
pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE;
pQuery->msgType = toMsgType(((SVnodeModifOpStmt*)pQuery->pRoot)->sqlNodeType);
......
......@@ -376,8 +376,6 @@ int32_t parseJsontoTagData(const char* json, SArray* pTagVals, STag** ppTag, voi
char* jsonKey = item->string;
if (!isValidateTag(jsonKey)) {
fprintf(stdout, "%s(%d) %s %08" PRId64 "\n", __FILE__, __LINE__, __func__, taosGetSelfPthreadId());
fflush(stdout);
retCode = buildSyntaxErrMsg(pMsgBuf, "json key not validate", jsonKey);
goto end;
}
......
......@@ -19,19 +19,28 @@
#include "parInt.h"
#include "parToken.h"
bool qIsInsertSql(const char* pStr, size_t length) {
bool qIsInsertValuesSql(const char* pStr, size_t length) {
if (NULL == pStr) {
return false;
}
const char* pSql = pStr;
int32_t index = 0;
SToken t = tStrGetToken((char*)pStr, &index, false);
if (TK_INSERT != t.type && TK_IMPORT != t.type) {
return false;
}
do {
SToken t0 = tStrGetToken((char*)pStr, &index, false);
if (t0.type != TK_NK_LP) {
return t0.type == TK_INSERT || t0.type == TK_IMPORT;
pStr += index;
index = 0;
t = tStrGetToken((char*)pStr, &index, false);
if (TK_USING == t.type || TK_VALUES == t.type) {
return true;
}
} while (1);
} while (pStr - pSql < length);
return false;
}
static int32_t analyseSemantic(SParseContext* pCxt, SQuery* pQuery, SParseMetaCache* pMetaCache) {
......@@ -148,7 +157,7 @@ static void rewriteExprAlias(SNode* pRoot) {
int32_t qParseSql(SParseContext* pCxt, SQuery** pQuery) {
int32_t code = TSDB_CODE_SUCCESS;
if (qIsInsertSql(pCxt->pSql, pCxt->sqlLen)) {
if (qIsInsertValuesSql(pCxt->pSql, pCxt->sqlLen)) {
code = parseInsertSql(pCxt, pQuery, NULL);
} else {
code = parseSqlIntoAst(pCxt, pQuery);
......@@ -160,7 +169,7 @@ int32_t qParseSql(SParseContext* pCxt, SQuery** pQuery) {
int32_t qParseSqlSyntax(SParseContext* pCxt, SQuery** pQuery, struct SCatalogReq* pCatalogReq) {
SParseMetaCache metaCache = {0};
int32_t code = TSDB_CODE_SUCCESS;
if (qIsInsertSql(pCxt->pSql, pCxt->sqlLen)) {
if (qIsInsertValuesSql(pCxt->pSql, pCxt->sqlLen)) {
code = parseInsertSyntax(pCxt, pQuery, &metaCache);
} else {
code = parseSqlSyntax(pCxt, pQuery, &metaCache);
......
因为 它太大了无法显示 source diff 。你可以改为 查看blob
......@@ -40,6 +40,12 @@ TEST_F(ParserExplainToSyncdbTest, grant) {
run("GRANT READ, WRITE ON test.* TO wxy");
}
TEST_F(ParserExplainToSyncdbTest, insert) {
useDb("root", "test");
run("INSERT INTO t1 SELECT * FROM t1");
}
// todo kill connection
// todo kill query
// todo kill stream
......
......@@ -26,6 +26,7 @@ typedef int32_t (*FCreateLogicNode)(SLogicPlanContext*, void*, SLogicNode**);
typedef int32_t (*FCreateSelectLogicNode)(SLogicPlanContext*, SSelectStmt*, SLogicNode**);
typedef int32_t (*FCreateSetOpLogicNode)(SLogicPlanContext*, SSetOperator*, SLogicNode**);
typedef int32_t (*FCreateDeleteLogicNode)(SLogicPlanContext*, SDeleteStmt*, SLogicNode**);
typedef int32_t (*FCreateInsertLogicNode)(SLogicPlanContext*, SInsertStmt*, SLogicNode**);
static int32_t doCreateLogicNodeByTable(SLogicPlanContext* pCxt, SSelectStmt* pSelect, SNode* pTable,
SLogicNode** pLogicNode);
......@@ -1262,6 +1263,47 @@ static int32_t createDeleteLogicNode(SLogicPlanContext* pCxt, SDeleteStmt* pDele
return code;
}
static int32_t creatInsertRootLogicNode(SLogicPlanContext* pCxt, SInsertStmt* pInsert, FCreateInsertLogicNode func,
SLogicNode** pRoot) {
return createRootLogicNode(pCxt, pInsert, pInsert->precision, (FCreateLogicNode)func, pRoot);
}
static int32_t createVnodeModifLogicNodeByInsert(SLogicPlanContext* pCxt, SInsertStmt* pInsert,
SLogicNode** pLogicNode) {
SVnodeModifyLogicNode* pModify = (SVnodeModifyLogicNode*)nodesMakeNode(QUERY_NODE_LOGIC_PLAN_VNODE_MODIFY);
if (NULL == pModify) {
return TSDB_CODE_OUT_OF_MEMORY;
}
SRealTableNode* pRealTable = (SRealTableNode*)pInsert->pTable;
pModify->modifyType = MODIFY_TABLE_TYPE_INSERT;
pModify->tableId = pRealTable->pMeta->uid;
pModify->tableType = pRealTable->pMeta->tableType;
snprintf(pModify->tableFName, sizeof(pModify->tableFName), "%d.%s.%s", pCxt->pPlanCxt->acctId,
pRealTable->table.dbName, pRealTable->table.tableName);
TSWAP(pModify->pVgroupList, pRealTable->pVgroupList);
*pLogicNode = (SLogicNode*)pModify;
return TSDB_CODE_SUCCESS;
}
static int32_t createInsertLogicNode(SLogicPlanContext* pCxt, SInsertStmt* pInsert, SLogicNode** pLogicNode) {
SLogicNode* pRoot = NULL;
int32_t code = createQueryLogicNode(pCxt, pInsert->pQuery, &pRoot);
if (TSDB_CODE_SUCCESS == code) {
code = creatInsertRootLogicNode(pCxt, pInsert, createVnodeModifLogicNodeByInsert, &pRoot);
}
if (TSDB_CODE_SUCCESS == code) {
*pLogicNode = pRoot;
} else {
nodesDestroyNode((SNode*)pRoot);
}
return code;
}
static int32_t createQueryLogicNode(SLogicPlanContext* pCxt, SNode* pStmt, SLogicNode** pLogicNode) {
switch (nodeType(pStmt)) {
case QUERY_NODE_SELECT_STMT:
......@@ -1274,6 +1316,8 @@ static int32_t createQueryLogicNode(SLogicPlanContext* pCxt, SNode* pStmt, SLogi
return createSetOperatorLogicNode(pCxt, (SSetOperator*)pStmt, pLogicNode);
case QUERY_NODE_DELETE_STMT:
return createDeleteLogicNode(pCxt, (SDeleteStmt*)pStmt, pLogicNode);
case QUERY_NODE_INSERT_STMT:
return createInsertLogicNode(pCxt, (SInsertStmt*)pStmt, pLogicNode);
default:
break;
}
......
......@@ -485,7 +485,7 @@ static int32_t pushDownCondOptPushCondToProject(SOptimizeContext* pCxt, SProject
return pushDownCondOptAppendCond(&pProject->node.pConditions, pCond);
}
static int32_t pushDownCondOptPushCondToJoin(SOptimizeContext* pCxt, SJoinLogicNode * pJoin, SNode** pCond) {
static int32_t pushDownCondOptPushCondToJoin(SOptimizeContext* pCxt, SJoinLogicNode* pJoin, SNode** pCond) {
return pushDownCondOptAppendCond(&pJoin->node.pConditions, pCond);
}
......@@ -557,9 +557,9 @@ static int32_t pushDownCondOptCheckJoinOnCond(SOptimizeContext* pCxt, SJoinLogic
static int32_t pushDownCondOptPartJoinOnCondLogicCond(SJoinLogicNode* pJoin, SNode** ppMergeCond, SNode** ppOnCond) {
SLogicConditionNode* pLogicCond = (SLogicConditionNode*)(pJoin->pOnConditions);
int32_t code = TSDB_CODE_SUCCESS;
int32_t code = TSDB_CODE_SUCCESS;
SNodeList* pOnConds = NULL;
SNode* pCond = NULL;
SNode* pCond = NULL;
FOREACH(pCond, pLogicCond->pParameterList) {
if (pushDownCondOptIsPriKeyEqualCond(pJoin, pCond)) {
*ppMergeCond = nodesCloneNode(pCond);
......@@ -604,8 +604,8 @@ static int32_t pushDownCondOptPartJoinOnCond(SJoinLogicNode* pJoin, SNode** ppMe
static int32_t pushDownCondOptJoinExtractMergeCond(SOptimizeContext* pCxt, SJoinLogicNode* pJoin) {
int32_t code = pushDownCondOptCheckJoinOnCond(pCxt, pJoin);
SNode* pJoinMergeCond = NULL;
SNode* pJoinOnCond = NULL;
SNode* pJoinMergeCond = NULL;
SNode* pJoinOnCond = NULL;
if (TSDB_CODE_SUCCESS == code) {
code = pushDownCondOptPartJoinOnCond(pJoin, &pJoinMergeCond, &pJoinOnCond);
}
......@@ -820,12 +820,12 @@ static int32_t pushDownCondOptDealAgg(SOptimizeContext* pCxt, SAggLogicNode* pAg
typedef struct SRewriteProjCondContext {
SProjectLogicNode* pProj;
int32_t errCode;
}SRewriteProjCondContext;
int32_t errCode;
} SRewriteProjCondContext;
static EDealRes rewriteProjectCondForPushDownImpl(SNode** ppNode, void* pContext) {
SRewriteProjCondContext* pCxt = pContext;
SProjectLogicNode* pProj = pCxt->pProj;
SProjectLogicNode* pProj = pCxt->pProj;
if (QUERY_NODE_COLUMN == nodeType(*ppNode)) {
SNode* pTarget = NULL;
FOREACH(pTarget, pProj->node.pTargets) {
......@@ -840,18 +840,19 @@ static EDealRes rewriteProjectCondForPushDownImpl(SNode** ppNode, void* pContext
}
nodesDestroyNode(*ppNode);
*ppNode = pExpr;
} // end if expr alias name equal column name
} // end for each project
} // end if target node equals cond column node
} // end for each targets
} // end if expr alias name equal column name
} // end for each project
} // end if target node equals cond column node
} // end for each targets
return DEAL_RES_IGNORE_CHILD;
}
return DEAL_RES_CONTINUE;
}
static int32_t rewriteProjectCondForPushDown(SOptimizeContext* pCxt, SProjectLogicNode* pProject, SNode** ppProjectCond) {
static int32_t rewriteProjectCondForPushDown(SOptimizeContext* pCxt, SProjectLogicNode* pProject,
SNode** ppProjectCond) {
SRewriteProjCondContext cxt = {.pProj = pProject, .errCode = TSDB_CODE_SUCCESS};
SNode* pProjectCond = pProject->node.pConditions;
SNode* pProjectCond = pProject->node.pConditions;
nodesRewriteExpr(&pProjectCond, rewriteProjectCondForPushDownImpl, &cxt);
*ppProjectCond = pProjectCond;
pProject->node.pConditions = NULL;
......@@ -873,7 +874,7 @@ static int32_t pushDownCondOptDealProject(SOptimizeContext* pCxt, SProjectLogicN
}
int32_t code = TSDB_CODE_SUCCESS;
SNode* pProjCond = NULL;
SNode* pProjCond = NULL;
code = rewriteProjectCondForPushDown(pCxt, pProject, &pProjCond);
if (TSDB_CODE_SUCCESS == code) {
SLogicNode* pChild = (SLogicNode*)nodesListGetNode(pProject->node.pChildren, 0);
......@@ -2082,13 +2083,18 @@ static const int32_t optimizeRuleNum = (sizeof(optimizeRuleSet) / sizeof(SOptimi
static void dumpLogicSubplan(const char* pRuleName, SLogicSubplan* pSubplan) {
char* pStr = NULL;
nodesNodeToString((SNode*)pSubplan, false, &pStr, NULL);
qDebugL("apply optimize %s rule: %s", pRuleName, pStr);
if (NULL == pRuleName) {
qDebugL("before optimize: %s", pStr);
} else {
qDebugL("apply optimize %s rule: %s", pRuleName, pStr);
}
taosMemoryFree(pStr);
}
static int32_t applyOptimizeRule(SPlanContext* pCxt, SLogicSubplan* pLogicSubplan) {
SOptimizeContext cxt = {.pPlanCxt = pCxt, .optimized = false};
bool optimized = false;
dumpLogicSubplan(NULL, pLogicSubplan);
do {
optimized = false;
for (int32_t i = 0; i < optimizeRuleNum; ++i) {
......
......@@ -632,8 +632,8 @@ static int32_t createJoinPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren
}
if (TSDB_CODE_SUCCESS == code && NULL != pJoinLogicNode->pOnConditions) {
code = setNodeSlotId(pCxt, ((SPhysiNode*)pJoin)->pOutputDataBlockDesc->dataBlockId, -1, pJoinLogicNode->pOnConditions,
&pJoin->pOnConditions);
code = setNodeSlotId(pCxt, ((SPhysiNode*)pJoin)->pOutputDataBlockDesc->dataBlockId, -1,
pJoinLogicNode->pOnConditions, &pJoin->pOnConditions);
}
if (TSDB_CODE_SUCCESS == code) {
......@@ -1496,12 +1496,60 @@ static SSubplan* makeSubplan(SPhysiPlanContext* pCxt, SLogicSubplan* pLogicSubpl
return pSubplan;
}
static int32_t buildInsertSubplan(SPhysiPlanContext* pCxt, SVnodeModifyLogicNode* pModify, SSubplan* pSubplan) {
static int32_t buildInsertValuesSubplan(SPhysiPlanContext* pCxt, SVnodeModifyLogicNode* pModify, SSubplan* pSubplan) {
pSubplan->msgType = pModify->msgType;
pSubplan->execNode.epSet = pModify->pVgDataBlocks->vg.epSet;
return createDataInserter(pCxt, pModify->pVgDataBlocks, &pSubplan->pDataSink);
}
static int32_t createQueryInserter(SPhysiPlanContext* pCxt, SVnodeModifyLogicNode* pModify, SSubplan* pSubplan,
SDataSinkNode** pSink) {
SQueryInserterNode* pInserter = (SQueryInserterNode*)nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN_QUERY_INSERT);
if (NULL == pInserter) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pInserter->tableId = pModify->tableId;
pInserter->tableType = pModify->tableType;
strcpy(pInserter->tableFName, pModify->tableFName);
pInserter->vgId = pModify->pVgroupList->vgroups[0].vgId;
pInserter->epSet = pModify->pVgroupList->vgroups[0].epSet;
vgroupInfoToNodeAddr(pModify->pVgroupList->vgroups, &pSubplan->execNode);
int32_t code = TSDB_CODE_SUCCESS;
pInserter->sink.pInputDataBlockDesc =
(SDataBlockDescNode*)nodesCloneNode((SNode*)pSubplan->pNode->pOutputDataBlockDesc);
if (NULL == pInserter->sink.pInputDataBlockDesc) {
code = TSDB_CODE_OUT_OF_MEMORY;
}
if (TSDB_CODE_SUCCESS == code) {
*pSink = (SDataSinkNode*)pInserter;
} else {
nodesDestroyNode((SNode*)pInserter);
}
return TSDB_CODE_SUCCESS;
}
static int32_t buildInsertSelectSubplan(SPhysiPlanContext* pCxt, SVnodeModifyLogicNode* pModify, SSubplan* pSubplan) {
int32_t code =
createPhysiNode(pCxt, (SLogicNode*)nodesListGetNode(pModify->node.pChildren, 0), pSubplan, &pSubplan->pNode);
if (TSDB_CODE_SUCCESS == code) {
code = createQueryInserter(pCxt, pModify, pSubplan, &pSubplan->pDataSink);
}
pSubplan->msgType = TDMT_VND_SUBMIT;
return code;
}
static int32_t buildInsertSubplan(SPhysiPlanContext* pCxt, SVnodeModifyLogicNode* pModify, SSubplan* pSubplan) {
if (NULL == pModify->node.pChildren) {
return buildInsertValuesSubplan(pCxt, pModify, pSubplan);
}
return buildInsertSelectSubplan(pCxt, pModify, pSubplan);
}
static int32_t createDataDeleter(SPhysiPlanContext* pCxt, SVnodeModifyLogicNode* pModify, const SPhysiNode* pRoot,
SDataSinkNode** pSink) {
SDataDeleterNode* pDeleter = (SDataDeleterNode*)nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN_DELETE);
......
......@@ -82,29 +82,41 @@ static int32_t scaleOutByVgroups(SScaleOutContext* pCxt, SLogicSubplan* pSubplan
return code;
}
static int32_t scaleOutForModify(SScaleOutContext* pCxt, SLogicSubplan* pSubplan, int32_t level, SNodeList* pGroup) {
static int32_t scaleOutForMerge(SScaleOutContext* pCxt, SLogicSubplan* pSubplan, int32_t level, SNodeList* pGroup) {
return nodesListStrictAppend(pGroup, (SNode*)singleCloneSubLogicPlan(pCxt, pSubplan, level));
}
static int32_t scaleOutForInsertValues(SScaleOutContext* pCxt, SLogicSubplan* pSubplan, int32_t level,
SNodeList* pGroup) {
SVnodeModifyLogicNode* pNode = (SVnodeModifyLogicNode*)pSubplan->pNode;
if (MODIFY_TABLE_TYPE_DELETE == pNode->modifyType) {
return scaleOutByVgroups(pCxt, pSubplan, level, pGroup);
} else {
size_t numOfVgroups = taosArrayGetSize(pNode->pDataBlocks);
for (int32_t i = 0; i < numOfVgroups; ++i) {
SLogicSubplan* pNewSubplan = singleCloneSubLogicPlan(pCxt, pSubplan, level);
if (NULL == pNewSubplan) {
return TSDB_CODE_OUT_OF_MEMORY;
}
((SVnodeModifyLogicNode*)pNewSubplan->pNode)->pVgDataBlocks =
(SVgDataBlocks*)taosArrayGetP(pNode->pDataBlocks, i);
if (TSDB_CODE_SUCCESS != nodesListStrictAppend(pGroup, (SNode*)pNewSubplan)) {
return TSDB_CODE_OUT_OF_MEMORY;
}
size_t numOfVgroups = taosArrayGetSize(pNode->pDataBlocks);
for (int32_t i = 0; i < numOfVgroups; ++i) {
SLogicSubplan* pNewSubplan = singleCloneSubLogicPlan(pCxt, pSubplan, level);
if (NULL == pNewSubplan) {
return TSDB_CODE_OUT_OF_MEMORY;
}
((SVnodeModifyLogicNode*)pNewSubplan->pNode)->pVgDataBlocks = (SVgDataBlocks*)taosArrayGetP(pNode->pDataBlocks, i);
if (TSDB_CODE_SUCCESS != nodesListStrictAppend(pGroup, (SNode*)pNewSubplan)) {
return TSDB_CODE_OUT_OF_MEMORY;
}
return TSDB_CODE_SUCCESS;
}
return TSDB_CODE_SUCCESS;
}
static int32_t scaleOutForMerge(SScaleOutContext* pCxt, SLogicSubplan* pSubplan, int32_t level, SNodeList* pGroup) {
return nodesListStrictAppend(pGroup, (SNode*)singleCloneSubLogicPlan(pCxt, pSubplan, level));
static int32_t scaleOutForInsert(SScaleOutContext* pCxt, SLogicSubplan* pSubplan, int32_t level, SNodeList* pGroup) {
SVnodeModifyLogicNode* pNode = (SVnodeModifyLogicNode*)pSubplan->pNode;
if (NULL == pNode->node.pChildren) {
return scaleOutForInsertValues(pCxt, pSubplan, level, pGroup);
}
return scaleOutForMerge(pCxt, pSubplan, level, pGroup);
}
static int32_t scaleOutForModify(SScaleOutContext* pCxt, SLogicSubplan* pSubplan, int32_t level, SNodeList* pGroup) {
SVnodeModifyLogicNode* pNode = (SVnodeModifyLogicNode*)pSubplan->pNode;
if (MODIFY_TABLE_TYPE_DELETE == pNode->modifyType) {
return scaleOutByVgroups(pCxt, pSubplan, level, pGroup);
}
return scaleOutForInsert(pCxt, pSubplan, level, pGroup);
}
static int32_t scaleOutForScan(SScaleOutContext* pCxt, SLogicSubplan* pSubplan, int32_t level, SNodeList* pGroup) {
......
......@@ -20,6 +20,7 @@
#define SPLIT_FLAG_MASK(n) (1 << n)
#define SPLIT_FLAG_STABLE_SPLIT SPLIT_FLAG_MASK(0)
#define SPLIT_FLAG_INSERT_SPLIT SPLIT_FLAG_MASK(1)
#define SPLIT_FLAG_SET_MASK(val, mask) (val) |= (mask)
#define SPLIT_FLAG_TEST_MASK(val, mask) (((val) & (mask)) != 0)
......@@ -1196,6 +1197,41 @@ static int32_t smaIndexSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
return code;
}
typedef struct SInsertSelectSplitInfo {
SLogicNode* pQueryRoot;
SLogicSubplan* pSubplan;
} SInsertSelectSplitInfo;
static bool insSelSplFindSplitNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SLogicNode* pNode,
SInsertSelectSplitInfo* pInfo) {
if (QUERY_NODE_LOGIC_PLAN_VNODE_MODIFY == nodeType(pNode) && 1 == LIST_LENGTH(pNode->pChildren) &&
MODIFY_TABLE_TYPE_INSERT == ((SVnodeModifyLogicNode*)pNode)->modifyType) {
pInfo->pQueryRoot = (SLogicNode*)nodesListGetNode(pNode->pChildren, 0);
pInfo->pSubplan = pSubplan;
return true;
}
return false;
}
static int32_t insertSelectSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
SInsertSelectSplitInfo info = {0};
if (!splMatch(pCxt, pSubplan, SPLIT_FLAG_INSERT_SPLIT, (FSplFindSplitNode)insSelSplFindSplitNode, &info)) {
return TSDB_CODE_SUCCESS;
}
int32_t code = splCreateExchangeNodeForSubplan(pCxt, info.pSubplan, info.pQueryRoot, info.pSubplan->subplanType);
if (TSDB_CODE_SUCCESS == code) {
code = nodesListMakeStrictAppend(&info.pSubplan->pChildren, (SNode*)splCreateScanSubplan(pCxt, info.pQueryRoot, 0));
}
if (TSDB_CODE_SUCCESS == code) {
info.pSubplan->subplanType = SUBPLAN_TYPE_MODIFY;
SPLIT_FLAG_SET_MASK(info.pSubplan->splitFlag, SPLIT_FLAG_INSERT_SPLIT);
}
++(pCxt->groupId);
pCxt->split = true;
return code;
}
typedef struct SQnodeSplitInfo {
SLogicNode* pSplitNode;
SLogicSubplan* pSubplan;
......@@ -1249,7 +1285,8 @@ static const SSplitRule splitRuleSet[] = {
{.pName = "SingleTableJoinSplit", .splitFunc = singleTableJoinSplit},
{.pName = "UnionAllSplit", .splitFunc = unionAllSplit},
{.pName = "UnionDistinctSplit", .splitFunc = unionDistinctSplit},
{.pName = "SmaIndexSplit", .splitFunc = smaIndexSplit}
{.pName = "SmaIndexSplit", .splitFunc = smaIndexSplit},
{.pName = "InsertSelectSplit", .splitFunc = insertSelectSplit}
};
// clang-format on
......@@ -1258,7 +1295,11 @@ static const int32_t splitRuleNum = (sizeof(splitRuleSet) / sizeof(SSplitRule));
static void dumpLogicSubplan(const char* pRuleName, SLogicSubplan* pSubplan) {
char* pStr = NULL;
nodesNodeToString((SNode*)pSubplan, false, &pStr, NULL);
qDebugL("apply split %s rule: %s", pRuleName, pStr);
if (NULL == pRuleName) {
qDebugL("before split: %s", pStr);
} else {
qDebugL("apply split %s rule: %s", pRuleName, pStr);
}
taosMemoryFree(pStr);
}
......@@ -1266,6 +1307,7 @@ static int32_t applySplitRule(SPlanContext* pCxt, SLogicSubplan* pSubplan) {
SSplitContext cxt = {
.pPlanCxt = pCxt, .queryId = pSubplan->id.queryId, .groupId = pSubplan->id.groupId + 1, .split = false};
bool split = false;
dumpLogicSubplan(NULL, pSubplan);
do {
split = false;
for (int32_t i = 0; i < splitRuleNum; ++i) {
......@@ -1293,8 +1335,16 @@ static void setVgroupsInfo(SLogicNode* pNode, SLogicSubplan* pSubplan) {
FOREACH(pChild, pNode->pChildren) { setVgroupsInfo((SLogicNode*)pChild, pSubplan); }
}
static bool needSplitSubplan(SLogicSubplan* pLogicSubplan) {
if (QUERY_NODE_LOGIC_PLAN_VNODE_MODIFY != nodeType(pLogicSubplan->pNode)) {
return true;
}
SVnodeModifyLogicNode* pModify = (SVnodeModifyLogicNode*)pLogicSubplan->pNode;
return (MODIFY_TABLE_TYPE_INSERT == pModify->modifyType && NULL != pModify->node.pChildren);
}
int32_t splitLogicPlan(SPlanContext* pCxt, SLogicSubplan* pLogicSubplan) {
if (QUERY_NODE_LOGIC_PLAN_VNODE_MODIFY == nodeType(pLogicSubplan->pNode)) {
if (!needSplitSubplan(pLogicSubplan)) {
setVgroupsInfo(pLogicSubplan->pNode, pLogicSubplan);
return TSDB_CODE_SUCCESS;
}
......
......@@ -91,3 +91,9 @@ TEST_F(PlanOtherTest, delete) {
run("DELETE FROM st1 WHERE ts > now - 2d and ts < now - 1d AND tag1 = 10");
}
TEST_F(PlanOtherTest, insert) {
useDb("root", "test");
run("INSERT INTO t1 SELECT * FROM t1");
}
......@@ -40,7 +40,7 @@ int32_t schSwitchJobStatus(SSchJob* pJob, int32_t status, void* param) {
SCH_RET(schProcessOnJobFailure(pJob, (param ? *(int32_t*)param : 0)));
break;
case JOB_TASK_STATUS_DROP:
SCH_ERR_JRET(schProcessOnJobDropped(pJob, *(int32_t*)param));
schProcessOnJobDropped(pJob, *(int32_t*)param);
if (taosRemoveRef(schMgmt.jobRef, pJob->refId)) {
SCH_JOB_ELOG("remove job from job list failed, refId:0x%" PRIx64, pJob->refId);
......
......@@ -144,11 +144,9 @@ void schedulerFreeJob(int64_t* jobId, int32_t errCode) {
return;
}
if (schJobDone(pJob)) {
return;
}
schSwitchJobStatus(pJob, JOB_TASK_STATUS_DROP, (void*)&errCode);
schReleaseJob(*jobId);
*jobId = 0;
}
......
......@@ -36,7 +36,7 @@ typedef struct SRaftCfg {
TdFilePtr pFile;
char path[TSDB_FILENAME_LEN * 2];
int8_t isStandBy;
int8_t snapshotEnable;
int8_t snapshotStrategy;
SyncIndex lastConfigIndex;
SyncIndex configIndexArr[MAX_CONFIG_INDEX_COUNT];
......@@ -49,20 +49,20 @@ int32_t raftCfgClose(SRaftCfg *pRaftCfg);
int32_t raftCfgPersist(SRaftCfg *pRaftCfg);
int32_t raftCfgAddConfigIndex(SRaftCfg *pRaftCfg, SyncIndex configIndex);
cJSON * syncCfg2Json(SSyncCfg *pSyncCfg);
char * syncCfg2Str(SSyncCfg *pSyncCfg);
char * syncCfg2SimpleStr(SSyncCfg *pSyncCfg);
cJSON *syncCfg2Json(SSyncCfg *pSyncCfg);
char *syncCfg2Str(SSyncCfg *pSyncCfg);
char *syncCfg2SimpleStr(SSyncCfg *pSyncCfg);
int32_t syncCfgFromJson(const cJSON *pRoot, SSyncCfg *pSyncCfg);
int32_t syncCfgFromStr(const char *s, SSyncCfg *pSyncCfg);
cJSON * raftCfg2Json(SRaftCfg *pRaftCfg);
char * raftCfg2Str(SRaftCfg *pRaftCfg);
cJSON *raftCfg2Json(SRaftCfg *pRaftCfg);
char *raftCfg2Str(SRaftCfg *pRaftCfg);
int32_t raftCfgFromJson(const cJSON *pRoot, SRaftCfg *pRaftCfg);
int32_t raftCfgFromStr(const char *s, SRaftCfg *pRaftCfg);
typedef struct SRaftCfgMeta {
int8_t isStandBy;
int8_t snapshotEnable;
int8_t snapshotStrategy;
SyncIndex lastConfigIndex;
} SRaftCfgMeta;
......
......@@ -96,12 +96,20 @@ int32_t syncNodeElect(SSyncNode* pSyncNode) {
return ret;
}
if (pSyncNode->pRaftCfg->snapshotEnable) {
ret = syncNodeRequestVotePeersSnapshot(pSyncNode);
} else {
ret = syncNodeRequestVotePeers(pSyncNode);
switch (pSyncNode->pRaftCfg->snapshotStrategy) {
case SYNC_STRATEGY_NO_SNAPSHOT:
ret = syncNodeRequestVotePeers(pSyncNode);
break;
case SYNC_STRATEGY_STANDARD_SNAPSHOT:
case SYNC_STRATEGY_WAL_FIRST:
ret = syncNodeRequestVotePeersSnapshot(pSyncNode);
break;
default:
ret = syncNodeRequestVotePeers(pSyncNode);
break;
}
ASSERT(ret == 0);
syncNodeResetElectTimer(pSyncNode);
......
......@@ -672,12 +672,12 @@ int32_t syncNodeProposeBatch(SSyncNode* pSyncNode, SRpcMsg* pMsgArr, bool* pIsWe
}
if (arrSize > SYNC_MAX_BATCH_SIZE) {
syncNodeErrorLog(pSyncNode, "sync propose match batch error");
syncNodeErrorLog(pSyncNode, "sync propose batch error");
terrno = TSDB_CODE_SYN_BATCH_ERROR;
return -1;
}
if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) {
syncNodeErrorLog(pSyncNode, "sync propose not leader");
terrno = TSDB_CODE_SYN_NOT_LEADER;
return -1;
......@@ -707,13 +707,36 @@ int32_t syncNodeProposeBatch(SSyncNode* pSyncNode, SRpcMsg* pMsgArr, bool* pIsWe
syncClientRequestBatch2RpcMsg(pSyncMsg, &rpcMsg);
taosMemoryFree(pSyncMsg); // only free msg body, do not free rpc msg content
if (pSyncNode->FpEqMsg != NULL && (*pSyncNode->FpEqMsg)(pSyncNode->msgcb, &rpcMsg) == 0) {
// enqueue msg ok
if (pSyncNode->replicaNum == 1 && pSyncNode->vgId != 1) {
int32_t code = syncNodeOnClientRequestBatchCb(pSyncNode, pSyncMsg);
if (code == 0) {
// update rpc msg applyIndex
SRpcMsg* msgArr = syncClientRequestBatchRpcMsgArr(pSyncMsg);
ASSERT(arrSize == pSyncMsg->dataCount);
for (int i = 0; i < arrSize; ++i) {
pMsgArr[i].info.conn.applyIndex = msgArr[i].info.conn.applyIndex;
syncRespMgrDel(pSyncNode->pSyncRespMgr, raftArr[i].seqNum);
}
rpcFreeCont(rpcMsg.pCont);
terrno = 0;
return 1;
} else {
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
return -1;
}
} else {
sError("enqueue msg error, FpEqMsg is NULL");
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
return -1;
if (pSyncNode->FpEqMsg != NULL && (*pSyncNode->FpEqMsg)(pSyncNode->msgcb, &rpcMsg) == 0) {
// enqueue msg ok
return 0;
} else {
sError("vgId:%d, enqueue msg error, FpEqMsg is NULL", pSyncNode->vgId);
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
return -1;
}
}
return 0;
......@@ -730,7 +753,7 @@ int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak) {
if (pSyncNode->changing && pMsg->msgType != TDMT_SYNC_CONFIG_CHANGE_FINISH) {
ret = -1;
terrno = TSDB_CODE_SYN_PROPOSE_NOT_READY;
sError("sync propose not ready, type:%s,%d", TMSG_INFO(pMsg->msgType), pMsg->msgType);
sError("vgId:%d, sync propose not ready, type:%s,%d", pSyncNode->vgId, TMSG_INFO(pMsg->msgType), pMsg->msgType);
goto _END;
}
......@@ -739,7 +762,8 @@ int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak) {
if (!syncNodeCanChange(pSyncNode)) {
ret = -1;
terrno = TSDB_CODE_SYN_RECONFIG_NOT_READY;
sError("sync reconfig not ready, type:%s,%d", TMSG_INFO(pMsg->msgType), pMsg->msgType);
sError("vgId:%d, sync reconfig not ready, type:%s,%d", pSyncNode->vgId, TMSG_INFO(pMsg->msgType),
pMsg->msgType);
goto _END;
}
......@@ -780,7 +804,7 @@ int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak) {
} else {
ret = -1;
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
sError("enqueue msg error, FpEqMsg is NULL");
sError("vgId:%d, enqueue msg error, FpEqMsg is NULL", pSyncNode->vgId);
}
}
......@@ -790,7 +814,7 @@ int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak) {
} else {
ret = -1;
terrno = TSDB_CODE_SYN_NOT_LEADER;
sError("sync propose not leader, %s", syncUtilState2String(pSyncNode->state));
sError("vgId:%d, sync propose not leader, %s", pSyncNode->vgId, syncUtilState2String(pSyncNode->state));
goto _END;
}
......@@ -820,7 +844,7 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) {
// create a new raft config file
SRaftCfgMeta meta;
meta.isStandBy = pSyncInfo->isStandBy;
meta.snapshotEnable = pSyncInfo->snapshotStrategy;
meta.snapshotStrategy = pSyncInfo->snapshotStrategy;
meta.lastConfigIndex = SYNC_INDEX_INVALID;
ret = raftCfgCreateFile((SSyncCfg*)&(pSyncInfo->syncCfg), meta, pSyncNode->configPath);
ASSERT(ret == 0);
......@@ -969,7 +993,7 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) {
pSyncNode->FpOnSnapshotSend = syncNodeOnSnapshotSendCb;
pSyncNode->FpOnSnapshotRsp = syncNodeOnSnapshotRspCb;
if (pSyncNode->pRaftCfg->snapshotEnable) {
if (pSyncNode->pRaftCfg->snapshotStrategy) {
sInfo("sync node use snapshot");
pSyncNode->FpOnRequestVote = syncNodeOnRequestVoteSnapshotCb;
pSyncNode->FpOnRequestVoteReply = syncNodeOnRequestVoteReplySnapshotCb;
......@@ -1107,7 +1131,7 @@ void syncNodeClose(SSyncNode* pSyncNode) {
// option
// bool syncNodeSnapshotEnable(SSyncNode* pSyncNode) { return pSyncNode->pRaftCfg->snapshotEnable; }
ESyncStrategy syncNodeStrategy(SSyncNode* pSyncNode) { return pSyncNode->pRaftCfg->snapshotEnable; }
ESyncStrategy syncNodeStrategy(SSyncNode* pSyncNode) { return pSyncNode->pRaftCfg->snapshotStrategy; }
// ping --------------
int32_t syncNodePing(SSyncNode* pSyncNode, const SRaftId* destRaftId, SyncPing* pMsg) {
......@@ -2489,6 +2513,9 @@ int32_t syncNodeOnClientRequestBatchCb(SSyncNode* ths, SyncClientRequestBatch* p
ASSERT(0);
return -1;
}
// update rpc msg conn apply.index
msgArr[i].info.conn.applyIndex = pEntry->index;
}
// fsync once
......@@ -2496,6 +2523,15 @@ int32_t syncNodeOnClientRequestBatchCb(SSyncNode* ths, SyncClientRequestBatch* p
SWal* pWal = pData->pWal;
walFsync(pWal, true);
if (ths->replicaNum > 1) {
// if multi replica, start replicate right now
syncNodeReplicate(ths);
} else if (ths->replicaNum == 1) {
// one replica
syncMaybeAdvanceCommitIndex(ths);
}
return 0;
}
......
......@@ -101,7 +101,7 @@ cJSON *syncCfg2Json(SSyncCfg *pSyncCfg) {
char *syncCfg2Str(SSyncCfg *pSyncCfg) {
cJSON *pJson = syncCfg2Json(pSyncCfg);
char * serialized = cJSON_Print(pJson);
char *serialized = cJSON_Print(pJson);
cJSON_Delete(pJson);
return serialized;
}
......@@ -109,7 +109,7 @@ char *syncCfg2Str(SSyncCfg *pSyncCfg) {
char *syncCfg2SimpleStr(SSyncCfg *pSyncCfg) {
if (pSyncCfg != NULL) {
int32_t len = 512;
char * s = taosMemoryMalloc(len);
char *s = taosMemoryMalloc(len);
memset(s, 0, len);
snprintf(s, len, "{replica-num:%d, my-index:%d, ", pSyncCfg->replicaNum, pSyncCfg->myIndex);
......@@ -182,7 +182,7 @@ cJSON *raftCfg2Json(SRaftCfg *pRaftCfg) {
cJSON *pRoot = cJSON_CreateObject();
cJSON_AddItemToObject(pRoot, "SSyncCfg", syncCfg2Json(&(pRaftCfg->cfg)));
cJSON_AddNumberToObject(pRoot, "isStandBy", pRaftCfg->isStandBy);
cJSON_AddNumberToObject(pRoot, "snapshotEnable", pRaftCfg->snapshotEnable);
cJSON_AddNumberToObject(pRoot, "snapshotStrategy", pRaftCfg->snapshotStrategy);
char buf64[128];
snprintf(buf64, sizeof(buf64), "%ld", pRaftCfg->lastConfigIndex);
......@@ -205,7 +205,7 @@ cJSON *raftCfg2Json(SRaftCfg *pRaftCfg) {
char *raftCfg2Str(SRaftCfg *pRaftCfg) {
cJSON *pJson = raftCfg2Json(pRaftCfg);
char * serialized = cJSON_Print(pJson);
char *serialized = cJSON_Print(pJson);
cJSON_Delete(pJson);
return serialized;
}
......@@ -228,7 +228,7 @@ int32_t raftCfgCreateFile(SSyncCfg *pCfg, SRaftCfgMeta meta, const char *path) {
SRaftCfg raftCfg;
raftCfg.cfg = *pCfg;
raftCfg.isStandBy = meta.isStandBy;
raftCfg.snapshotEnable = meta.snapshotEnable;
raftCfg.snapshotStrategy = meta.snapshotStrategy;
raftCfg.lastConfigIndex = meta.lastConfigIndex;
raftCfg.configIndexCount = 1;
memset(raftCfg.configIndexArr, 0, sizeof(raftCfg.configIndexArr));
......@@ -257,8 +257,8 @@ int32_t raftCfgFromJson(const cJSON *pRoot, SRaftCfg *pRaftCfg) {
cJSON *pJsonIsStandBy = cJSON_GetObjectItem(pJson, "isStandBy");
pRaftCfg->isStandBy = cJSON_GetNumberValue(pJsonIsStandBy);
cJSON *pJsonSnapshotEnable = cJSON_GetObjectItem(pJson, "snapshotEnable");
pRaftCfg->snapshotEnable = cJSON_GetNumberValue(pJsonSnapshotEnable);
cJSON *pJsonSnapshotStrategy = cJSON_GetObjectItem(pJson, "snapshotStrategy");
pRaftCfg->snapshotStrategy = cJSON_GetNumberValue(pJsonSnapshotStrategy);
cJSON *pJsonLastConfigIndex = cJSON_GetObjectItem(pJson, "lastConfigIndex");
pRaftCfg->lastConfigIndex = atoll(cJSON_GetStringValue(pJsonLastConfigIndex));
......@@ -280,7 +280,7 @@ int32_t raftCfgFromJson(const cJSON *pRoot, SRaftCfg *pRaftCfg) {
(pRaftCfg->configIndexArr)[i] = atoll(pIndex->valuestring);
}
cJSON * pJsonSyncCfg = cJSON_GetObjectItem(pJson, "SSyncCfg");
cJSON *pJsonSyncCfg = cJSON_GetObjectItem(pJson, "SSyncCfg");
int32_t code = syncCfgFromJson(pJsonSyncCfg, &(pRaftCfg->cfg));
ASSERT(code == 0);
......
......@@ -132,10 +132,6 @@ int32_t syncNodeAppendEntriesPeersSnapshot2(SSyncNode* pSyncNode) {
SyncIndex preLogIndex = syncNodeGetPreIndex(pSyncNode, nextIndex);
SyncTerm preLogTerm = syncNodeGetPreTerm(pSyncNode, nextIndex);
if (preLogTerm == SYNC_TERM_INVALID) {
SSyncSnapshotSender* pSender = syncNodeGetSnapshotSender(pSyncNode, pDestId);
ASSERT(pSender != NULL);
ASSERT(!snapshotSenderIsStart(pSender));
SyncIndex newNextIndex = syncNodeGetLastIndex(pSyncNode) + 1;
syncIndexMgrSetIndex(pSyncNode->pNextIndex, pDestId, newNextIndex);
syncIndexMgrSetIndex(pSyncNode->pMatchIndex, pDestId, SYNC_INDEX_INVALID);
......@@ -145,26 +141,32 @@ int32_t syncNodeAppendEntriesPeersSnapshot2(SSyncNode* pSyncNode) {
return -1;
}
// entry pointer array
SSyncRaftEntry* entryPArr[SYNC_MAX_BATCH_SIZE];
memset(entryPArr, 0, sizeof(entryPArr));
// get entry batch
int32_t getCount = 0;
SyncIndex getEntryIndex = nextIndex;
for (int32_t i = 0; i < pSyncNode->batchSize; ++i) {
SSyncRaftEntry* pEntry;
SSyncRaftEntry* pEntry = NULL;
int32_t code = pSyncNode->pLogStore->syncLogGetEntry(pSyncNode->pLogStore, getEntryIndex, &pEntry);
if (code == 0) {
ASSERT(pEntry != NULL);
entryPArr[i] = pEntry;
getCount++;
getEntryIndex++;
} else {
break;
}
}
// build msg
SyncAppendEntriesBatch* pMsg = syncAppendEntriesBatchBuild(entryPArr, getCount, pSyncNode->vgId);
ASSERT(pMsg != NULL);
// free entries
for (int32_t i = 0; i < pSyncNode->batchSize; ++i) {
SSyncRaftEntry* pEntry = entryPArr[i];
if (pEntry != NULL) {
......@@ -197,12 +199,6 @@ int32_t syncNodeAppendEntriesPeersSnapshot(SSyncNode* pSyncNode) {
syncIndexMgrLog2("begin append entries peers pNextIndex:", pSyncNode->pNextIndex);
syncIndexMgrLog2("begin append entries peers pMatchIndex:", pSyncNode->pMatchIndex);
logStoreSimpleLog2("begin append entries peers LogStore:", pSyncNode->pLogStore);
if (gRaftDetailLog) {
SSnapshot snapshot;
pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
sTrace("begin append entries peers, snapshot.lastApplyIndex:%ld, snapshot.lastApplyTerm:%lu",
snapshot.lastApplyIndex, snapshot.lastApplyTerm);
}
int32_t ret = 0;
for (int i = 0; i < pSyncNode->peersNum; ++i) {
......@@ -224,9 +220,6 @@ int32_t syncNodeAppendEntriesPeersSnapshot(SSyncNode* pSyncNode) {
return -1;
}
// batch optimized
// SyncIndex lastIndex = syncUtilMinIndex(pSyncNode->pLogStore->getLastIndex(pSyncNode->pLogStore), nextIndex);
// prepare entry
SyncAppendEntries* pMsg = NULL;
......@@ -283,11 +276,24 @@ int32_t syncNodeReplicate(SSyncNode* pSyncNode) {
// start replicate
int32_t ret = 0;
if (pSyncNode->pRaftCfg->snapshotEnable) {
ret = syncNodeAppendEntriesPeersSnapshot(pSyncNode);
} else {
ret = syncNodeAppendEntriesPeers(pSyncNode);
switch (pSyncNode->pRaftCfg->snapshotStrategy) {
case SYNC_STRATEGY_NO_SNAPSHOT:
ret = syncNodeAppendEntriesPeers(pSyncNode);
break;
case SYNC_STRATEGY_STANDARD_SNAPSHOT:
ret = syncNodeAppendEntriesPeersSnapshot(pSyncNode);
break;
case SYNC_STRATEGY_WAL_FIRST:
ret = syncNodeAppendEntriesPeersSnapshot2(pSyncNode);
break;
default:
ret = syncNodeAppendEntriesPeers(pSyncNode);
break;
}
return ret;
}
......
......@@ -83,7 +83,7 @@ void test3() {
} else {
SRaftCfgMeta meta;
meta.isStandBy = 7;
meta.snapshotEnable = 9;
meta.snapshotStrategy = 9;
meta.lastConfigIndex = 789;
raftCfgCreateFile(pCfg, meta, s);
printf("%s create json file: %s \n", (char*)__FUNCTION__, s);
......@@ -108,7 +108,7 @@ void test5() {
pCfg->cfg.myIndex = taosGetTimestampSec();
pCfg->isStandBy += 2;
pCfg->snapshotEnable += 3;
pCfg->snapshotStrategy += 3;
pCfg->lastConfigIndex += 1000;
pCfg->configIndexCount = 5;
......
......@@ -260,6 +260,14 @@ char *taosDirName(char *name) {
name[0] = 0;
}
return name;
#elif defined(_TD_DARWIN_64)
char *end = strrchr(name, '/');
if (end != NULL) {
*end = '\0';
} else {
name[0] = 0;
}
return name;
#else
return dirname(name);
#endif
......
......@@ -947,9 +947,9 @@ int32_t taosGetFqdn(char *fqdn) {
hostname[1023] = '\0';
if (gethostname(hostname, 1023) == -1) {
#ifdef WINDOWS
printf("failed to get hostname, reason:%s", strerror(WSAGetLastError()));
printf("failed to get hostname, reason:%s\n", strerror(WSAGetLastError()));
#else
printf("failed to get hostname, reason:%s", strerror(errno));
printf("failed to get hostname, reason:%s\n", strerror(errno));
#endif
assert(0);
return -1;
......@@ -968,7 +968,7 @@ int32_t taosGetFqdn(char *fqdn) {
#endif // __APPLE__
int32_t ret = getaddrinfo(hostname, NULL, &hints, &result);
if (!result) {
fprintf(stderr, "failed to get fqdn, code:%d, reason:%s", ret, gai_strerror(ret));
fprintf(stderr, "failed to get fqdn, code:%d, reason:%s\n", ret, gai_strerror(ret));
return -1;
}
......
......@@ -759,9 +759,11 @@ int32_t taosGetSystemUUID(char *uid, int32_t uidlen) {
return 0;
#elif defined(_TD_DARWIN_64)
uuid_t uuid = {0};
char buf[37] = {0};
uuid_generate(uuid);
// it's caller's responsibility to make enough space for `uid`, that's 36-char + 1-null
uuid_unparse_lower(uuid, uid);
uuid_unparse_lower(uuid, buf);
memcpy(uid, buf, uidlen);
return 0;
#else
int len = 0;
......
......@@ -93,7 +93,7 @@ TS_FUNC = [
"CSUM", "DERIVATIVE", "DIFF", "IRATE", "MAVG", "SAMPLE", "STATECOUNT", "STATEDURATION", "TWA"
]
SYSINFO_FUCN = [
SYSINFO_FUNC = [
"DATABASE", "CLIENT_VERSION", "SERVER_VERSION", "SERVER_STATUS", "CURRENT_USER", "USER"
]
......
......@@ -167,7 +167,8 @@
./test.sh -f tsim/sma/rsmaPersistenceRecovery.sim
# --- valgrind
./test.sh -f tsim/valgrind/checkError.sim
./test.sh -f tsim/valgrind/checkError1.sim
./test.sh -f tsim/valgrind/checkError2.sim
# --- vnode
# ./test.sh -f tsim/vnode/replica3_basic.sim
......
......@@ -4,13 +4,17 @@ set +e
#set -x
NODE_NAME=
DETAIL=0
while getopts "n:" arg
while getopts "n:d" arg
do
case $arg in
n)
NODE_NAME=$OPTARG
;;
d)
DETAIL=1
;;
?)
echo "unkown argument"
;;
......@@ -30,15 +34,26 @@ fi
TAOS_DIR=`pwd`
LOG_DIR=$TAOS_DIR/sim/$NODE_NAME/log
#CFG_DIR=$TAOS_DIR/sim/$NODE_NAME/cfg
#echo ---- $LOG_DIR
#errors=`grep "ERROR SUMMARY:" ${LOG_DIR}/valgrind-taosd-*.log | cut -d ' ' -f 2,3,4,5 | tr -d "\n"`
error_summary=`cat ${LOG_DIR}/valgrind-taosd-*.log | grep "ERROR SUMMARY:" | awk '{print $4}' | awk '{sum+=$1}END{print sum}'`
still_reachable=`cat ${LOG_DIR}/valgrind-taosd-*.log | grep "still reachable in" | wc -l`
definitely_lost=`cat ${LOG_DIR}/valgrind-taosd-*.log | grep "definitely lost in" | wc -l`
indirectly_lost=`cat ${LOG_DIR}/valgrind-taosd-*.log | grep "indirectly lost in " | wc -l`
possibly_lost=`cat ${LOG_DIR}/valgrind-taosd-*.log | grep "possibly lost in " | wc -l`
invalid_read=`cat ${LOG_DIR}/valgrind-taosd-*.log | grep "Invalid read of " | wc -l`
invalid_write=`cat ${LOG_DIR}/valgrind-taosd-*.log | grep "Invalid write of " | wc -l`
invalid_free=`cat ${LOG_DIR}/valgrind-taosd-*.log | grep "Invalid free() " | wc -l`
if [ $DETAIL -eq 1 ]; then
echo error_summary: $error_summary
echo still_reachable: $still_reachable
echo definitely_lost: $definitely_lost
echo indirectly_lost: $indirectly_lost
echo possibly_lost: $possibly_lost
echo invalid_read: $invalid_read
echo invalid_write: $invalid_write
echo invalid_free: $invalid_free
fi
let "errors=$still_reachable+$error_summary+$definitely_lost"
let "errors=$error_summary+$still_reachable+$definitely_lost+$indirectly_lost+$possibly_lost+$invalid_read+$invalid_write+$invalid_free"
echo $errors
......@@ -9,17 +9,15 @@ step1:
$x = $x + 1
sleep 1000
if $x == 10 then
print ====> dnode not ready!
print ----> dnode not ready!
return -1
endi
sql show dnodes
print ===> $data00 $data01 $data02 $data03 $data04 $data05
print ----> $data00 $data01 $data02 $data03 $data04 $data05
if $rows != 1 then
return -1
endi
goto _OVER
print =============== step2: create alter drop show user
sql create user u1 pass 'taosdata'
sql show users
......@@ -29,5 +27,17 @@ sql alter user u1 pass 'taosdata'
sql drop user u1
sql_error alter user u2 sysinfo 0
print =============== step3: create drop dnode
sql create dnode $hostname port 7200
sql drop dnode 2
sql alter dnode 1 'debugflag 143'
print =============== step4: create alter drop show database
sql create database db vgroups 1
sql show databases
sql show db.vgroups
sql drop database db
sql show databases
_OVER:
system sh/exec.sh -n dnode1 -s stop -x SIGINT
......@@ -9,11 +9,11 @@ step1:
$x = $x + 1
sleep 1000
if $x == 10 then
print ====> dnode not ready!
print ----> dnode not ready!
return -1
endi
sql show dnodes
print ===> $data00 $data01 $data02 $data03 $data04 $data05
print ----> $data00 $data01 $data02 $data03 $data04 $data05
if $rows != 1 then
return -1
endi
......
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/exec.sh -n dnode1 -s start -v
sql connect
print =============== step1: show dnodes
$x = 0
step1:
$x = $x + 1
sleep 1000
if $x == 10 then
print ----> dnode not ready!
return -1
endi
sql show dnodes
print ----> $data00 $data01 $data02 $data03 $data04 $data05
if $rows != 1 then
return -1
endi
print =============== step2: create alter drop show user
sql create user u1 pass 'taosdata'
sql show users
sql alter user u1 sysinfo 1
sql alter user u1 enable 1
sql alter user u1 pass 'taosdata'
sql drop user u1
sql_error alter user u2 sysinfo 0
print =============== step3:
print =============== stop
system sh/exec.sh -n dnode1 -s stop -x SIGINT
print =============== check
print ----> start to check if there are ERRORS in vagrind log file for each dnode
system_content sh/checkValgrind.sh -n dnode1
print cmd return result ----> [ $system_content ]
if $system_content <= 40 then
return 0
endi
$null=
if $system_content == $null then
return 0
endi
return -1
......@@ -3,8 +3,7 @@ system sh/deploy.sh -n dnode1 -i 1
system sh/exec.sh -n dnode1 -s start -v
sql connect
print =============== step1
print =============== step1: create drop show dnodes
$x = 0
step1:
$x = $x + 1
......@@ -19,18 +18,18 @@ if $rows != 1 then
return -1
endi
print =============== step2
print =============== step2: create db
sql create database db vgroups 1
print =============== stop
_OVER:
system sh/exec.sh -n dnode1 -s stop -x SIGINT
print =============== check
print ----> start to check if there are ERRORS in vagrind log file for each dnode
# -n : dnode[x] be check
system_content sh/checkValgrind.sh -n dnode1
# temporarily expand the threshold, since no time to fix the memory leaks.
print cmd return result ----> [ $system_content ]
if $system_content <= 5 then
if $system_content <= 60 then
return 0
endi
......
......@@ -87,29 +87,29 @@ class TDTestCase:
@property
def create_stable_sql_err(self):
return [
f"create stable stb1 ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) rollup(ceil) watermark 1s max_delay 1m",
f"create stable stb1 ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) rollup(count) watermark 1min",
f"create stable stb1 ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) rollup(min) max_delay -1s",
f"create stable stb1 ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) rollup(min) watermark -1m",
f"create stable stb1 ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) watermark 1m ",
f"create stable stb1 ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) max_delay 1m ",
f"create stable stb2 ({PRIMARY_COL} timestamp, {INT_COL} int, {BINARY_COL} binary(16)) tags (tag1 int) rollup(avg) watermark 1s",
f"create stable stb2 ({PRIMARY_COL} timestamp, {INT_COL} int, {NCHAR_COL} nchar(16)) tags (tag1 int) rollup(avg) max_delay 1m",
# f"create table ntb_1 ({PRIMARY_COL} timestamp, {INT_COL} int, {NCHAR_COL} nchar(16)) rollup(avg) watermark 1s max_delay 1s",
f"create stable stb2 ({PRIMARY_COL} timestamp, {INT_COL} int, {NCHAR_COL} nchar(16)) tags (tag1 int) " ,
f"create stable stb2 ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) " ,
f"create stable stb2 ({PRIMARY_COL} timestamp, {INT_COL} int) " ,
f"create stable stb2 ({PRIMARY_COL} timestamp, {INT_COL} int, {BINARY_COL} nchar(16)) " ,
f"create stable stb11 ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) rollup(ceil) watermark 1s max_delay 1m",
f"create stable stb12 ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) rollup(count) watermark 1min",
f"create stable stb13 ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) rollup(min) max_delay -1s",
f"create stable stb14 ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) rollup(min) watermark -1m",
f"create stable stb15 ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) watermark 1m ",
f"create stable stb16 ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) max_delay 1m ",
f"create stable stb21 ({PRIMARY_COL} timestamp, {INT_COL} int, {BINARY_COL} binary(16)) tags (tag1 int) rollup(avg) watermark 1s",
f"create stable stb22 ({PRIMARY_COL} timestamp, {INT_COL} int, {NCHAR_COL} nchar(16)) tags (tag1 int) rollup(avg) max_delay 1m",
f"create table ntb_1 ({PRIMARY_COL} timestamp, {INT_COL} int, {NCHAR_COL} nchar(16)) rollup(avg) watermark 1s max_delay 1s",
f"create stable stb23 ({PRIMARY_COL} timestamp, {INT_COL} int, {NCHAR_COL} nchar(16)) tags (tag1 int) " ,
f"create stable stb24 ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) " ,
f"create stable stb25 ({PRIMARY_COL} timestamp, {INT_COL} int) " ,
f"create stable stb26 ({PRIMARY_COL} timestamp, {INT_COL} int, {BINARY_COL} nchar(16)) " ,
# watermark, max_delay: [0, 900000], [ms, s, m, ?]
f"create stable stb1 ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) rollup(min) max_delay 1u",
f"create stable stb1 ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) rollup(min) watermark 1b",
f"create stable stb1 ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) rollup(min) watermark 900001ms",
f"create stable stb1 ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) rollup(min) max_delay 16m",
f"create stable stb1 ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) rollup(min) max_delay 901s",
f"create stable stb1 ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) rollup(min) max_delay 1h",
f"create stable stb1 ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) rollup(min) max_delay 0.2h",
f"create stable stb1 ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) rollup(min) watermark 0.002d",
f"create stable stb17 ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) rollup(min) max_delay 1u",
f"create stable stb18 ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) rollup(min) watermark 1b",
f"create stable stb19 ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) rollup(min) watermark 900001ms",
f"create stable stb20 ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) rollup(min) max_delay 16m",
f"create stable stb27 ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) rollup(min) max_delay 901s",
f"create stable stb28 ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) rollup(min) max_delay 1h",
f"create stable stb29 ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) rollup(min) max_delay 0.2h",
f"create stable stb30 ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) rollup(min) watermark 0.002d",
]
......@@ -125,8 +125,8 @@ class TDTestCase:
f"create stable stb7 ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) rollup(first) watermark 5s max_delay 1m sma({INT_COL})",
]
def test_create_stb(self):
tdSql.execute("use db2")
def test_create_stb(self, db="db2"):
tdSql.execute(f"use {db}")
for err_sql in self.create_stable_sql_err:
tdSql.error(err_sql)
for cur_sql in self.create_stable_sql_current:
......@@ -136,7 +136,7 @@ class TDTestCase:
tdSql.checkRows(len(self.create_stable_sql_current))
tdSql.execute("use db") # because db is a noraml database, not a rollup database, should not be able to create a rollup stable
# tdSql.error(f"create stable nor_db_rollup_stb ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) watermark 5s max_delay 1m")
tdSql.error(f"create stable nor_db_rollup_stb ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) watermark 5s max_delay 1m")
def test_create_databases(self):
......@@ -255,6 +255,7 @@ class TDTestCase:
tdSql.execute("drop database if exists db2 ")
tdSql.execute("use db3")
self.test_create_stb(db="db3")
# self.__create_tb()
# self.__insert_data()
self.all_test()
......
......@@ -96,7 +96,7 @@ class TDTestCase:
ff float, dd double, bb binary(100), nn nchar(100), tt timestamp)",
)
# conn.load_table_info("log")
tdLog.debug("statement start")
start = datetime.now()
stmt = conn.statement("insert into stb1 values(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)")
......@@ -118,8 +118,11 @@ class TDTestCase:
params[14].nchar(["涛思数据", None, "a long string with 中文字符"])
params[15].timestamp([None, None, 1626861392591])
# print(type(stmt))
tdLog.debug("bind_param_batch start")
stmt.bind_param_batch(params)
tdLog.debug("bind_param_batch end")
stmt.execute()
tdLog.debug("execute end")
end = datetime.now()
print("elapsed time: ", end - start)
assert stmt.affected_rows == 3
......@@ -155,7 +158,7 @@ class TDTestCase:
print(rows1)
assert str(rows1[0][0]) == "2021-07-21 17:56:32.589000"
assert rows1[0][10] == 3
tdLog.debug("close start")
stmt.close()
......
......@@ -218,13 +218,13 @@ class TDTestCase:
tdLog.debug("assert 8th case %s"%rows)
assert rows[0][0] == 3, ' 8th case is failed'
# #query: selector Functions 9
# queryparam=new_bind_params(1)
# queryparam[0].int(2)
# rows=self.stmtExe(conn,"select bottom(bu,?) from log group by bu ; ",queryparam)
# tdLog.debug("assert 9th case %s"%rows)
# assert rows[0][0] == 4, ' 9 case is failed'
# assert rows[1][0] == 3, ' 9 case is failed'
#query: selector Functions 9
queryparam=new_bind_params(1)
queryparam[0].int(2)
rows=self.stmtExe(conn,"select bottom(bu,?) from log group by bu order by bu desc ; ",queryparam)
tdLog.debug("assert 9th case %s"%rows)
assert rows[1][0] == 4, ' 9 case is failed'
assert rows[2][0] == 3, ' 9 case is failed'
# #query: time-series specific Functions 10
......
......@@ -123,10 +123,10 @@ class TDTestCase:
elif unit.lower() == '1u':
for i in range(len(self.ts_str)):
tdSql.checkEqual(tdSql.queryResult[i][0],int(((date_time[i]/1000)-self.subtractor*1000000)))
# self.check_tbtype(tb_type)
# tdSql.checkRows(len(self.ts_str))
# for i in range(len(self.ts_str)):
# tdSql.checkEqual(tdSql.queryResult[i][0],int(((date_time[i]/1000000)-self.subtractor*1000000000)))
self.check_tbtype(tb_type)
tdSql.checkRows(len(self.ts_str))
for i in range(len(self.ts_str)):
tdSql.checkEqual(tdSql.queryResult[i][0],int(((date_time[i])-self.subtractor*1000000000)))
for unit in self.error_unit:
if tb_type.lower() == 'ntb':
tdSql.error(f'select timediff(ts,{self.subtractor},{unit}) from {self.ntbname}')
......
此差异已折叠。
......@@ -419,12 +419,66 @@ class TDTestCase:
tdSql.checkData(3,0,4)
tdSql.query("select csum(abs(c1))+2 from t1 ")
tdSql.checkRows(4)
def csum_support_stable(self):
tdSql.query(" select csum(1) from stb1 ")
tdSql.checkRows(70)
tdSql.query("select csum(c1) from stb1 partition by tbname ")
tdSql.checkRows(40)
# tdSql.query("select csum(st1) from stb1 partition by tbname")
# tdSql.checkRows(70)
tdSql.query("select csum(st1+c1) from stb1 partition by tbname")
tdSql.checkRows(40)
tdSql.query("select csum(st1+c1) from stb1 partition by tbname")
tdSql.checkRows(40)
tdSql.query("select csum(st1+c1) from stb1 partition by tbname")
tdSql.checkRows(40)
# # bug need fix
# tdSql.query("select csum(st1+c1) from stb1 partition by tbname slimit 1 ")
# tdSql.checkRows(4)
# tdSql.error("select csum(st1+c1) from stb1 partition by tbname limit 1 ")
# bug need fix
tdSql.query("select csum(st1+c1) from stb1 partition by tbname")
tdSql.checkRows(40)
# bug need fix
# tdSql.query("select tbname , csum(c1) from stb1 partition by tbname")
# tdSql.checkRows(40)
# tdSql.query("select tbname , csum(st1) from stb1 partition by tbname")
# tdSql.checkRows(70)
# tdSql.query("select tbname , csum(st1) from stb1 partition by tbname slimit 1")
# tdSql.checkRows(7)
# partition by tags
# tdSql.query("select st1 , csum(c1) from stb1 partition by st1")
# tdSql.checkRows(40)
# tdSql.query("select csum(c1) from stb1 partition by st1")
# tdSql.checkRows(40)
# tdSql.query("select st1 , csum(c1) from stb1 partition by st1 slimit 1")
# tdSql.checkRows(4)
# tdSql.query("select csum(c1) from stb1 partition by st1 slimit 1")
# tdSql.checkRows(4)
# partition by col
# tdSql.query("select c1 , csum(c1) from stb1 partition by c1")
# tdSql.checkRows(41)
# tdSql.query("select csum(c1) from stb1 partition by c1")
# tdSql.checkRows(41)
# tdSql.query("select c1 , csum(c1) from stb1 partition by st1 slimit 1")
# tdSql.checkRows(4)
# tdSql.query("select csum(c1) from stb1 partition by st1 slimit 1")
# tdSql.checkRows(4)
def run(self):
import traceback
try:
# run in develop branch
self.csum_test_run()
self.csum_support_stable()
pass
except Exception as e:
traceback.print_exc()
......
......@@ -7,7 +7,7 @@ import platform
import math
class TDTestCase:
updatecfgDict = {'debugFlag': 143 ,"cDebugFlag":143,"uDebugFlag":143 ,"rpcDebugFlag":143 , "tmrDebugFlag":143 ,
updatecfgDict = {'debugFlag': 143 ,"cDebugFlag":143,"uDebugFlag":143 ,"rpcDebugFlag":143 , "tmrDebugFlag":143 ,
"jniDebugFlag":143 ,"simDebugFlag":143,"dDebugFlag":143, "dDebugFlag":143,"vDebugFlag":143,"mDebugFlag":143,"qDebugFlag":143,
"wDebugFlag":143,"sDebugFlag":143,"tsdbDebugFlag":143,"tqDebugFlag":143 ,"fsDebugFlag":143 ,"fnDebugFlag":143,
"maxTablesPerVnode":2 ,"minTablesPerVnode":2,"tableIncStepPerVnode":2 }
......@@ -24,7 +24,7 @@ class TDTestCase:
stddev_sql = f"select stddev({col_name}) from {tbname};"
same_sql = f"select {col_name} from {tbname} where {col_name} is not null "
tdSql.query(same_sql)
pre_data = np.array(tdSql.queryResult)[np.array(tdSql.queryResult) != None]
if (platform.system().lower() == 'windows' and pre_data.dtype == 'int32'):
......@@ -32,21 +32,21 @@ class TDTestCase:
pre_avg = np.sum(pre_data)/len(pre_data)
# Calculate variance
stddev_result = 0
stddev_result = 0
for num in tdSql.queryResult:
stddev_result += (num-pre_avg)*(num-pre_avg)/len(tdSql.queryResult)
stddev_result = math.sqrt(stddev_result)
tdSql.query(stddev_sql)
if -0.0001 < tdSql.queryResult[0][0]-stddev_result < 0.0001:
tdLog.info(" sql:%s; row:0 col:0 data:%d , expect:%d"%(stddev_sql,tdSql.queryResult[0][0],stddev_result))
else:
tdLog.exit(" sql:%s; row:0 col:0 data:%d , expect:%d"%(stddev_sql,tdSql.queryResult[0][0],stddev_result))
def prepare_datas_of_distribute(self):
# prepate datas for 20 tables distributed at different vgroups
tdSql.execute("create database if not exists testdb keep 3650 duration 1000 vgroups 5")
tdSql.execute(" use testdb ")
......@@ -117,17 +117,17 @@ class TDTestCase:
vgroups = tdSql.queryResult
vnode_tables={}
for vgroup_id in vgroups:
vnode_tables[vgroup_id[0]]=[]
# check sub_table of per vnode ,make sure sub_table has been distributed
tdSql.query("show tables like 'ct%'")
table_names = tdSql.queryResult
tablenames = []
for table_name in table_names:
vnode_tables[table_name[6]].append(table_name[0])
vnode_tables[table_name[6]].append(table_name[0])
self.vnode_disbutes = vnode_tables
count = 0
......@@ -138,14 +138,14 @@ class TDTestCase:
tdLog.exit(" the datas of all not satisfy sub_table has been distributed ")
def check_stddev_distribute_diff_vnode(self,col_name):
vgroup_ids = []
for k ,v in self.vnode_disbutes.items():
if len(v)>=2:
vgroup_ids.append(k)
distribute_tbnames = []
for vgroup_id in vgroup_ids:
vnode_tables = self.vnode_disbutes[vgroup_id]
distribute_tbnames.append(random.sample(vnode_tables,1)[0])
......@@ -154,7 +154,7 @@ class TDTestCase:
tbname_ins += "'%s' ,"%tbname
tbname_filters = tbname_ins[:-1]
stddev_sql = f"select stddev({col_name}) from stb1 where tbname in ({tbname_filters});"
same_sql = f"select {col_name} from stb1 where tbname in ({tbname_filters}) and {col_name} is not null "
......@@ -166,7 +166,7 @@ class TDTestCase:
pre_avg = np.sum(pre_data)/len(pre_data)
# Calculate variance
stddev_result = 0
stddev_result = 0
for num in tdSql.queryResult:
stddev_result += (num-pre_avg)*(num-pre_avg)/len(tdSql.queryResult)
......@@ -177,8 +177,8 @@ class TDTestCase:
def check_stddev_status(self):
# check max function work status
# check max function work status
tdSql.query("show tables like 'ct%'")
table_names = tdSql.queryResult
tablenames = []
......@@ -187,31 +187,31 @@ class TDTestCase:
tdSql.query("desc stb1")
col_names = tdSql.queryResult
colnames = []
for col_name in col_names:
if col_name[1] in ["INT" ,"BIGINT" ,"SMALLINT" ,"TINYINT" , "FLOAT" ,"DOUBLE"]:
colnames.append(col_name[0])
for tablename in tablenames:
for colname in colnames:
if colname.startswith("c"):
self.check_stddev_functions(tablename,colname)
else:
# self.check_stddev_functions(tablename,colname)
# self.check_stddev_functions(tablename,colname)
pass
# check max function for different vnode
# check max function for different vnode
for colname in colnames:
if colname.startswith("c"):
self.check_stddev_distribute_diff_vnode(colname)
else:
# self.check_stddev_distribute_diff_vnode(colname) # bug for tag
# self.check_stddev_distribute_diff_vnode(colname) # bug for tag
pass
def distribute_agg_query(self):
# basic filter
tdSql.query(" select stddev(c1) from stb1 ")
......@@ -235,7 +235,7 @@ class TDTestCase:
tdSql.query("select stddev(c1) from stb1 where t1> 4 partition by tbname")
tdSql.checkRows(15)
# union all
# union all
tdSql.query("select stddev(c1) from stb1 union all select stddev(c1) from stb1 ")
tdSql.checkRows(2)
tdSql.checkData(0,0,6.694663959)
......@@ -244,7 +244,7 @@ class TDTestCase:
tdSql.checkRows(1)
tdSql.checkData(0,0,0.000000000)
# join
# join
tdSql.execute(" create database if not exists db ")
tdSql.execute(" use db ")
......@@ -252,7 +252,7 @@ class TDTestCase:
tdSql.execute(" create table tb1 using st tags(1) ")
tdSql.execute(" create table tb2 using st tags(2) ")
for i in range(10):
ts = i*10 + self.ts
tdSql.execute(f" insert into tb1 values({ts},{i},{i}.0)")
......@@ -263,7 +263,7 @@ class TDTestCase:
tdSql.checkData(0,0,2.872281323)
tdSql.checkData(0,1,2.872281323)
# group by
# group by
tdSql.execute(" use testdb ")
# partition by tbname or partition by tag
......@@ -295,7 +295,7 @@ class TDTestCase:
self.check_stddev_status()
self.distribute_agg_query()
def stop(self):
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
......
......@@ -355,9 +355,63 @@ class TDTestCase:
tdSql.execute(f"create table tt{i} using stb2 tags({i})")
pass
def diff_support_stable(self):
tdSql.query(" select diff(1) from stb1 ")
tdSql.checkRows(229)
tdSql.checkData(0,0,0)
tdSql.query("select diff(c1) from stb1 partition by tbname ")
tdSql.checkRows(199)
# tdSql.query("select diff(st1) from stb1 partition by tbname")
# tdSql.checkRows(229)
tdSql.query("select diff(st1+c1) from stb1 partition by tbname")
tdSql.checkRows(199)
tdSql.query("select diff(st1+c1) from stb1 partition by tbname")
tdSql.checkRows(199)
tdSql.query("select diff(st1+c1) from stb1 partition by tbname")
tdSql.checkRows(199)
# # bug need fix
# tdSql.query("select diff(st1+c1) from stb1 partition by tbname slimit 1 ")
# tdSql.checkRows(19)
# tdSql.error("select diff(st1+c1) from stb1 partition by tbname limit 1 ")
# bug need fix
tdSql.query("select diff(st1+c1) from stb1 partition by tbname")
tdSql.checkRows(199)
# bug need fix
# tdSql.query("select tbname , diff(c1) from stb1 partition by tbname")
# tdSql.checkRows(199)
# tdSql.query("select tbname , diff(st1) from stb1 partition by tbname")
# tdSql.checkRows(199)
# tdSql.query("select tbname , diff(st1) from stb1 partition by tbname slimit 1")
# tdSql.checkRows(19)
# partition by tags
# tdSql.query("select st1 , diff(c1) from stb1 partition by st1")
# tdSql.checkRows(199)
# tdSql.query("select diff(c1) from stb1 partition by st1")
# tdSql.checkRows(199)
# tdSql.query("select st1 , diff(c1) from stb1 partition by st1 slimit 1")
# tdSql.checkRows(19)
# tdSql.query("select diff(c1) from stb1 partition by st1 slimit 1")
# tdSql.checkRows(19)
# partition by col
# tdSql.query("select c1 , diff(c1) from stb1 partition by c1")
# tdSql.checkRows(199)
# tdSql.query("select diff(c1) from stb1 partition by c1")
# tdSql.checkRows(41)
# tdSql.query("select c1 , diff(c1) from stb1 partition by st1 slimit 1")
# tdSql.checkRows(19)
# tdSql.query("select diff(c1) from stb1 partition by st1 slimit 1")
# tdSql.checkRows(19)
def diff_test_run(self) :
tdLog.printNoPrefix("==========TD-10594==========")
tdLog.printNoPrefix("==========run test case for diff function==========")
tbnum = 10
nowtime = int(round(time.time() * 1000))
per_table_rows = 10
......@@ -422,6 +476,7 @@ class TDTestCase:
try:
# run in develop branch
self.diff_test_run()
self.diff_support_stable()
pass
except Exception as e:
traceback.print_exc()
......
......@@ -673,11 +673,65 @@ class TDTestCase:
tdSql.query("select mavg(abs(c1),1) from t1")
tdSql.checkRows(4)
def mavg_support_stable(self):
tdSql.query(" select mavg(1,3) from stb1 ")
tdSql.checkRows(68)
tdSql.checkData(0,0,1.000000000)
tdSql.query("select mavg(c1,3) from stb1 partition by tbname ")
tdSql.checkRows(38)
# tdSql.query("select mavg(st1,3) from stb1 partition by tbname")
# tdSql.checkRows(38)
tdSql.query("select mavg(st1+c1,3) from stb1 partition by tbname")
tdSql.checkRows(38)
tdSql.query("select mavg(st1+c1,3) from stb1 partition by tbname")
tdSql.checkRows(38)
tdSql.query("select mavg(st1+c1,3) from stb1 partition by tbname")
tdSql.checkRows(38)
# # bug need fix
# tdSql.query("select mavg(st1+c1,3) from stb1 partition by tbname slimit 1 ")
# tdSql.checkRows(2)
# tdSql.error("select mavg(st1+c1,3) from stb1 partition by tbname limit 1 ")
# bug need fix
tdSql.query("select mavg(st1+c1,3) from stb1 partition by tbname")
tdSql.checkRows(38)
# bug need fix
# tdSql.query("select tbname , mavg(c1,3) from stb1 partition by tbname")
# tdSql.checkRows(38)
# tdSql.query("select tbname , mavg(st1,3) from stb1 partition by tbname")
# tdSql.checkRows(38)
# tdSql.query("select tbname , mavg(st1,3) from stb1 partition by tbname slimit 1")
# tdSql.checkRows(2)
# partition by tags
# tdSql.query("select st1 , mavg(c1,3) from stb1 partition by st1")
# tdSql.checkRows(38)
# tdSql.query("select mavg(c1,3) from stb1 partition by st1")
# tdSql.checkRows(38)
# tdSql.query("select st1 , mavg(c1,3) from stb1 partition by st1 slimit 1")
# tdSql.checkRows(2)
# tdSql.query("select mavg(c1,3) from stb1 partition by st1 slimit 1")
# tdSql.checkRows(2)
# partition by col
# tdSql.query("select c1 , mavg(c1,3) from stb1 partition by c1")
# tdSql.checkRows(38)
# tdSql.query("select mavg(c1 ,3) from stb1 partition by c1")
# tdSql.checkRows(38)
# tdSql.query("select c1 , mavg(c1,3) from stb1 partition by st1 slimit 1")
# tdSql.checkRows(2)
# tdSql.query("select diff(c1) from stb1 partition by st1 slimit 1")
# tdSql.checkRows(2)
def run(self):
import traceback
try:
# run in develop branch
self.mavg_test_run()
self.mavg_support_stable()
pass
except Exception as e:
traceback.print_exc()
......
......@@ -478,6 +478,9 @@ class TDTestCase:
self.test_case3()
# tdLog.debug(" LIMIT test_case3 ............ [OK]")
def stop(self):
tdSql.close()
tdLog.success(f"{__file__} successfully executed")
return
#
......
......@@ -798,6 +798,36 @@ class TDTestCase:
tdSql.query("select sample(c1,100)+2 from ct1")
tdSql.query("select abs(sample(c1,100)) from ct1")
# support stable and tbname
tdSql.query("select tbname ,sample(c1,2) from stb1 partition by tbname order by tbname")
tdSql.checkRows(4)
tdSql.checkData(0,0,'ct1')
tdSql.checkData(3,0,'ct4')
# # bug need fix
# tdSql.query(" select tbname ,c1 ,t1, sample(c1,2) from stb1 partition by tbname order by tbname ")
# tdSql.checkRows(4)
# tdSql.checkData(0,0,'ct1')
# tdSql.checkData(3,0,'ct4')
# tdSql.checkData(0,2,1)
# tdSql.checkData(3,2,4)
tdSql.query(" select tbname ,c1 ,t1, sample(c1,2) from stb1 partition by t1 order by t1 ")
tdSql.checkRows(4)
tdSql.checkData(0,0,'ct1')
tdSql.checkData(3,0,'ct4')
tdSql.checkData(0,2,1)
tdSql.checkData(3,2,4)
# bug need fix
# tdSql.query(" select tbname ,c1 ,t1, sample(c1,2) from stb1 partition by c1 order by c1 ")
# tdSql.checkRows(21)
# bug need fix
# tdSql.query(" select sample(c1,2) from stb1 partition by c1 ")
# tdSql.checkRows(21)
def sample_test_run(self) :
tdLog.printNoPrefix("==========support sample function==========")
tbnum = 10
......
......@@ -18,6 +18,7 @@ class TDTestCase:
def init(self, conn, logSql):
tdLog.debug(f"start to excute {__file__}")
tdSql.init(conn.cursor())
self.ts = 1420041600000 # 2015-01-01 00:00:00 this is begin time for first record
def prepare_datas(self):
tdSql.execute(
......@@ -344,6 +345,8 @@ class TDTestCase:
tdSql.error("select stateduration(c1,'GT',1,1b) from ct1")
tdSql.error("select stateduration(c1,'GT',1,1u) from ct1")
tdSql.error("select stateduration(c1,'GT',1,1000s) from t1")
tdSql.error("select stateduration(c1,'GT',1,10m) from t1")
tdSql.error("select stateduration(c1,'GT',1,10d) from t1")
tdSql.query("select stateduration(c1,'GT',1,1s) from t1")
tdSql.checkData(10,0,63072035)
tdSql.query("select stateduration(c1,'GT',1,1m) from t1")
......@@ -355,6 +358,58 @@ class TDTestCase:
tdSql.query("select stateduration(c1,'GT',1,1w) from t1")
tdSql.checkData(10,0,int(63072035/60/7/24/60))
def query_precision(self):
def generate_data(precision="ms"):
tdSql.execute("create database if not exists db_%s precision '%s';" %(precision, precision))
tdSql.execute("use db_%s;" %precision)
tdSql.execute("create stable db_%s.st (ts timestamp , id int) tags(ind int);"%precision)
tdSql.execute("create table db_%s.tb1 using st tags(1);"%precision)
tdSql.execute("create table db_%s.tb2 using st tags(2);"%precision)
if precision == "ms":
start_ts = self.ts
step = 10000
elif precision == "us":
start_ts = self.ts*1000
step = 10000000
elif precision == "ns":
start_ts = self.ts*1000000
step = 10000000000
else:
pass
for i in range(10):
sql1 = "insert into db_%s.tb1 values (%d,%d)"%(precision ,start_ts+i*step,i)
sql2 = "insert into db_%s.tb1 values (%d,%d)"%(precision, start_ts+i*step,i)
tdSql.execute(sql1)
tdSql.execute(sql2)
time_units = ["1s","1a","1u","1b"]
precision_list = ["ms","us","ns"]
for pres in precision_list:
generate_data(pres)
for index,unit in enumerate(time_units):
if pres == "ms":
if unit in ["1u","1b"]:
tdSql.error("select stateduration(id,'GT',1,%s) from db_%s.tb1 "%(unit,pres))
pass
else:
tdSql.query("select stateduration(id,'GT',1,%s) from db_%s.tb1 "%(unit,pres))
elif pres == "us" and unit in ["1b"]:
if unit in ["1b"]:
tdSql.error("select stateduration(id,'GT',1,%s) from db_%s.tb1 "%(unit,pres))
pass
else:
tdSql.query("select stateduration(id,'GT',1,%s) from db_%s.tb1 "%(unit,pres))
else:
tdSql.query("select stateduration(id,'GT',1,%s) from db_%s.tb1 "%(unit,pres))
basic_result = 70
tdSql.checkData(9,0,basic_result*pow(1000,index))
def check_boundary_values(self):
......@@ -420,6 +475,8 @@ class TDTestCase:
tdLog.printNoPrefix("==========step6: statecount unit time test ============")
self.check_unit_time()
self.query_precision()
def stop(self):
......
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
Subproject commit 1163c0f60aa65d6cc58283247c8bf8c56ba43b92
Subproject commit 50b68d85f7cbaf7a9adfa4082e88ca758770f75e
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册