提交 05c8328d 编写于 作者: H Haojun Liao

Merge branch 'develop' into feature/query

......@@ -156,13 +156,15 @@ static int32_t getSmlMd5ChildTableName(TAOS_SML_DATA_POINT* point, char* tableNa
SStringBuilder sb; memset(&sb, 0, sizeof(sb));
char sTableName[TSDB_TABLE_NAME_LEN + TS_ESCAPE_CHAR_SIZE] = {0};
strtolower(sTableName, point->stableName);
strncpy(sTableName, point->stableName, strlen(point->stableName));
//strtolower(sTableName, point->stableName);
taosStringBuilderAppendString(&sb, sTableName);
for (int j = 0; j < point->tagNum; ++j) {
taosStringBuilderAppendChar(&sb, ',');
TAOS_SML_KV* tagKv = point->tags + j;
char tagName[TSDB_COL_NAME_LEN + TS_ESCAPE_CHAR_SIZE] = {0};
strtolower(tagName, tagKv->key);
strncpy(tagName, tagKv->key, strlen(tagKv->key));
//strtolower(tagName, tagKv->key);
taosStringBuilderAppendString(&sb, tagName);
taosStringBuilderAppendChar(&sb, '=');
taosStringBuilderAppend(&sb, tagKv->value, tagKv->length);
......@@ -261,10 +263,10 @@ static int32_t buildDataPointSchemas(TAOS_SML_DATA_POINT* points, int numPoint,
static int32_t generateSchemaAction(SSchema* pointColField, SHashObj* dbAttrHash, SArray* dbAttrArray, bool isTag, char sTableName[],
SSchemaAction* action, bool* actionNeeded, SSmlLinesInfo* info) {
char fieldNameLowerCase[TSDB_COL_NAME_LEN + TS_ESCAPE_CHAR_SIZE] = {0};
strtolower(fieldNameLowerCase, pointColField->name);
char fieldName[TSDB_COL_NAME_LEN + TS_ESCAPE_CHAR_SIZE] = {0};
strcpy(fieldName, pointColField->name);
size_t* pDbIndex = taosHashGet(dbAttrHash, fieldNameLowerCase, strlen(fieldNameLowerCase));
size_t* pDbIndex = taosHashGet(dbAttrHash, fieldName, strlen(fieldName));
if (pDbIndex) {
SSchema* dbAttr = taosArrayGet(dbAttrArray, *pDbIndex);
assert(strcasecmp(dbAttr->name, pointColField->name) == 0);
......@@ -297,7 +299,7 @@ static int32_t generateSchemaAction(SSchema* pointColField, SHashObj* dbAttrHash
*actionNeeded = true;
}
if (*actionNeeded) {
tscDebug("SML:0x%" PRIx64 " generate schema action. column name: %s, action: %d", info->id, fieldNameLowerCase,
tscDebug("SML:0x%" PRIx64 " generate schema action. column name: %s, action: %d", info->id, fieldName,
action->action);
}
return 0;
......@@ -536,11 +538,8 @@ static int32_t retrieveTableMeta(TAOS* taos, char* tableName, STableMeta** pTabl
tscDebug("SML:0x%" PRIx64 " retrieve table meta. super table name: %s", info->id, tableName);
char tableNameLowerCase[TSDB_TABLE_NAME_LEN + TS_ESCAPE_CHAR_SIZE];
strtolower(tableNameLowerCase, tableName);
char sql[256];
snprintf(sql, 256, "describe %s", tableNameLowerCase);
snprintf(sql, 256, "describe %s", tableName);
TAOS_RES* res = taos_query(taos, sql);
code = taos_errno(res);
if (code != 0) {
......@@ -561,8 +560,10 @@ static int32_t retrieveTableMeta(TAOS* taos, char* tableName, STableMeta** pTabl
pSql->fp = NULL;
registerSqlObj(pSql);
SStrToken tableToken = {.z = tableNameLowerCase, .n = (uint32_t)strlen(tableNameLowerCase), .type = TK_ID};
tGetToken(tableNameLowerCase, &tableToken.type);
char tableNameBuf[TSDB_TABLE_NAME_LEN + TS_ESCAPE_CHAR_SIZE] = {0};
memcpy(tableNameBuf, tableName, strlen(tableName));
SStrToken tableToken = {.z = tableNameBuf, .n = (uint32_t)strlen(tableName), .type = TK_ID};
tGetToken(tableNameBuf, &tableToken.type);
bool dbIncluded = false;
// Check if the table name available or not
if (tscValidateName(&tableToken, true, &dbIncluded) != TSDB_CODE_SUCCESS) {
......@@ -1870,24 +1871,14 @@ static int32_t parseSmlTimeStamp(TAOS_SML_KV **pTS, const char **index, SSmlLine
bool checkDuplicateKey(char *key, SHashObj *pHash, SSmlLinesInfo* info) {
char *val = NULL;
char *cur = key;
char keyLower[TSDB_COL_NAME_LEN];
size_t keyLen = 0;
while(*cur != '\0') {
keyLower[keyLen] = tolower(*cur);
keyLen++;
cur++;
}
keyLower[keyLen] = '\0';
val = taosHashGet(pHash, keyLower, keyLen);
val = taosHashGet(pHash, key, strlen(key));
if (val) {
tscError("SML:0x%"PRIx64" Duplicate key detected:%s", info->id, keyLower);
tscError("SML:0x%"PRIx64" Duplicate key detected:%s", info->id, key);
return true;
}
uint8_t dummy_val = 0;
taosHashPut(pHash, keyLower, strlen(key), &dummy_val, sizeof(uint8_t));
taosHashPut(pHash, key, strlen(key), &dummy_val, sizeof(uint8_t));
return false;
}
......@@ -1925,7 +1916,6 @@ static int32_t parseSmlKey(TAOS_SML_KV *pKV, const char **index, SHashObj *pHash
pKV->key = calloc(len + TS_ESCAPE_CHAR_SIZE + 1, 1);
memcpy(pKV->key, key, len + 1);
strntolower_s(pKV->key, pKV->key, (int32_t)len);
addEscapeCharToString(pKV->key, len);
tscDebug("SML:0x%"PRIx64" Key:%s|len:%d", info->id, pKV->key, len);
*index = cur + 1;
......@@ -2053,7 +2043,7 @@ static int32_t parseSmlMeasurement(TAOS_SML_DATA_POINT *pSml, const char **index
if (*cur == '\\') {
escapeSpecialCharacter(1, &cur);
}
pSml->stableName[len] = tolower(*cur);
pSml->stableName[len] = *cur;
cur++;
len++;
}
......@@ -2129,7 +2119,6 @@ static int32_t parseSmlKvPairs(TAOS_SML_KV **pKVs, int *num_kvs,
if (!isField && childTableNameLen != 0 && strcasecmp(pkv->key, childTableName) == 0) {
smlData->childTableName = malloc(pkv->length + TS_ESCAPE_CHAR_SIZE + 1);
memcpy(smlData->childTableName, pkv->value, pkv->length);
strntolower_s(smlData->childTableName, smlData->childTableName, (int32_t)pkv->length);
addEscapeCharToString(smlData->childTableName, (int32_t)pkv->length);
free(pkv->key);
free(pkv->value);
......
......@@ -65,7 +65,7 @@ static int32_t parseTelnetMetric(TAOS_SML_DATA_POINT *pSml, const char **index,
}
}
pSml->stableName[len] = tolower(*cur);
pSml->stableName[len] = *cur;
cur++;
len++;
......@@ -241,7 +241,6 @@ static int32_t parseTelnetTagKey(TAOS_SML_KV *pKV, const char **index, SHashObj
pKV->key = tcalloc(len + TS_ESCAPE_CHAR_SIZE + 1, 1);
memcpy(pKV->key, key, len + 1);
strntolower_s(pKV->key, pKV->key, (int32_t)len);
addEscapeCharToString(pKV->key, len);
//tscDebug("OTD:0x%"PRIx64" Key:%s|len:%d", info->id, pKV->key, len);
*index = cur + 1;
......@@ -327,7 +326,6 @@ static int32_t parseTelnetTagKvs(TAOS_SML_KV **pKVs, int *num_kvs,
*childTableName = tcalloc(pkv->length + TS_ESCAPE_CHAR_SIZE + 1, 1);
memcpy(*childTableName, pkv->value, pkv->length);
(*childTableName)[pkv->length] = '\0';
strntolower_s(*childTableName, *childTableName, (int32_t)pkv->length);
addEscapeCharToString(*childTableName, pkv->length);
tfree(pkv->key);
tfree(pkv->value);
......@@ -515,7 +513,6 @@ static int32_t parseMetricFromJSON(cJSON *root, TAOS_SML_DATA_POINT* pSml, SSmlL
*/
tstrncpy(pSml->stableName, metric->valuestring, stableLen + 1);
strntolower_s(pSml->stableName, pSml->stableName, (int32_t)stableLen);
addEscapeCharToString(pSml->stableName, (int32_t)stableLen);
return TSDB_CODE_SUCCESS;
......@@ -546,7 +543,6 @@ static int32_t parseTimestampFromJSONObj(cJSON *root, int64_t *tsVal, SSmlLinesI
}
size_t typeLen = strlen(type->valuestring);
strntolower_s(type->valuestring, type->valuestring, (int32_t)typeLen);
if (typeLen == 1 && type->valuestring[0] == 's') {
//seconds
*tsVal = (int64_t)(*tsVal * 1e9);
......@@ -915,7 +911,6 @@ static int32_t parseTagsFromJSON(cJSON *root, TAOS_SML_KV **pKVs, int *num_kvs,
size_t idLen = strlen(id->valuestring);
*childTableName = tcalloc(idLen + TS_ESCAPE_CHAR_SIZE + 1, sizeof(char));
memcpy(*childTableName, id->valuestring, idLen);
strntolower_s(*childTableName, *childTableName, (int32_t)idLen);
addEscapeCharToString(*childTableName, (int32_t)idLen);
//check duplicate IDs
......@@ -954,7 +949,6 @@ static int32_t parseTagsFromJSON(cJSON *root, TAOS_SML_KV **pKVs, int *num_kvs,
}
pkv->key = tcalloc(keyLen + TS_ESCAPE_CHAR_SIZE + 1, sizeof(char));
strncpy(pkv->key, tag->string, keyLen);
strntolower_s(pkv->key, pkv->key, (int32_t)keyLen);
addEscapeCharToString(pkv->key, (int32_t)keyLen);
//value
ret = parseValueFromJSON(tag, pkv, info);
......
......@@ -1623,7 +1623,7 @@ int taos_stmt_prepare(TAOS_STMT* stmt, const char* sql, unsigned long length) {
pRes->qId = 0;
pRes->numOfRows = 0;
strtolower(pSql->sqlstr, sql);
strcpy(pSql->sqlstr, sql);
tscDebugL("0x%"PRIx64" SQL: %s", pSql->self, pSql->sqlstr);
if (tscIsInsertData(pSql->sqlstr)) {
......
......@@ -6361,6 +6361,7 @@ int32_t setAlterTableInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) {
// the schema is located after the pMsg body, then followed by true tag value
char* d = pUpdateMsg->data;
SSchema* pTagCols = tscGetTableTagSchema(pTableMeta);
for (int i = 0; i < numOfTags; ++i) {
STColumn* pCol = (STColumn*) d;
pCol->colId = htons(pTagCols[i].colId);
......@@ -6415,6 +6416,14 @@ int32_t setAlterTableInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) {
SColumnIndex columnIndex = COLUMN_INDEX_INITIALIZER;
SStrToken name = {.type = TK_STRING, .z = pItem->pVar.pz, .n = pItem->pVar.nLen};
//handle Escape character backstick
bool inEscape = false;
if (name.z[0] == TS_ESCAPE_CHAR && name.z[name.n - 1] == TS_ESCAPE_CHAR) {
inEscape = true;
name.type = TK_ID;
}
if (getColumnIndexByName(&name, pQueryInfo, &columnIndex, tscGetErrorMsgPayload(pCmd)) != TSDB_CODE_SUCCESS) {
return invalidOperationMsg(pMsg, msg17);
}
......@@ -6425,6 +6434,13 @@ int32_t setAlterTableInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) {
char name1[TSDB_COL_NAME_LEN] = {0};
tstrncpy(name1, pItem->pVar.pz, sizeof(name1));
int32_t nameLen = pItem->pVar.nLen;
if (inEscape) {
memmove(name1, name1 + 1, nameLen);
name1[nameLen - TS_ESCAPE_CHAR_SIZE] = '\0';
}
TAOS_FIELD f = tscCreateField(TSDB_DATA_TYPE_INT, name1, tDataTypes[TSDB_DATA_TYPE_INT].bytes);
tscFieldInfoAppend(&pQueryInfo->fieldsInfo, &f);
} else if (pAlterSQL->type == TSDB_ALTER_TABLE_CHANGE_COLUMN) {
......@@ -6440,11 +6456,12 @@ int32_t setAlterTableInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) {
SColumnIndex columnIndex = COLUMN_INDEX_INITIALIZER;
SStrToken name = {.type = TK_STRING, .z = pItem->name, .n = (uint32_t)strlen(pItem->name)};
//handle Escape character backstick
bool inEscape = false;
if (name.z[0] == TS_ESCAPE_CHAR && name.z[name.n - 1] == TS_ESCAPE_CHAR) {
memmove(name.z, name.z + 1, name.n);
name.z[name.n - TS_ESCAPE_CHAR_SIZE] = '\0';
name.n -= TS_ESCAPE_CHAR_SIZE;
inEscape = true;
name.type = TK_ID;
}
if (getColumnIndexByName(&name, pQueryInfo, &columnIndex, tscGetErrorMsgPayload(pCmd)) != TSDB_CODE_SUCCESS) {
......@@ -6480,6 +6497,13 @@ int32_t setAlterTableInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) {
if (nLen >= TSDB_MAX_BYTES_PER_ROW) {
return invalidOperationMsg(pMsg, msg24);
}
if (inEscape) {
memmove(name.z, name.z + 1, name.n);
name.z[name.n - TS_ESCAPE_CHAR_SIZE] = '\0';
name.n -= TS_ESCAPE_CHAR_SIZE;
}
TAOS_FIELD f = tscCreateField(pColSchema->type, name.z, pItem->bytes);
tscFieldInfoAppend(&pQueryInfo->fieldsInfo, &f);
}else if (pAlterSQL->type == TSDB_ALTER_TABLE_MODIFY_TAG_COLUMN) {
......
......@@ -148,6 +148,7 @@ extern char tsMqttTopic[];
// monitor
extern int8_t tsEnableMonitorModule;
extern int8_t tsMonitorReplica;
extern char tsMonitorDbName[];
extern char tsInternalPass[];
extern int32_t tsMonitorInterval;
......
......@@ -39,7 +39,7 @@ typedef struct tVariant {
bool tVariantIsValid(tVariant *pVar);
void tVariantCreate(tVariant *pVar, SStrToken *token);
void tVariantCreate(tVariant *pVar, SStrToken *token, bool needRmquoteEscape);
void tVariantCreateFromBinary(tVariant *pVar, const char *pz, size_t len, uint32_t type);
......
......@@ -193,6 +193,7 @@ char tsMqttTopic[TSDB_MQTT_TOPIC_LEN] = "/test"; // #
// monitor
int8_t tsEnableMonitorModule = 1;
int8_t tsMonitorReplica = 1;
char tsMonitorDbName[TSDB_DB_NAME_LEN] = "log";
char tsInternalPass[] = "secretkey";
int32_t tsMonitorInterval = 30; // seconds
......@@ -669,6 +670,16 @@ static void doInitGlobalConfig(void) {
cfg.unitType = TAOS_CFG_UTYPE_SECOND;
taosInitConfigOption(cfg);
cfg.option = "monitorReplica";
cfg.ptr = &tsMonitorReplica;
cfg.valType = TAOS_CFG_VTYPE_INT8;
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW;
cfg.minValue = 1;
cfg.maxValue = 3;
cfg.ptrLength = 1;
cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosInitConfigOption(cfg);
cfg.option = "offlineThreshold";
cfg.ptr = &tsOfflineThreshold;
cfg.valType = TAOS_CFG_VTYPE_INT32;
......
......@@ -30,7 +30,7 @@
assert(0); \
} while (0)
void tVariantCreate(tVariant *pVar, SStrToken *token) {
void tVariantCreate(tVariant *pVar, SStrToken *token, bool needRmquoteEscape) {
int32_t ret = 0;
int32_t type = token->type;
......@@ -81,7 +81,7 @@ void tVariantCreate(tVariant *pVar, SStrToken *token) {
case TSDB_DATA_TYPE_BINARY: {
pVar->pz = strndup(token->z, token->n);
pVar->nLen = strRmquoteEscape(pVar->pz, token->n);
pVar->nLen = needRmquoteEscape ? strRmquoteEscape(pVar->pz, token->n) : token->n;
break;
}
case TSDB_DATA_TYPE_TIMESTAMP: {
......
......@@ -29,8 +29,8 @@ extern "C" {
extern int32_t dDebugFlag;
#define dFatal(...) { if (dDebugFlag & DEBUG_FATAL) { taosPrintLog("DND FATAL ", 255, __VA_ARGS__); }}
#define dError(...) { if (dDebugFlag & DEBUG_ERROR) { taosPrintLog("DND ERROR ", 255, __VA_ARGS__); }}
#define dFatal(...) { if (dDebugFlag & DEBUG_FATAL) { taosPrintLog("DND FATAL ", 255, __VA_ARGS__); dnodeIncDnodeError(); }}
#define dError(...) { if (dDebugFlag & DEBUG_ERROR) { taosPrintLog("DND ERROR ", 255, __VA_ARGS__); dnodeIncDnodeError(); }}
#define dWarn(...) { if (dDebugFlag & DEBUG_WARN) { taosPrintLog("DND WARN ", 255, __VA_ARGS__); }}
#define dInfo(...) { if (dDebugFlag & DEBUG_INFO) { taosPrintLog("DND ", 255, __VA_ARGS__); }}
#define dDebug(...) { if (dDebugFlag & DEBUG_DEBUG) { taosPrintLog("DND ", dDebugFlag, __VA_ARGS__); }}
......
......@@ -54,6 +54,7 @@ void moduleStop() {}
void *tsDnodeTmr = NULL;
static SRunStatus tsRunStatus = TSDB_RUN_STATUS_STOPPED;
static int64_t tsDnodeErrors = 0;
static int32_t dnodeInitStorage();
static void dnodeCleanupStorage();
......@@ -225,6 +226,14 @@ static void dnodeSetRunStatus(SRunStatus status) {
tsRunStatus = status;
}
int64_t dnodeGetDnodeError() {
return tsDnodeErrors;
}
void dnodeIncDnodeError() {
atomic_add_fetch_64(&tsDnodeErrors, 1);
}
static void dnodeCheckDataDirOpenned(char *dir) {
char filepath[256] = {0};
sprintf(filepath, "%s/.running", dir);
......
......@@ -28,8 +28,8 @@ static void (*dnodeProcessShellMsgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *);
static void dnodeProcessMsgFromShell(SRpcMsg *pMsg, SRpcEpSet *);
static int dnodeRetrieveUserAuthInfo(char *user, char *spi, char *encrypt, char *secret, char *ckey);
static void * tsShellRpc = NULL;
static int32_t tsQueryReqNum = 0;
static int32_t tsSubmitReqNum = 0;
static int64_t tsQueryReqNum = 0;
static int64_t tsSubmitReqNum = 0;
int32_t dnodeInitShell() {
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_SUBMIT] = dnodeDispatchToVWriteQueue;
......@@ -136,9 +136,9 @@ static void dnodeProcessMsgFromShell(SRpcMsg *pMsg, SRpcEpSet *pEpSet) {
}
if (pMsg->msgType == TSDB_MSG_TYPE_QUERY) {
atomic_fetch_add_32(&tsQueryReqNum, 1);
atomic_fetch_add_64(&tsQueryReqNum, 1);
} else if (pMsg->msgType == TSDB_MSG_TYPE_SUBMIT) {
atomic_fetch_add_32(&tsSubmitReqNum, 1);
atomic_fetch_add_64(&tsSubmitReqNum, 1);
} else {}
if ( dnodeProcessShellMsgFp[pMsg->msgType] ) {
......@@ -237,15 +237,31 @@ void *dnodeSendCfgTableToRecv(int32_t vgId, int32_t tid) {
}
}
SStatisInfo dnodeGetStatisInfo() {
SStatisInfo info = {0};
SDnodeStatisInfo dnodeGetStatisInfo() {
SDnodeStatisInfo info = {0};
if (dnodeGetRunStatus() == TSDB_RUN_STATUS_RUNING) {
#ifdef HTTP_EMBEDDED
info.httpReqNum = httpGetReqCount();
#endif
info.queryReqNum = atomic_exchange_32(&tsQueryReqNum, 0);
info.submitReqNum = atomic_exchange_32(&tsSubmitReqNum, 0);
info.queryReqNum = atomic_exchange_64(&tsQueryReqNum, 0);
info.submitReqNum = atomic_exchange_64(&tsSubmitReqNum, 0);
}
return info;
}
int32_t dnodeGetHttpStatusInfo(int32_t index) {
int32_t httpStatus = 0;
#ifdef HTTP_EMBEDDED
httpStatus = httpGetStatusCodeCount(index);
#endif
return httpStatus;
}
void dnodeClearHttpStatusInfo() {
#ifdef HTTP_EMBEDDED
for (int i = 0; i < MAX_HTTP_STATUS_CODE_NUM; ++i) {
httpClearStatusCodeCount(i);
}
#endif
}
......@@ -23,13 +23,16 @@ extern "C" {
#include "trpc.h"
#include "taosmsg.h"
#define MAX_HTTP_STATUS_CODE_NUM 63
typedef struct {
int32_t queryReqNum;
int32_t submitReqNum;
int32_t httpReqNum;
} SStatisInfo;
int64_t queryReqNum;
int64_t submitReqNum;
int64_t httpReqNum;
} SDnodeStatisInfo;
SStatisInfo dnodeGetStatisInfo();
SDnodeStatisInfo dnodeGetStatisInfo();
int32_t dnodeGetHttpStatusInfo(int32_t index);
void dnodeClearHttpStatusInfo();
bool dnodeIsFirstDeploy();
bool dnodeIsMasterEp(char *ep);
......@@ -37,6 +40,8 @@ void dnodeGetEpSetForPeer(SRpcEpSet *epSet);
void dnodeGetEpSetForShell(SRpcEpSet *epSet);
int32_t dnodeGetDnodeId();
void dnodeGetClusterId(char *clusterId);
int64_t dnodeGetDnodeError();
void dnodeIncDnodeError();
void dnodeUpdateEp(int32_t dnodeId, char *ep, char *fqdn, uint16_t *port);
bool dnodeCheckEpChanged(int32_t dnodeId, char *epstr);
......
......@@ -22,7 +22,9 @@ extern "C" {
#include <stdint.h>
int32_t httpGetReqCount();
int64_t httpGetReqCount();
int32_t httpGetStatusCodeCount(int index);
int32_t httpClearStatusCodeCount(int index);
int32_t httpInitSystem();
int32_t httpStartSystem();
void httpStopSystem();
......
......@@ -22,6 +22,17 @@ extern "C" {
#include <stdint.h>
#define monSaveLogs(level, ...) { \
monSaveLog(level, __VA_ARGS__); \
monSaveDnodeLog(level, __VA_ARGS__); \
}
typedef struct {
const char * name;
int32_t code;
int32_t index;
} SMonHttpStatus;
typedef struct {
char * acctId;
int64_t currentPointsPerSecond;
......@@ -53,9 +64,16 @@ void monStopSystem();
void monCleanupSystem();
void monSaveAcctLog(SAcctMonitorObj *pMonObj);
void monSaveLog(int32_t level, const char *const format, ...);
void monSaveDnodeLog(int32_t level, const char *const format, ...);
void monExecuteSQL(char *sql);
typedef void (*MonExecuteSQLCbFP)(void *param, TAOS_RES *, int code);
void monExecuteSQLWithResultCallback(char *sql, MonExecuteSQLCbFP callback, void* param);
void monIncQueryReqCnt();
void monIncSubmitReqCnt();
int32_t monFetchQueryReqCnt();
int32_t monFetchSubmitReqCnt();
SMonHttpStatus *monGetHttpStatusHashTableEntry(int32_t code);
#ifdef __cplusplus
}
#endif
......
......@@ -22,6 +22,12 @@ extern "C" {
#include "trpc.h"
#include "twal.h"
typedef struct {
int64_t submitReqSucNum;
int64_t submitRowNum;
int64_t submitRowSucNum;
} SVnodeStatisInfo;
typedef struct {
int32_t len;
void * rsp;
......@@ -62,7 +68,7 @@ int32_t vnodeOpen(int32_t vgId);
int32_t vnodeAlter(void *pVnode, SCreateVnodeMsg *pVnodeCfg);
int32_t vnodeSync(int32_t vgId);
int32_t vnodeClose(int32_t vgId);
int32_t vnodeCompact(int32_t vgId);
int32_t vnodeCompact(int32_t vgId);
// vnodeMgmt
int32_t vnodeInitMgmt();
......@@ -80,6 +86,8 @@ int32_t vnodeWriteToWQueue(void *pVnode, void *pHead, int32_t qtype, void *pRpcM
void vnodeFreeFromWQueue(void *pVnode, SVWriteMsg *pWrite);
int32_t vnodeProcessWrite(void *pVnode, void *pHead, int32_t qtype, void *pRspRet);
SVnodeStatisInfo vnodeGetStatisInfo();
// vnodeSync
void vnodeConfirmForward(void *pVnode, uint64_t version, int32_t code, bool force);
......
......@@ -41,9 +41,9 @@ extern int32_t sdbDebugFlag;
#define sdbDebug(...) { if (sdbDebugFlag & DEBUG_DEBUG) { taosPrintLog("SDB ", sdbDebugFlag, __VA_ARGS__); }}
#define sdbTrace(...) { if (sdbDebugFlag & DEBUG_TRACE) { taosPrintLog("SDB ", sdbDebugFlag, __VA_ARGS__); }}
#define mLError(...) { monSaveLog(2, __VA_ARGS__); mError(__VA_ARGS__) }
#define mLWarn(...) { monSaveLog(1, __VA_ARGS__); mWarn(__VA_ARGS__) }
#define mLInfo(...) { monSaveLog(0, __VA_ARGS__); mInfo(__VA_ARGS__) }
#define mLError(...) { monSaveLogs(2, __VA_ARGS__); mError(__VA_ARGS__) }
#define mLWarn(...) { monSaveLogs(1, __VA_ARGS__); mWarn(__VA_ARGS__) }
#define mLInfo(...) { monSaveLogs(0, __VA_ARGS__); mInfo(__VA_ARGS__) }
#ifdef __cplusplus
}
......
......@@ -1177,8 +1177,8 @@ static int32_t mnodeGetVnodeMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pC
int32_t cols = 0;
SUserObj *pUser = mnodeGetUserFromConn(pConn);
if (pUser == NULL) return 0;
if (strcmp(pUser->user, TSDB_DEFAULT_USER) != 0) {
if (strcmp(pUser->pAcct->user, TSDB_DEFAULT_USER) != 0 ) {
mnodeDecUserRef(pUser);
return TSDB_CODE_MND_NO_RIGHTS;
}
......@@ -1256,10 +1256,10 @@ static int32_t mnodeRetrieveVnodes(SShowObj *pShow, char *data, int32_t rows, vo
STR_TO_VARSTR(pWrite, syncRole[pVgid->role]);
cols++;
numOfRows++;
}
}
if (numOfRows >= rows) {
mnodeDecVgroupRef(pVgroup);
break;
}
......
......@@ -166,7 +166,7 @@ static void mnodeCancelGetNextConn(void *pIter) {
static int32_t mnodeGetConnsMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) {
SUserObj *pUser = mnodeGetUserFromConn(pConn);
if (pUser == NULL) return 0;
if (strcmp(pUser->user, TSDB_DEFAULT_USER) != 0) return TSDB_CODE_MND_NO_RIGHTS;
if (strcmp(pUser->pAcct->user, TSDB_DEFAULT_USER) != 0) return TSDB_CODE_MND_NO_RIGHTS;
int32_t cols = 0;
SSchema *pSchema = pMeta->schema;
......@@ -322,7 +322,7 @@ int32_t mnodeSaveQueryStreamList(SConnObj *pConn, SHeartBeatMsg *pHBMsg) {
static int32_t mnodeGetQueryMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) {
SUserObj *pUser = mnodeGetUserFromConn(pConn);
if (pUser == NULL) return 0;
if (strcmp(pUser->user, TSDB_DEFAULT_USER) != 0) return TSDB_CODE_MND_NO_RIGHTS;
if (strcmp(pUser->pAcct->user, TSDB_DEFAULT_USER) != 0) return TSDB_CODE_MND_NO_RIGHTS;
int32_t cols = 0;
SSchema *pSchema = pMeta->schema;
......
......@@ -30,10 +30,11 @@ int32_t taosGetDiskSize(char *dataDir, SysDiskSize *diskSize);
int32_t taosGetCpuCores();
void taosGetSystemInfo();
bool taosReadProcIO(int64_t* rchars, int64_t* wchars);
bool taosGetProcIO(float *readKB, float *writeKB);
bool taosReadProcIO(int64_t* rchars, int64_t* wchars, int64_t* rbytes, int64_t* wbytes);
bool taosGetProcIO(float *rcharKB, float *wcharKB, float *rbyteKB, float* wbyteKB);
bool taosGetCardInfo(int64_t *bytes, int64_t *rbytes, int64_t *tbytes);
bool taosGetBandSpeed(float *bandSpeedKb);
bool taosGetNetworkIO(float *netInKb, float *netOutKb);
void taosGetDisk();
bool taosGetCpuUsage(float *sysCpuUsage, float *procCpuUsage) ;
bool taosGetProcMemory(float *memoryUsedMB) ;
......
......@@ -191,15 +191,19 @@ void taosGetSystemInfo() {
taosGetSystemLocale();
}
bool taosReadProcIO(int64_t *rchars, int64_t *wchars) {
bool taosReadProcIO(int64_t *rchars, int64_t *wchars, int64_t *rbytes, int64_t *wbytes) {
if (rchars) *rchars = 0;
if (wchars) *wchars = 0;
if (rbytes) *rbytes = 0;
if (wbytes) *wbytes = 0;
return true;
}
bool taosGetProcIO(float *readKB, float *writeKB) {
*readKB = 0;
*writeKB = 0;
bool taosGetProcIO(float *rcharKB, float *wcharKB, float *rbyteKB, float *wbyteKB) {
*rcharKB = 0;
*wcharKB = 0;
*rbyteKB = 0;
*wbyteKB = 0;
return true;
}
......@@ -215,6 +219,12 @@ bool taosGetBandSpeed(float *bandSpeedKb) {
return true;
}
bool taosGetNetworkIO(float *netInKb, float *netOutKb) {
*netInKb = 0;
*netOutKb = 0;
return true;
}
bool taosGetCpuUsage(float *sysCpuUsage, float *procCpuUsage) {
*sysCpuUsage = 0;
*procCpuUsage = 0;
......
......@@ -335,7 +335,9 @@ int32_t taosGetDiskSize(char *dataDir, SysDiskSize *diskSize) {
}
bool taosGetCardInfo(int64_t *bytes, int64_t *rbytes, int64_t *tbytes) {
*bytes = 0;
if (bytes) *bytes = 0;
if (rbytes) *rbytes = 0;
if (tbytes) *tbytes = 0;
FILE *fp = fopen(tsSysNetFile, "r");
if (fp == NULL) {
uError("open file:%s failed", tsSysNetFile);
......@@ -350,7 +352,7 @@ bool taosGetCardInfo(int64_t *bytes, int64_t *rbytes, int64_t *tbytes) {
memset(line, 0, len);
int64_t o_rbytes = 0;
int64_t rpackts = 0;
int64_t rpackets = 0;
int64_t o_tbytes = 0;
int64_t tpackets = 0;
int64_t nouse1 = 0;
......@@ -376,10 +378,10 @@ bool taosGetCardInfo(int64_t *bytes, int64_t *rbytes, int64_t *tbytes) {
sscanf(line,
"%s %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64
" %" PRId64,
nouse0, &o_rbytes, &rpackts, &nouse1, &nouse2, &nouse3, &nouse4, &nouse5, &nouse6, &o_tbytes, &tpackets);
nouse0, &o_rbytes, &rpackets, &nouse1, &nouse2, &nouse3, &nouse4, &nouse5, &nouse6, &o_tbytes, &tpackets);
if (rbytes) *rbytes = o_rbytes;
if (tbytes) *tbytes = o_tbytes;
*bytes += (o_rbytes + o_tbytes);
if (bytes) *bytes += (o_rbytes + o_tbytes);
}
tfree(line);
......@@ -424,7 +426,46 @@ bool taosGetBandSpeed(float *bandSpeedKb) {
return true;
}
bool taosReadProcIO(int64_t *rchars, int64_t *wchars) {
bool taosGetNetworkIO(float *netInKb, float *netOutKb) {
static int64_t lastBytesIn = 0, lastBytesOut = 0;
static time_t lastTimeIO = 0;
int64_t curBytesIn = 0, curBytesOut = 0;
time_t curTime = time(NULL);
if (!taosGetCardInfo(NULL, &curBytesIn, &curBytesOut)) {
return false;
}
if (lastTimeIO == 0 || lastBytesIn == 0 || lastBytesOut == 0) {
lastTimeIO = curTime;
lastBytesIn = curBytesIn; lastBytesOut = curBytesOut;
*netInKb = 0;
*netOutKb = 0;
return true;
}
if (lastTimeIO >= curTime || lastBytesIn > curBytesIn || lastBytesOut > curBytesOut) {
lastTimeIO = curTime;
lastBytesIn = curBytesIn; lastBytesOut = curBytesOut;
*netInKb = 0;
*netOutKb = 0;
return true;
}
double totalBytesIn = (double)(curBytesIn - lastBytesIn) / 1024 * 8; // Kb
*netInKb = (float)(totalBytesIn / (double)(curTime - lastTimeIO));
double totalBytesOut = (double)(curBytesOut - lastBytesOut) / 1024 * 8; // Kb
*netOutKb = (float)(totalBytesOut / (double)(curTime - lastTimeIO));
lastTimeIO = curTime;
lastBytesIn = curBytesIn;
lastBytesOut = curBytesOut;
return true;
}
bool taosReadProcIO(int64_t *rchars, int64_t *wchars, int64_t *rbytes, int64_t *wbytes) {
FILE *fp = fopen(tsProcIOFile, "r");
if (fp == NULL) {
uError("open file:%s failed", tsProcIOFile);
......@@ -434,7 +475,7 @@ bool taosReadProcIO(int64_t *rchars, int64_t *wchars) {
ssize_t _bytes = 0;
size_t len;
char * line = NULL;
char tmp[10];
char tmp[15];
int readIndex = 0;
while (!feof(fp)) {
......@@ -450,16 +491,21 @@ bool taosReadProcIO(int64_t *rchars, int64_t *wchars) {
} else if (strstr(line, "wchar:") != NULL) {
sscanf(line, "%s %" PRId64, tmp, wchars);
readIndex++;
} else {
} else if (strstr(line, "read_bytes:") != NULL){
sscanf(line, "%s %" PRId64, tmp, rbytes);
readIndex++;
} else if (strstr(line, "write_bytes:") != NULL){
sscanf(line, "%s %" PRId64, tmp, wbytes);
readIndex++;
}
if (readIndex >= 2) break;
if (readIndex >= 4) break;
}
tfree(line);
fclose(fp);
if (readIndex < 2) {
if (readIndex < 4) {
uError("read file:%s failed", tsProcIOFile);
return false;
}
......@@ -467,30 +513,43 @@ bool taosReadProcIO(int64_t *rchars, int64_t *wchars) {
return true;
}
bool taosGetProcIO(float *readKB, float *writeKB) {
static int64_t lastReadbyte = -1;
static int64_t lastWritebyte = -1;
bool taosGetProcIO(float *rcharKB, float *wcharKB, float *rbyteKB, float *wbyteKB) {
static int64_t lastRchar = -1, lastRbyte = -1;
static int64_t lastWchar = -1, lastWbyte = -1;
static time_t lastTime = 0;
time_t curTime = time(NULL);
int64_t curReadbyte = 0;
int64_t curWritebyte = 0;
int64_t curRchar = 0, curRbyte = 0;
int64_t curWchar = 0, curWbyte = 0;
if (!taosReadProcIO(&curReadbyte, &curWritebyte)) {
if (!taosReadProcIO(&curRchar, &curWchar, &curRbyte, &curWbyte)) {
return false;
}
if (lastReadbyte == -1 || lastWritebyte == -1) {
lastReadbyte = curReadbyte;
lastWritebyte = curWritebyte;
if (lastTime == 0 || lastRchar == -1 || lastWchar == -1 || lastRbyte == -1 || lastWbyte == -1) {
lastTime = curTime;
lastRchar = curRchar;
lastWchar = curWchar;
lastRbyte = curRbyte;
lastWbyte = curWbyte;
return false;
}
*readKB = (float)((double)(curReadbyte - lastReadbyte) / 1024);
*writeKB = (float)((double)(curWritebyte - lastWritebyte) / 1024);
if (*readKB < 0) *readKB = 0;
if (*writeKB < 0) *writeKB = 0;
*rcharKB = (float)((double)(curRchar - lastRchar) / 1024 / (double)(curTime - lastTime));
*wcharKB = (float)((double)(curWchar - lastWchar) / 1024 / (double)(curTime - lastTime));
if (*rcharKB < 0) *rcharKB = 0;
if (*wcharKB < 0) *wcharKB = 0;
*rbyteKB = (float)((double)(curRbyte - lastRbyte) / 1024 / (double)(curTime - lastTime));
*wbyteKB = (float)((double)(curWbyte - lastWbyte) / 1024 / (double)(curTime - lastTime));
if (*rbyteKB < 0) *rbyteKB = 0;
if (*wbyteKB < 0) *wbyteKB = 0;
lastReadbyte = curReadbyte;
lastWritebyte = curWritebyte;
lastRchar = curRchar;
lastWchar = curWchar;
lastRbyte = curRbyte;
lastWbyte = curWbyte;
lastTime = curTime;
return true;
}
......@@ -501,13 +560,13 @@ void taosGetSystemInfo() {
tsNumOfCores = taosGetCpuCores();
tsTotalMemoryMB = taosGetTotalMemory();
float tmp1, tmp2;
float tmp1, tmp2, tmp3, tmp4;
taosGetSysMemory(&tmp1);
taosGetProcMemory(&tmp2);
// taosGetDisk();
taosGetBandSpeed(&tmp1);
taosGetCpuUsage(&tmp1, &tmp2);
taosGetProcIO(&tmp1, &tmp2);
taosGetProcIO(&tmp1, &tmp2, &tmp3, &tmp4);
taosGetSystemTimezone();
taosGetSystemLocale();
......
......@@ -169,40 +169,59 @@ bool taosGetBandSpeed(float *bandSpeedKb) {
return true;
}
bool taosReadProcIO(int64_t *readbyte, int64_t *writebyte) {
bool taosGetNetworkIO(float *netInKb, float *netOutKb) {
*netInKb = 0;
*netOutKb = 0;
return true;
}
bool taosReadProcIO(int64_t *rchars, int64_t *wchars, int64_t *rbytes, int64_t *wbytes) {
IO_COUNTERS io_counter;
if (GetProcessIoCounters(GetCurrentProcess(), &io_counter)) {
if (readbyte) *readbyte = io_counter.ReadTransferCount;
if (writebyte) *writebyte = io_counter.WriteTransferCount;
if (rchars) *rchars = io_counter.ReadTransferCount;
if (wchars) *wchars = io_counter.WriteTransferCount;
return true;
}
return false;
}
bool taosGetProcIO(float *readKB, float *writeKB) {
static int64_t lastReadbyte = -1;
static int64_t lastWritebyte = -1;
bool taosGetProcIO(float *rcharKB, float *wcharKB, float *rbyteKB, float *wbyteKB) {
static int64_t lastRchar = -1, lastRbyte = -1;
static int64_t lastWchar = -1, lastWbyte = -1;
static time_t lastTime = 0;
time_t curTime = time(NULL);
int64_t curReadbyte = 0;
int64_t curWritebyte = 0;
int64_t curRchar = 0, curRbyte = 0;
int64_t curWchar = 0, curWbyte = 0;
if (!taosReadProcIO(&curReadbyte, &curWritebyte)) {
if (!taosReadProcIO(&curRchar, &curWchar, &curRbyte, &curWbyte)) {
return false;
}
if (lastReadbyte == -1 || lastWritebyte == -1) {
lastReadbyte = curReadbyte;
lastWritebyte = curWritebyte;
if (lastTime == 0 || lastRchar == -1 || lastWchar == -1 || lastRbyte == -1 || lastWbyte == -1) {
lastTime = curTime;
lastRchar = curRchar;
lastWchar = curWchar;
lastRbyte = curRbyte;
lastWbyte = curWbyte;
return false;
}
*readKB = (float)((double)(curReadbyte - lastReadbyte) / 1024);
*writeKB = (float)((double)(curWritebyte - lastWritebyte) / 1024);
if (*readKB < 0) *readKB = 0;
if (*writeKB < 0) *writeKB = 0;
*rcharKB = (float)((double)(curRchar - lastRchar) / 1024 / (double)(curTime - lastTime));
*wcharKB = (float)((double)(curWchar - lastWchar) / 1024 / (double)(curTime - lastTime));
if (*rcharKB < 0) *rcharKB = 0;
if (*wcharKB < 0) *wcharKB = 0;
*rbyteKB = (float)((double)(curRbyte - lastRbyte) / 1024 / (double)(curTime - lastTime));
*wbyteKB = (float)((double)(curWbyte - lastWbyte) / 1024 / (double)(curTime - lastTime));
if (*rbyteKB < 0) *rbyteKB = 0;
if (*wbyteKB < 0) *wbyteKB = 0;
lastReadbyte = curReadbyte;
lastWritebyte = curWritebyte;
lastRchar = curRchar;
lastWchar = curWchar;
lastRbyte = curRbyte;
lastWbyte = curWbyte;
lastTime = curTime;
return true;
}
......@@ -211,11 +230,11 @@ void taosGetSystemInfo() {
tsNumOfCores = taosGetCpuCores();
tsTotalMemoryMB = taosGetTotalMemory();
float tmp1, tmp2;
float tmp1, tmp2, tmp3, tmp4;
// taosGetDisk();
taosGetBandSpeed(&tmp1);
taosGetCpuUsage(&tmp1, &tmp2);
taosGetProcIO(&tmp1, &tmp2);
taosGetProcIO(&tmp1, &tmp2, &tmp3, &tmp4);
taosGetSystemTimezone();
taosGetSystemLocale();
......
......@@ -42,6 +42,7 @@
#define HTTP_WRITE_WAIT_TIME_MS 5
#define HTTP_PASSWORD_LEN TSDB_UNI_LEN
#define HTTP_SESSION_ID_LEN (TSDB_USER_LEN + HTTP_PASSWORD_LEN)
#define HTTP_STATUS_CODE_NUM 63
typedef enum HttpReqType {
HTTP_REQTYPE_OTHERS = 0,
......@@ -187,8 +188,9 @@ typedef struct HttpServer {
SOCKET fd;
int32_t numOfThreads;
int32_t methodScannerLen;
int32_t requestNum;
int64_t requestNum;
int32_t status;
int32_t statusCodeErrs[HTTP_STATUS_CODE_NUM];
pthread_t thread;
HttpThread * pThreads;
void * contextCache;
......
......@@ -123,9 +123,9 @@ bool metricsProcessRequest(HttpContext* pContext) {
}
{
int64_t rchars = 0;
int64_t wchars = 0;
bool succeeded = taosReadProcIO(&rchars, &wchars);
int64_t rchars = 0, rbytes = 0;
int64_t wchars = 0, wbytes = 0;
bool succeeded = taosReadProcIO(&rchars, &wchars, &rbytes, &wbytes);
if (!succeeded) {
httpError("failed to get io info");
} else {
......@@ -164,7 +164,7 @@ bool metricsProcessRequest(HttpContext* pContext) {
}
{
SStatisInfo info = dnodeGetStatisInfo();
SDnodeStatisInfo info = dnodeGetStatisInfo();
{
char* keyReqHttp = "req_http";
char* keyReqSelect = "req_select";
......@@ -181,4 +181,4 @@ bool metricsProcessRequest(HttpContext* pContext) {
pContext->reqType = HTTP_REQTYPE_OTHERS;
httpFreeJsonBuf(pContext);
return false;
}
\ No newline at end of file
}
......@@ -21,6 +21,7 @@
#include "httpResp.h"
#include "httpJson.h"
#include "httpContext.h"
#include "monitor.h"
const char *httpKeepAliveStr[] = {"", "Connection: Keep-Alive\r\n", "Connection: Close\r\n"};
......@@ -153,6 +154,10 @@ void httpSendErrorResp(HttpContext *pContext, int32_t errNo) {
httpCode = pContext->parser->httpCode;
}
HttpServer *pServer = &tsHttpServer;
SMonHttpStatus *httpStatus = monGetHttpStatusHashTableEntry(httpCode);
pServer->statusCodeErrs[httpStatus->index] += 1;
pContext->error = true;
char *httpCodeStr = httpGetStatusDesc(httpCode);
......
......@@ -190,7 +190,7 @@ static void httpProcessHttpData(void *param) {
} else {
if (httpReadData(pContext)) {
(*(pThread->processData))(pContext);
atomic_fetch_add_32(&pServer->requestNum, 1);
atomic_fetch_add_64(&pServer->requestNum, 1);
}
}
}
......
......@@ -120,4 +120,10 @@ void httpCleanUpSystem() {
tsHttpServer.status = HTTP_SERVER_CLOSED;
}
int32_t httpGetReqCount() { return atomic_exchange_32(&tsHttpServer.requestNum, 0); }
int64_t httpGetReqCount() { return atomic_exchange_64(&tsHttpServer.requestNum, 0); }
int32_t httpGetStatusCodeCount(int index) {
return atomic_load_32(&tsHttpServer.statusCodeErrs[index]);
}
int32_t httpClearStatusCodeCount(int index) {
return atomic_exchange_32(&tsHttpServer.statusCodeErrs[index], 0);
}
此差异已折叠。
......@@ -281,7 +281,7 @@ typedef struct tSqlExprItem {
SArray *tVariantListAppend(SArray *pList, tVariant *pVar, uint8_t sortOrder);
SArray *tVariantListInsert(SArray *pList, tVariant *pVar, uint8_t sortOrder, int32_t index);
SArray *tVariantListAppendToken(SArray *pList, SStrToken *pAliasToken, uint8_t sortOrder);
SArray *tVariantListAppendToken(SArray *pList, SStrToken *pAliasToken, uint8_t sortOrder, bool needRmquoteEscape);
SRelationInfo *setTableNameList(SRelationInfo* pFromInfo, SStrToken *pName, SStrToken* pAlias);
void *destroyRelationInfo(SRelationInfo* pFromInfo);
......
......@@ -253,7 +253,7 @@ acct_optr(Y) ::= pps(C) tseries(D) storage(P) streams(F) qtime(Q) dbs(E) users(K
intitemlist(A) ::= intitemlist(X) COMMA intitem(Y). { A = tVariantListAppend(X, &Y, -1); }
intitemlist(A) ::= intitem(X). { A = tVariantListAppend(NULL, &X, -1); }
intitem(A) ::= INTEGER(X). { toTSDBType(X.type); tVariantCreate(&A, &X); }
intitem(A) ::= INTEGER(X). { toTSDBType(X.type); tVariantCreate(&A, &X, true); }
%type keep {SArray*}
%destructor keep {taosArrayDestroy($$);}
......@@ -438,39 +438,39 @@ column(A) ::= ids(X) typename(Y). {
tagitemlist(A) ::= tagitemlist(X) COMMA tagitem(Y). { A = tVariantListAppend(X, &Y, -1); }
tagitemlist(A) ::= tagitem(X). { A = tVariantListAppend(NULL, &X, -1); }
tagitem(A) ::= INTEGER(X). { toTSDBType(X.type); tVariantCreate(&A, &X); }
tagitem(A) ::= FLOAT(X). { toTSDBType(X.type); tVariantCreate(&A, &X); }
tagitem(A) ::= STRING(X). { toTSDBType(X.type); tVariantCreate(&A, &X); }
tagitem(A) ::= BOOL(X). { toTSDBType(X.type); tVariantCreate(&A, &X); }
tagitem(A) ::= NULL(X). { X.type = 0; tVariantCreate(&A, &X); }
tagitem(A) ::= NOW(X). { X.type = TSDB_DATA_TYPE_TIMESTAMP; tVariantCreate(&A, &X);}
tagitem(A) ::= INTEGER(X). { toTSDBType(X.type); tVariantCreate(&A, &X, true); }
tagitem(A) ::= FLOAT(X). { toTSDBType(X.type); tVariantCreate(&A, &X, true); }
tagitem(A) ::= STRING(X). { toTSDBType(X.type); tVariantCreate(&A, &X, true); }
tagitem(A) ::= BOOL(X). { toTSDBType(X.type); tVariantCreate(&A, &X, true); }
tagitem(A) ::= NULL(X). { X.type = 0; tVariantCreate(&A, &X, true); }
tagitem(A) ::= NOW(X). { X.type = TSDB_DATA_TYPE_TIMESTAMP; tVariantCreate(&A, &X, true);}
tagitem(A) ::= MINUS(X) INTEGER(Y).{
X.n += Y.n;
X.type = Y.type;
toTSDBType(X.type);
tVariantCreate(&A, &X);
tVariantCreate(&A, &X, true);
}
tagitem(A) ::= MINUS(X) FLOAT(Y). {
X.n += Y.n;
X.type = Y.type;
toTSDBType(X.type);
tVariantCreate(&A, &X);
tVariantCreate(&A, &X, true);
}
tagitem(A) ::= PLUS(X) INTEGER(Y). {
X.n += Y.n;
X.type = Y.type;
toTSDBType(X.type);
tVariantCreate(&A, &X);
tVariantCreate(&A, &X, true);
}
tagitem(A) ::= PLUS(X) FLOAT(Y). {
X.n += Y.n;
X.type = Y.type;
toTSDBType(X.type);
tVariantCreate(&A, &X);
tVariantCreate(&A, &X, true);
}
//////////////////////// The SELECT statement /////////////////////////////////
......@@ -609,7 +609,7 @@ fill_opt(N) ::= . { N = 0; }
fill_opt(N) ::= FILL LP ID(Y) COMMA tagitemlist(X) RP. {
tVariant A = {0};
toTSDBType(Y.type);
tVariantCreate(&A, &Y);
tVariantCreate(&A, &Y, true);
tVariantListInsert(X, &A, -1, 0);
N = X;
......@@ -617,7 +617,7 @@ fill_opt(N) ::= FILL LP ID(Y) COMMA tagitemlist(X) RP. {
fill_opt(N) ::= FILL LP ID(Y) RP. {
toTSDBType(Y.type);
N = tVariantListAppendToken(NULL, &Y, -1);
N = tVariantListAppendToken(NULL, &Y, -1, true);
}
%type sliding_opt {SStrToken}
......@@ -649,7 +649,7 @@ item(A) ::= ids(X) cpxName(Y). {
toTSDBType(X.type);
X.n += Y.n;
tVariantCreate(&A, &X);
tVariantCreate(&A, &X, true);
}
%type sortorder {int}
......@@ -796,7 +796,7 @@ cmd ::= ALTER TABLE ids(X) cpxName(F) DROP COLUMN ids(A). {
X.n += F.n;
toTSDBType(A.type);
SArray* K = tVariantListAppendToken(NULL, &A, -1);
SArray* K = tVariantListAppendToken(NULL, &A, -1, false);
SAlterTableInfo* pAlterTable = tSetAlterTableInfo(&X, NULL, K, TSDB_ALTER_TABLE_DROP_COLUMN, -1);
setSqlInfo(pInfo, pAlterTable, NULL, TSDB_SQL_ALTER_TABLE);
......@@ -818,7 +818,7 @@ cmd ::= ALTER TABLE ids(X) cpxName(Z) DROP TAG ids(Y). {
X.n += Z.n;
toTSDBType(Y.type);
SArray* A = tVariantListAppendToken(NULL, &Y, -1);
SArray* A = tVariantListAppendToken(NULL, &Y, -1, true);
SAlterTableInfo* pAlterTable = tSetAlterTableInfo(&X, NULL, A, TSDB_ALTER_TABLE_DROP_TAG_COLUMN, -1);
setSqlInfo(pInfo, pAlterTable, NULL, TSDB_SQL_ALTER_TABLE);
......@@ -828,10 +828,10 @@ cmd ::= ALTER TABLE ids(X) cpxName(F) CHANGE TAG ids(Y) ids(Z). {
X.n += F.n;
toTSDBType(Y.type);
SArray* A = tVariantListAppendToken(NULL, &Y, -1);
SArray* A = tVariantListAppendToken(NULL, &Y, -1, true);
toTSDBType(Z.type);
A = tVariantListAppendToken(A, &Z, -1);
A = tVariantListAppendToken(A, &Z, -1, true);
SAlterTableInfo* pAlterTable = tSetAlterTableInfo(&X, NULL, A, TSDB_ALTER_TABLE_CHANGE_TAG_COLUMN, -1);
setSqlInfo(pInfo, pAlterTable, NULL, TSDB_SQL_ALTER_TABLE);
......@@ -841,7 +841,7 @@ cmd ::= ALTER TABLE ids(X) cpxName(F) SET TAG ids(Y) EQ tagitem(Z). {
X.n += F.n;
toTSDBType(Y.type);
SArray* A = tVariantListAppendToken(NULL, &Y, -1);
SArray* A = tVariantListAppendToken(NULL, &Y, -1, true);
A = tVariantListAppend(A, &Z, -1);
SAlterTableInfo* pAlterTable = tSetAlterTableInfo(&X, NULL, A, TSDB_ALTER_TABLE_UPDATE_TAG_VAL, -1);
......@@ -865,7 +865,7 @@ cmd ::= ALTER STABLE ids(X) cpxName(F) DROP COLUMN ids(A). {
X.n += F.n;
toTSDBType(A.type);
SArray* K = tVariantListAppendToken(NULL, &A, -1);
SArray* K = tVariantListAppendToken(NULL, &A, -1, true);
SAlterTableInfo* pAlterTable = tSetAlterTableInfo(&X, NULL, K, TSDB_ALTER_TABLE_DROP_COLUMN, TSDB_SUPER_TABLE);
setSqlInfo(pInfo, pAlterTable, NULL, TSDB_SQL_ALTER_TABLE);
......@@ -887,7 +887,7 @@ cmd ::= ALTER STABLE ids(X) cpxName(Z) DROP TAG ids(Y). {
X.n += Z.n;
toTSDBType(Y.type);
SArray* A = tVariantListAppendToken(NULL, &Y, -1);
SArray* A = tVariantListAppendToken(NULL, &Y, -1, true);
SAlterTableInfo* pAlterTable = tSetAlterTableInfo(&X, NULL, A, TSDB_ALTER_TABLE_DROP_TAG_COLUMN, TSDB_SUPER_TABLE);
setSqlInfo(pInfo, pAlterTable, NULL, TSDB_SQL_ALTER_TABLE);
......@@ -897,10 +897,10 @@ cmd ::= ALTER STABLE ids(X) cpxName(F) CHANGE TAG ids(Y) ids(Z). {
X.n += F.n;
toTSDBType(Y.type);
SArray* A = tVariantListAppendToken(NULL, &Y, -1);
SArray* A = tVariantListAppendToken(NULL, &Y, -1, true);
toTSDBType(Z.type);
A = tVariantListAppendToken(A, &Z, -1);
A = tVariantListAppendToken(A, &Z, -1, true);
SAlterTableInfo* pAlterTable = tSetAlterTableInfo(&X, NULL, A, TSDB_ALTER_TABLE_CHANGE_TAG_COLUMN, TSDB_SUPER_TABLE);
setSqlInfo(pInfo, pAlterTable, NULL, TSDB_SQL_ALTER_TABLE);
......@@ -910,7 +910,7 @@ cmd ::= ALTER STABLE ids(X) cpxName(F) SET TAG ids(Y) EQ tagitem(Z). {
X.n += F.n;
toTSDBType(Y.type);
SArray* A = tVariantListAppendToken(NULL, &Y, -1);
SArray* A = tVariantListAppendToken(NULL, &Y, -1, true);
A = tVariantListAppend(A, &Z, -1);
SAlterTableInfo* pAlterTable = tSetAlterTableInfo(&X, NULL, A, TSDB_ALTER_TABLE_UPDATE_TAG_VAL, TSDB_SUPER_TABLE);
......
......@@ -143,14 +143,14 @@ tSqlExpr *tSqlExprCreateIdValue(SSqlInfo* pInfo, SStrToken *pToken, int32_t optr
if (optrType == TK_NULL) {
if (pToken){
pToken->type = TSDB_DATA_TYPE_NULL;
tVariantCreate(&pSqlExpr->value, pToken);
tVariantCreate(&pSqlExpr->value, pToken, true);
}
pSqlExpr->tokenId = optrType;
pSqlExpr->type = SQL_NODE_VALUE;
} else if (optrType == TK_INTEGER || optrType == TK_STRING || optrType == TK_FLOAT || optrType == TK_BOOL) {
if (pToken) {
toTSDBType(pToken->type);
tVariantCreate(&pSqlExpr->value, pToken);
tVariantCreate(&pSqlExpr->value, pToken, true);
}
pSqlExpr->tokenId = optrType;
pSqlExpr->type = SQL_NODE_VALUE;
......@@ -203,7 +203,7 @@ tSqlExpr *tSqlExprCreateTimestamp(SStrToken *pToken, int32_t optrType) {
if (optrType == TK_INTEGER || optrType == TK_STRING) {
if (pToken) {
toTSDBType(pToken->type);
tVariantCreate(&pSqlExpr->value, pToken);
tVariantCreate(&pSqlExpr->value, pToken, true);
}
pSqlExpr->tokenId = optrType;
pSqlExpr->type = SQL_NODE_VALUE;
......@@ -559,14 +559,14 @@ void tSqlExprDestroy(tSqlExpr *pExpr) {
doDestroySqlExprNode(pExpr);
}
SArray *tVariantListAppendToken(SArray *pList, SStrToken *pToken, uint8_t order) {
SArray *tVariantListAppendToken(SArray *pList, SStrToken *pToken, uint8_t order, bool needRmquoteEscape) {
if (pList == NULL) {
pList = taosArrayInit(4, sizeof(tVariantListItem));
}
if (pToken) {
tVariantListItem item;
tVariantCreate(&item.pVar, pToken);
tVariantCreate(&item.pVar, pToken, needRmquoteEscape);
item.sortOrder = order;
taosArrayPush(pList, &item);
......
此差异已折叠。
......@@ -57,7 +57,7 @@ int32_t tsdbInsertData(STsdbRepo *repo, SSubmitMsg *pMsg, SShellSubmitRspMsg *pR
STsdbRepo * pRepo = repo;
SSubmitMsgIter msgIter = {0};
SSubmitBlk * pBlock = NULL;
int32_t affectedrows = 0;
int32_t affectedrows = 0, numOfRows = 0;
if (tsdbScanAndConvertSubmitMsg(pRepo, pMsg) < 0) {
if (terrno != TSDB_CODE_TDB_TABLE_RECONFIGURE) {
......@@ -73,9 +73,13 @@ int32_t tsdbInsertData(STsdbRepo *repo, SSubmitMsg *pMsg, SShellSubmitRspMsg *pR
if (tsdbInsertDataToTable(pRepo, pBlock, &affectedrows) < 0) {
return -1;
}
numOfRows += pBlock->numOfRows;
}
if (pRsp != NULL) pRsp->affectedRows = htonl(affectedrows);
if (pRsp != NULL) {
pRsp->affectedRows = htonl(affectedrows);
pRsp->numOfRows = htonl(numOfRows);
}
if (tsdbCheckCommit(pRepo) < 0) return -1;
return 0;
......
......@@ -27,6 +27,10 @@
#define MAX_QUEUED_MSG_NUM 100000
#define MAX_QUEUED_MSG_SIZE 1024*1024*1024 //1GB
static int64_t tsSubmitReqSucNum = 0;
static int64_t tsSubmitRowNum = 0;
static int64_t tsSubmitRowSucNum = 0;
extern void * tsDnodeTmr;
static int32_t (*vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MAX])(SVnodeObj *, void *pCont, SRspRet *);
static int32_t vnodeProcessSubmitMsg(SVnodeObj *pVnode, void *pCont, SRspRet *);
......@@ -163,7 +167,16 @@ static int32_t vnodeProcessSubmitMsg(SVnodeObj *pVnode, void *pCont, SRspRet *pR
pRsp = pRet->rsp;
}
if (tsdbInsertData(pVnode->tsdb, pCont, pRsp) < 0) code = terrno;
if (tsdbInsertData(pVnode->tsdb, pCont, pRsp) < 0) {
code = terrno;
} else {
if (pRsp != NULL) atomic_fetch_add_64(&tsSubmitReqSucNum, 1);
}
if (pRsp) {
atomic_fetch_add_64(&tsSubmitRowNum, ntohl(pRsp->numOfRows));
atomic_fetch_add_64(&tsSubmitRowSucNum, ntohl(pRsp->affectedRows));
}
return code;
}
......@@ -425,3 +438,12 @@ void vnodeWaitWriteCompleted(SVnodeObj *pVnode) {
if (extraSleep)
taosMsleep(900);
}
SVnodeStatisInfo vnodeGetStatisInfo() {
SVnodeStatisInfo info = {0};
info.submitReqSucNum = atomic_exchange_64(&tsSubmitReqSucNum, 0);
info.submitRowNum = atomic_exchange_64(&tsSubmitRowNum, 0);
info.submitRowSucNum = atomic_exchange_64(&tsSubmitRowSucNum, 0);
return info;
}
......@@ -41,7 +41,7 @@ sql create dnode $hostname2
sleep 10000
sql show log.tables;
if $rows > 6 then
if $rows > 20 then
return -1
endi
......@@ -50,7 +50,7 @@ print ===>rows $rows
print $data00 $data01 $data02
print $data10 $data11 $data12
print $data20 $data21 $data22
if $rows < 10 then
if $rows < 9 then
return -1
endi
......
......@@ -42,7 +42,7 @@ print dnode2 openVnodes $data2_2
if $data2_1 != 0 then
return -1
endi
if $data2_2 != 1 then
if $data2_2 != 2 then
return -1
endi
......@@ -56,7 +56,25 @@ print $data30
print $data40
print $data50
if $rows > 6 then
print *num of tables $rows
if $rows > 17 then
return -1
endi
sql show log.stables
print $data00
print $data10
print $data20
print $data30
print $data40
print $data50
print $data60
print *num of stables $rows
if $rows > 7 then
return -1
endi
......
......@@ -19,7 +19,7 @@ sleep 3000
sql show dnodes
print dnode1 openVnodes $data2_1
if $data2_1 > 2 then
if $data2_1 > 4 then
return -1
endi
......@@ -28,20 +28,20 @@ sql create dnode $hostname2
system sh/exec.sh -n dnode2 -s start
$x = 0
show2:
show2:
$x = $x + 1
sleep 2000
if $x == 10 then
return -1
endi
sql show dnodes
print dnode1 openVnodes $data2_1
print dnode2 openVnodes $data2_2
if $data2_1 != 0 then
goto show2
endi
if $data2_2 > 2 then
if $data2_2 > 4 then
goto show2
endi
......@@ -55,7 +55,7 @@ print $data30
print $data40
print $data50
if $rows > 5 then
if $rows > 14 then
return -1
endi
......@@ -74,4 +74,4 @@ if $rows2 <= $rows1 then
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT
system sh/exec.sh -n dnode2 -s stop -x SIGINT
\ No newline at end of file
system sh/exec.sh -n dnode2 -s stop -x SIGINT
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册