diff --git a/.gitignore b/.gitignore index 47eae4dc03e979cdfc23c71b98a44ee0c6b03812..67cc2929b45049b7bb7ccf377b02bdaad70c3315 100644 --- a/.gitignore +++ b/.gitignore @@ -41,3 +41,27 @@ pysim/ # Doxygen Generated files html/ +/.vs +/CMakeFiles/3.10.2 +/CMakeCache.txt +/Makefile +/*.cmake +/deps +/src/cq/test/CMakeFiles/cqtest.dir/*.cmake +*.cmake +/src/cq/test/CMakeFiles/cqtest.dir/*.make +*.make +link.txt +*.internal +*.includecache +*.marks +Makefile +CMakeError.log +*.log +/CMakeFiles/CMakeRuleHashes.txt +/CMakeFiles/Makefile2 +/CMakeFiles/TargetDirectories.txt +/CMakeFiles/cmake.check_cache +/out/isenseconfig/WSL-Clang-Debug +/out/isenseconfig/WSL-GCC-Debug +/test/cfg diff --git a/CMakeSettings.json b/CMakeSettings.json new file mode 100644 index 0000000000000000000000000000000000000000..4b54f10f2f69fe2fa62e59003d58a4ef9c53a41b --- /dev/null +++ b/CMakeSettings.json @@ -0,0 +1,25 @@ +{ + "configurations": [ + { + "name": "WSL-GCC-Debug", + "generator": "Unix Makefiles", + "configurationType": "Debug", + "buildRoot": "${projectDir}\\build\\", + "installRoot": "${projectDir}\\out\\install\\${name}", + "cmakeExecutable": "/usr/bin/cmake", + "cmakeCommandArgs": "", + "buildCommandArgs": "", + "ctestCommandArgs": "", + "inheritEnvironments": [ "linux_x64" ], + "wslPath": "${defaultWSLPath}", + "addressSanitizerRuntimeFlags": "detect_leaks=0", + "variables": [ + { + "name": "CMAKE_INSTALL_PREFIX", + "value": "/mnt/d/TDengine/TDengine/build", + "type": "PATH" + } + ] + } + ] +} \ No newline at end of file diff --git a/src/common/inc/tglobal.h b/src/common/inc/tglobal.h index da8f3cd1e1971518d744cbe69ac05b187a6aa412..319772b60652d6e567e9aa70d06cbffb8167b218 100644 --- a/src/common/inc/tglobal.h +++ b/src/common/inc/tglobal.h @@ -103,6 +103,7 @@ extern int32_t tsOfflineThreshold; extern int32_t tsMgmtEqualVnodeNum; extern int32_t tsEnableHttpModule; +extern int32_t tsEnableMqttModule; extern int32_t tsEnableMonitorModule; extern int32_t tsRestRowLimit; @@ -147,6 +148,7 @@ extern int32_t jniDebugFlag; extern int32_t tmrDebugFlag; extern int32_t sdbDebugFlag; extern int32_t httpDebugFlag; +extern int32_t mqttDebugFlag; extern int32_t monitorDebugFlag; extern int32_t uDebugFlag; extern int32_t rpcDebugFlag; diff --git a/src/common/src/tglobal.c b/src/common/src/tglobal.c index 6b348b7fc7e7cf2aee7fb8210cb715f2bfb27ba9..250b79febe1886cda66209082121fbc5f1cbb213 100644 --- a/src/common/src/tglobal.c +++ b/src/common/src/tglobal.c @@ -120,6 +120,7 @@ int32_t tsOfflineThreshold = 86400*100; // seconds 10days int32_t tsMgmtEqualVnodeNum = 4; int32_t tsEnableHttpModule = 1; +int32_t tsEnableMqttModule = 0; // not finished yet, not started it by default int32_t tsEnableMonitorModule = 0; int32_t tsRestRowLimit = 10240; @@ -134,6 +135,7 @@ int32_t cDebugFlag = 135; int32_t jniDebugFlag = 131; int32_t odbcDebugFlag = 131; int32_t httpDebugFlag = 131; +int32_t mqttDebugFlag = 131; int32_t monitorDebugFlag = 131; int32_t qDebugFlag = 131; int32_t rpcDebugFlag = 135; @@ -212,6 +214,7 @@ void taosSetAllDebugFlag() { jniDebugFlag = debugFlag; odbcDebugFlag = debugFlag; httpDebugFlag = debugFlag; + mqttDebugFlag = debugFlag; monitorDebugFlag = debugFlag; rpcDebugFlag = debugFlag; uDebugFlag = debugFlag; @@ -890,6 +893,17 @@ static void doInitGlobalConfig() { cfg.unitType = TAOS_CFG_UTYPE_NONE; taosInitConfigOption(cfg); + + cfg.option = "mqtt"; + cfg.ptr = &tsEnableMqttModule; + cfg.valType = TAOS_CFG_VTYPE_INT32; + cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW; + cfg.minValue = 0; + cfg.maxValue = 1; + cfg.ptrLength = 1; + cfg.unitType = TAOS_CFG_UTYPE_NONE; + taosInitConfigOption(cfg); + cfg.option = "monitor"; cfg.ptr = &tsEnableMonitorModule; cfg.valType = TAOS_CFG_VTYPE_INT32; @@ -1112,6 +1126,17 @@ static void doInitGlobalConfig() { cfg.unitType = TAOS_CFG_UTYPE_NONE; taosInitConfigOption(cfg); + cfg.option = "mqttDebugFlag"; + cfg.ptr = &mqttDebugFlag; + cfg.valType = TAOS_CFG_VTYPE_INT32; + cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_LOG; + cfg.minValue = 0; + cfg.maxValue = 255; + cfg.ptrLength = 0; + cfg.unitType = TAOS_CFG_UTYPE_NONE; + taosInitConfigOption(cfg); + + cfg.option = "monitorDebugFlag"; cfg.ptr = &monitorDebugFlag; cfg.valType = TAOS_CFG_VTYPE_INT32; diff --git a/src/common/src/tname.c b/src/common/src/tname.c index 29236ed0ff89a4bb2b803a4d924591ba7a05c291..3566f26abd944d1f170b2569e8058c08c0478807 100644 --- a/src/common/src/tname.c +++ b/src/common/src/tname.c @@ -29,6 +29,7 @@ void extractTableName(const char* tableId, char* name) { size_t s2 = strcspn(&tableId[s1 + 1], &TS_PATH_DELIMITER[0]); strncpy(name, &tableId[s1 + s2 + 2], TSDB_TABLE_NAME_LEN); + name[TSDB_TABLE_NAME_LEN] = 0; } char* extractDBName(const char* tableId, char* name) { diff --git a/src/cq/src/cqMain.c b/src/cq/src/cqMain.c index 7935bb7ff5f8660e021818a8b4f4b1887ed71194..5cc3ce0159ca12e809f92857d71991104e410711 100644 --- a/src/cq/src/cqMain.c +++ b/src/cq/src/cqMain.c @@ -18,6 +18,7 @@ #include #include #include +#include #include "taosdef.h" #include "taosmsg.h" #include "tglobal.h" @@ -64,7 +65,10 @@ static void cqCreateStream(SCqContext *pContext, SCqObj *pObj); void *cqOpen(void *ahandle, const SCqCfg *pCfg) { SCqContext *pContext = calloc(sizeof(SCqContext), 1); - if (pContext == NULL) return NULL; + if (pContext == NULL) { + terrno = TAOS_SYSTEM_ERROR(errno); + return NULL; + } strcpy(pContext->user, pCfg->user); strcpy(pContext->pass, pCfg->pass); @@ -82,6 +86,7 @@ void *cqOpen(void *ahandle, const SCqCfg *pCfg) { void cqClose(void *handle) { SCqContext *pContext = handle; + if (handle == NULL) return; // stop all CQs cqStop(pContext); @@ -106,9 +111,9 @@ void cqClose(void *handle) { void cqStart(void *handle) { SCqContext *pContext = handle; - cTrace("vgId:%d, start all CQs", pContext->vgId); if (pContext->dbConn || pContext->master) return; + cTrace("vgId:%d, start all CQs", pContext->vgId); pthread_mutex_lock(&pContext->mutex); pContext->master = 1; diff --git a/src/dnode/CMakeLists.txt b/src/dnode/CMakeLists.txt index af2dc2d777bb6d294beecf3b3fa2210ab7230571..de6e15e6b9fa8546ad232c16c8ac2c9472806cb4 100644 --- a/src/dnode/CMakeLists.txt +++ b/src/dnode/CMakeLists.txt @@ -16,7 +16,7 @@ IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM)) AUX_SOURCE_DIRECTORY(src SRC) ADD_EXECUTABLE(taosd ${SRC}) - TARGET_LINK_LIBRARIES(taosd mnode taos_static monitor http tsdb twal vnode cJson lz4) + TARGET_LINK_LIBRARIES(taosd mnode taos_static monitor http mqtt tsdb twal vnode cJson lz4) IF (TD_ACCOUNT) TARGET_LINK_LIBRARIES(taosd account) diff --git a/src/dnode/src/dnodeMain.c b/src/dnode/src/dnodeMain.c index 2f693c61fbf5df441b57a130f7beab1cc77cd81e..68fe9869899edc42a1f6c251cce9749a800f43f6 100644 --- a/src/dnode/src/dnodeMain.c +++ b/src/dnode/src/dnodeMain.c @@ -37,6 +37,7 @@ static SDnodeRunStatus tsDnodeRunStatus = TSDB_DNODE_RUN_STATUS_STOPPED; int32_t dnodeInitSystem() { dnodeSetRunStatus(TSDB_DNODE_RUN_STATUS_INITIALIZE); tscEmbedded = 1; + taosBlockSIGPIPE(); taosResolveCRC(); taosInitGlobalCfg(); taosReadGlobalLogCfg(); diff --git a/src/dnode/src/dnodeModule.c b/src/dnode/src/dnodeModule.c index f5c28c9573ddaee4039114313fad44f5700ea22b..2f3008c33e52b2c2265fd80d786e1312705468d7 100644 --- a/src/dnode/src/dnodeModule.c +++ b/src/dnode/src/dnodeModule.c @@ -20,6 +20,7 @@ #include "trpc.h" #include "mnode.h" #include "http.h" +#include "mqtt.h" #include "monitor.h" #include "dnodeInt.h" #include "dnodeModule.h" @@ -62,6 +63,16 @@ static void dnodeAllocModules() { dnodeSetModuleStatus(TSDB_MOD_HTTP); } + tsModule[TSDB_MOD_MQTT].enable = (tsEnableMqttModule == 1); + tsModule[TSDB_MOD_MQTT].name = "mqtt"; + tsModule[TSDB_MOD_MQTT].initFp = mqttInitSystem; + tsModule[TSDB_MOD_MQTT].cleanUpFp = mqttCleanUpSystem; + tsModule[TSDB_MOD_MQTT].startFp = mqttStartSystem; + tsModule[TSDB_MOD_MQTT].stopFp = mqttStopSystem; + if (tsEnableMqttModule) { + dnodeSetModuleStatus(TSDB_MOD_MQTT); + } + tsModule[TSDB_MOD_MONITOR].enable = (tsEnableMonitorModule == 1); tsModule[TSDB_MOD_MONITOR].name = "monitor"; tsModule[TSDB_MOD_MONITOR].initFp = monitorInitSystem; diff --git a/src/inc/mqtt.h b/src/inc/mqtt.h new file mode 100644 index 0000000000000000000000000000000000000000..710737e79a320989ad1ebf0a669246e8352a4f1d --- /dev/null +++ b/src/inc/mqtt.h @@ -0,0 +1,35 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef TDENGINE_MQTT_H +#define TDENGINE_MQTT_H + +#ifdef __cplusplus +extern "C" { +#endif + +#include + +int32_t mqttGetReqCount(); +int32_t mqttInitSystem(); +int32_t mqttStartSystem(); +void mqttStopSystem(); +void mqttCleanUpSystem(); + +#ifdef __cplusplus +} +#endif + +#endif diff --git a/src/inc/taosdef.h b/src/inc/taosdef.h index 74de811c59301e1edf39639dc453789c06f85223..cbb83d1028f0b349bc6004b0dd3636fef44ba3f2 100644 --- a/src/inc/taosdef.h +++ b/src/inc/taosdef.h @@ -375,6 +375,7 @@ typedef enum { TSDB_MOD_MGMT, TSDB_MOD_HTTP, TSDB_MOD_MONITOR, + TSDB_MOD_MQTT, TSDB_MOD_MAX } EModuleType; diff --git a/src/inc/tsync.h b/src/inc/tsync.h index 05d1d93cf60f7f6d60e2361d2f55746d4a17c007..fcf26d22c39d5bb6f2742677196409456bc531b7 100644 --- a/src/inc/tsync.h +++ b/src/inc/tsync.h @@ -94,8 +94,8 @@ typedef void* tsync_h; tsync_h syncStart(const SSyncInfo *); void syncStop(tsync_h shandle); -int syncReconfig(tsync_h shandle, const SSyncCfg *); -int syncForwardToPeer(tsync_h shandle, void *pHead, void *mhandle, int qtype); +int32_t syncReconfig(tsync_h shandle, const SSyncCfg *); +int32_t syncForwardToPeer(tsync_h shandle, void *pHead, void *mhandle, int qtype); void syncConfirmForward(tsync_h shandle, uint64_t version, int32_t code); void syncRecover(tsync_h shandle); // recover from other nodes: int syncGetNodesRole(tsync_h shandle, SNodesRole *); diff --git a/src/kit/taosdemo/taosdemo.c b/src/kit/taosdemo/taosdemo.c index 3cacafa50501cfd0cf677c92327df499bff7a220..04194c6127f2b825735dcc7cf102c0a321f3079b 100644 --- a/src/kit/taosdemo/taosdemo.c +++ b/src/kit/taosdemo/taosdemo.c @@ -362,6 +362,26 @@ int main(int argc, char *argv[]) { time_t tTime = time(NULL); struct tm tm = *localtime(&tTime); + printf("###################################################################\n"); + printf("# Server IP: %s:%hu\n", ip_addr == NULL ? "localhost" : ip_addr, port); + printf("# User: %s\n", user); + printf("# Password: %s\n", pass); + printf("# Use metric: %s\n", use_metric ? "true" : "false"); + printf("# Datatype of Columns: %s\n", dataString); + printf("# Binary Length(If applicable): %d\n", + (strcasestr(dataString, "BINARY") != NULL) ? len_of_binary : -1); + printf("# Number of Columns per record: %d\n", ncols_per_record); + printf("# Number of Connections: %d\n", nconnections); + printf("# Number of Tables: %d\n", ntables); + printf("# Number of Data per Table: %d\n", nrecords_per_table); + printf("# Records/Request: %d\n", nrecords_per_request); + printf("# Database name: %s\n", db_name); + printf("# Table prefix: %s\n", tb_prefix); + printf("# Test time: %d-%02d-%02d %02d:%02d:%02d\n", tm.tm_year + 1900, tm.tm_mon + 1, + tm.tm_mday, tm.tm_hour, tm.tm_min, tm.tm_sec); + printf("###################################################################\n\n"); + printf("Press enter key to continue"); + getchar(); fprintf(fp, "###################################################################\n"); fprintf(fp, "# Server IP: %s:%hu\n", ip_addr == NULL ? "localhost" : ip_addr, port); @@ -858,15 +878,16 @@ void generateData(char *res, char **data_type, int num_of_cols, int64_t timestam pstr += sprintf(pstr, ")"); } +static const char charset[] = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJK1234567890"; void rand_string(char *str, int size) { - memset(str, 0, size); - const char charset[] = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJK1234567890"; - char *sptr = str; - if (size) { + str[0] = 0; + if (size > 0) { --size; - for (size_t n = 0; n < size; n++) { + int n; + for (n = 0; n < size; n++) { int key = rand() % (int)(sizeof charset - 1); - sptr += sprintf(sptr, "%c", charset[key]); + str[n] = charset[key]; } + str[n] = 0; } } diff --git a/src/kit/taosdump/taosdump.c b/src/kit/taosdump/taosdump.c index 5ea75a1cde7d1d918b377b26ca4a8fa77872c803..9886a91f48242191a6259a9d15bbc1f3cefc0a81 100644 --- a/src/kit/taosdump/taosdump.c +++ b/src/kit/taosdump/taosdump.c @@ -533,7 +533,7 @@ int taosDumpOut(SDumpArguments *arguments) { } } - taos_free_result(result); + // taos_free_result(result); if (count == 0) { fprintf(stderr, "No databases valid to dump\n"); @@ -722,6 +722,57 @@ void taosDumpCreateMTableClause(STableDef *tableDes, char *metric, int numOfCols count_temp = counter; for (; counter < numOfCols; counter++) { + TAOS_ROW row = NULL; + + sprintf(command, "select %s from %s limit 1", tableDes->cols[counter].field, tableDes->name); + if (taos_query(taos, command) != 0) { + fprintf(stderr, "failed to run command %s\n", command); + return; + } + + result = taos_use_result(taos); + if (result == NULL) { + fprintf(stderr, "failed to use result\n"); + return; + } + + TAOS_FIELD *fields = taos_fetch_fields(result); + + row = taos_fetch_row(result); + switch (fields[0].type) { + case TSDB_DATA_TYPE_BOOL: + sprintf(tableDes->cols[counter].note, "%d", ((((int)(*((char *)row[0]))) == 1) ? 1 : 0)); + break; + case TSDB_DATA_TYPE_TINYINT: + sprintf(tableDes->cols[counter].note, "%d", (int)(*((char *)row[0]))); + break; + case TSDB_DATA_TYPE_SMALLINT: + sprintf(tableDes->cols[counter].note, "%d", (int)(*((short *)row[0]))); + break; + case TSDB_DATA_TYPE_INT: + sprintf(tableDes->cols[counter].note, "%d", *((int *)row[0])); + break; + case TSDB_DATA_TYPE_BIGINT: + sprintf(tableDes->cols[counter].note, "%" PRId64 "", *((int64_t *)row[0])); + break; + case TSDB_DATA_TYPE_FLOAT: + sprintf(tableDes->cols[counter].note, "%f", GET_FLOAT_VAL(row[0])); + break; + case TSDB_DATA_TYPE_DOUBLE: + sprintf(tableDes->cols[counter].note, "%f", GET_DOUBLE_VAL(row[0])); + break; + case TSDB_DATA_TYPE_TIMESTAMP: + sprintf(tableDes->cols[counter].note, "%" PRId64 "", *(int64_t *)row[0]); + break; + case TSDB_DATA_TYPE_BINARY: + case TSDB_DATA_TYPE_NCHAR: + default: + strncpy(tableDes->cols[counter].note, (char *)row[0], fields[0].bytes); + break; + } + + taos_free_result(result); + if (counter != count_temp) { if (strcasecmp(tableDes->cols[counter].type, "binary") == 0 || strcasecmp(tableDes->cols[counter].type, "nchar") == 0) { diff --git a/src/mnode/inc/mgmtDef.h b/src/mnode/inc/mgmtDef.h index 58d16ce1b38c082d212203d6291ad37260c12bfc..9d3e46205d1ed21d673db72997164274371b3c64 100644 --- a/src/mnode/inc/mgmtDef.h +++ b/src/mnode/inc/mgmtDef.h @@ -63,7 +63,6 @@ typedef struct SMnodeObj { int8_t updateEnd[1]; int32_t refCount; int8_t role; - SDnodeObj *pDnode; } SMnodeObj; typedef struct { diff --git a/src/mnode/src/mgmtMnode.c b/src/mnode/src/mgmtMnode.c index 3ba7042c4091c265c8ff703ef658875525c98b73..6471b7f182b41c9e077230671f722f8e37b1985d 100644 --- a/src/mnode/src/mgmtMnode.c +++ b/src/mnode/src/mgmtMnode.c @@ -65,7 +65,6 @@ static int32_t mgmtMnodeActionInsert(SSdbOper *pOper) { SDnodeObj *pDnode = mgmtGetDnode(pMnode->mnodeId); if (pDnode == NULL) return TSDB_CODE_DNODE_NOT_EXIST; - pMnode->pDnode = pDnode; pDnode->isMgmt = true; mgmtDecDnodeRef(pDnode); @@ -210,6 +209,9 @@ void mgmtUpdateMnodeIpSet() { mgmtMnodeWrLock(); + memset(ipSet, 0, sizeof(tsMnodeRpcIpSet)); + memset(mnodes, 0, sizeof(SDMMnodeInfos)); + int32_t index = 0; void * pIter = NULL; while (1) { @@ -217,22 +219,27 @@ void mgmtUpdateMnodeIpSet() { pIter = mgmtGetNextMnode(pIter, &pMnode); if (pMnode == NULL) break; - strcpy(ipSet->fqdn[ipSet->numOfIps], pMnode->pDnode->dnodeFqdn); - ipSet->port[ipSet->numOfIps] = htons(pMnode->pDnode->dnodePort); + SDnodeObj *pDnode = mgmtGetDnode(pMnode->mnodeId); + if (pDnode != NULL) { + strcpy(ipSet->fqdn[ipSet->numOfIps], pDnode->dnodeFqdn); + ipSet->port[ipSet->numOfIps] = htons(pDnode->dnodePort); - mnodes->nodeInfos[index].nodeId = htonl(pMnode->mnodeId); - strcpy(mnodes->nodeInfos[index].nodeEp, pMnode->pDnode->dnodeEp); + mnodes->nodeInfos[index].nodeId = htonl(pMnode->mnodeId); + strcpy(mnodes->nodeInfos[index].nodeEp, pDnode->dnodeEp); - if (pMnode->role == TAOS_SYNC_ROLE_MASTER) { - ipSet->inUse = ipSet->numOfIps; - mnodes->inUse = index; - } + if (pMnode->role == TAOS_SYNC_ROLE_MASTER) { + ipSet->inUse = ipSet->numOfIps; + mnodes->inUse = index; + } - mPrint("mnode:%d, ep:%s %s", index, pMnode->pDnode->dnodeEp, pMnode->role == TAOS_SYNC_ROLE_MASTER ? "master" : ""); + mPrint("mnode:%d, ep:%s %s", index, pDnode->dnodeEp, + pMnode->role == TAOS_SYNC_ROLE_MASTER ? "master" : ""); - ipSet->numOfIps++; - index++; - + ipSet->numOfIps++; + index++; + } + + mgmtDecDnodeRef(pDnode); mgmtDecMnodeRef(pMnode); } @@ -382,7 +389,15 @@ static int32_t mgmtRetrieveMnodes(SShowObj *pShow, char *data, int32_t rows, voi cols++; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pMnode->pDnode->dnodeEp, pShow->bytes[cols] - VARSTR_HEADER_SIZE); + + SDnodeObj *pDnode = mgmtGetDnode(pMnode->mnodeId); + if (pDnode != NULL) { + STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pDnode->dnodeEp, pShow->bytes[cols] - VARSTR_HEADER_SIZE); + } else { + STR_WITH_MAXSIZE_TO_VARSTR(pWrite, "invalid ep", pShow->bytes[cols] - VARSTR_HEADER_SIZE); + } + mgmtDecDnodeRef(pDnode); + cols++; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; diff --git a/src/mnode/src/mgmtSdb.c b/src/mnode/src/mgmtSdb.c index 3d4e6fcab10e70d07f8e5bfd8aac6af593fca2fc..237d2ca499296d00ffd50007430a57649767fa9f 100644 --- a/src/mnode/src/mgmtSdb.c +++ b/src/mnode/src/mgmtSdb.c @@ -28,6 +28,7 @@ #include "mgmtDef.h" #include "mgmtInt.h" #include "mgmtMnode.h" +#include "mgmtDnode.h" #include "mgmtSdb.h" typedef enum { @@ -259,10 +260,15 @@ void sdbUpdateSync() { if (pMnode == NULL) break; syncCfg.nodeInfo[index].nodeId = pMnode->mnodeId; - syncCfg.nodeInfo[index].nodePort = pMnode->pDnode->dnodePort + TSDB_PORT_SYNC; - strcpy(syncCfg.nodeInfo[index].nodeFqdn, pMnode->pDnode->dnodeEp); - index++; + SDnodeObj *pDnode = mgmtGetDnode(pMnode->mnodeId); + if (pDnode != NULL) { + syncCfg.nodeInfo[index].nodePort = pDnode->dnodePort + TSDB_PORT_SYNC; + strcpy(syncCfg.nodeInfo[index].nodeFqdn, pDnode->dnodeEp); + index++; + } + + mgmtDecDnodeRef(pDnode); mgmtDecMnodeRef(pMnode); } sdbFreeIter(pIter); diff --git a/src/mnode/src/mgmtTable.c b/src/mnode/src/mgmtTable.c index f0c55449a3608de0140d617658890991592330fe..53fbd64f87e9c50fcb87deb15137c206412f1e35 100644 --- a/src/mnode/src/mgmtTable.c +++ b/src/mnode/src/mgmtTable.c @@ -1139,7 +1139,7 @@ int32_t mgmtRetrieveShowSuperTables(SShowObj *pShow, char *data, int32_t rows, v prefixLen = strlen(prefix); SPatternCompareInfo info = PATTERN_COMPARE_INFO_INITIALIZER; - char stableName[TSDB_TABLE_NAME_LEN] = {0}; + char stableName[TSDB_TABLE_NAME_LEN + 1] = {0}; while (numOfRows < rows) { pShow->pIter = mgmtGetNextSuperTable(pShow->pIter, &pTable); @@ -2024,7 +2024,7 @@ static void mgmtProcessMultiTableMetaMsg(SQueuedMsg *pMsg) { SCMMultiTableInfoMsg *pInfo = pMsg->pCont; pInfo->numOfTables = htonl(pInfo->numOfTables); - int32_t totalMallocLen = 4*1024*1024; // first malloc 4 MB, subsequent reallocation as twice + int32_t totalMallocLen = 4 * 1024 * 1024; // first malloc 4 MB, subsequent reallocation as twice SMultiTableMeta *pMultiMeta = rpcMallocCont(totalMallocLen); if (pMultiMeta == NULL) { mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_SERV_OUT_OF_MEMORY); @@ -2034,26 +2034,30 @@ static void mgmtProcessMultiTableMetaMsg(SQueuedMsg *pMsg) { pMultiMeta->contLen = sizeof(SMultiTableMeta); pMultiMeta->numOfTables = 0; - for (int t = 0; t < pInfo->numOfTables; ++t) { - char *tableId = (char*)(pInfo->tableIds + t * TSDB_TABLE_ID_LEN); + for (int32_t t = 0; t < pInfo->numOfTables; ++t) { + char * tableId = (char *)(pInfo->tableIds + t * TSDB_TABLE_ID_LEN + 1); SChildTableObj *pTable = mgmtGetChildTable(tableId); if (pTable == NULL) continue; if (pMsg->pDb == NULL) pMsg->pDb = mgmtGetDbByTableId(tableId); - if (pMsg->pDb == NULL) continue; + if (pMsg->pDb == NULL) { + mgmtDecTableRef(pTable); + continue; + } int availLen = totalMallocLen - pMultiMeta->contLen; if (availLen <= sizeof(STableMetaMsg) + sizeof(SSchema) * (TSDB_MAX_TAGS + TSDB_MAX_COLUMNS + 16)) { - //TODO realloc - //totalMallocLen *= 2; - //pMultiMeta = rpcReMalloc(pMultiMeta, totalMallocLen); - //if (pMultiMeta == NULL) { - /// rpcSendResponse(ahandle, TSDB_CODE_SERV_OUT_OF_MEMORY, NULL, 0); - // return TSDB_CODE_SERV_OUT_OF_MEMORY; - //} else { - // t--; - // continue; - //} + totalMallocLen *= 2; + pMultiMeta = rpcReallocCont(pMultiMeta, totalMallocLen); + if (pMultiMeta == NULL) { + mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_SERV_OUT_OF_MEMORY); + mgmtDecTableRef(pTable); + return; + } else { + t--; + mgmtDecTableRef(pTable); + continue; + } } STableMetaMsg *pMeta = (STableMetaMsg *)(pMultiMeta->metas + pMultiMeta->contLen); @@ -2062,6 +2066,8 @@ static void mgmtProcessMultiTableMetaMsg(SQueuedMsg *pMsg) { pMultiMeta->numOfTables ++; pMultiMeta->contLen += pMeta->contLen; } + + mgmtDecTableRef(pTable); } SRpcMsg rpcRsp = {0}; @@ -2148,7 +2154,7 @@ static int32_t mgmtRetrieveShowTables(SShowObj *pShow, char *data, int32_t rows, continue; } - char tableName[TSDB_TABLE_NAME_LEN] = {0}; + char tableName[TSDB_TABLE_NAME_LEN + 1] = {0}; // pattern compare for table name mgmtExtractTableName(pTable->info.tableId, tableName); diff --git a/src/plugins/CMakeLists.txt b/src/plugins/CMakeLists.txt index 562de724105ae934fb804fb7c6f9844ddfbe5e17..2bc6bf54bfe9b330ede401cee521b0931f4be684 100644 --- a/src/plugins/CMakeLists.txt +++ b/src/plugins/CMakeLists.txt @@ -3,3 +3,4 @@ PROJECT(TDengine) ADD_SUBDIRECTORY(monitor) ADD_SUBDIRECTORY(http) +ADD_SUBDIRECTORY(mqtt) diff --git a/src/plugins/mqtt/CMakeLists.txt b/src/plugins/mqtt/CMakeLists.txt new file mode 100644 index 0000000000000000000000000000000000000000..6224a0cad4471bbf15157cf9b9b3b0455440853f --- /dev/null +++ b/src/plugins/mqtt/CMakeLists.txt @@ -0,0 +1,22 @@ +CMAKE_MINIMUM_REQUIRED(VERSION 2.8) +PROJECT(TDengine) + +IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM)) + INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/inc) + INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/client/inc) + INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/util/inc) + INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/query/inc) + INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/common/inc) + INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/deps/zlib-1.2.11/inc) + INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/deps/cJson/inc) + INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/deps/lz4/inc) + INCLUDE_DIRECTORIES(${TD_OS_DIR}/inc) + INCLUDE_DIRECTORIES(inc) + AUX_SOURCE_DIRECTORY(src SRC) + ADD_LIBRARY(mqtt ${SRC}) + TARGET_LINK_LIBRARIES(mqtt taos_static z) + + IF (TD_ADMIN) + TARGET_LINK_LIBRARIES(mqtt admin) + ENDIF () +ENDIF () diff --git a/src/plugins/mqtt/inc/mqttLog.h b/src/plugins/mqtt/inc/mqttLog.h new file mode 100644 index 0000000000000000000000000000000000000000..735678a326c0842c1c3084ad203255991a0e3615 --- /dev/null +++ b/src/plugins/mqtt/inc/mqttLog.h @@ -0,0 +1,42 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef TDENGINE_MQTT_LOG_H +#define TDENGINE_MQTT_LOG_H + +#include "tlog.h" + +extern int32_t mqttDebugFlag; + +#define mqttError(...) \ + if (mqttDebugFlag & DEBUG_ERROR) { \ + taosPrintLog("ERROR MQT ", 255, __VA_ARGS__); \ + } +#define mqttWarn(...) \ + if ( mqttDebugFlag & DEBUG_WARN) { \ + taosPrintLog("WARN MQT ", mqttDebugFlag, __VA_ARGS__); \ + } +#define mqttTrace(...) \ + if ( mqttDebugFlag & DEBUG_TRACE) { \ + taosPrintLog("MQT ", mqttDebugFlag, __VA_ARGS__); \ + } +#define mqttDump(...) \ + if ( mqttDebugFlag & DEBUG_TRACE) { \ + taosPrintLongString("MQT ", mqttDebugFlag, __VA_ARGS__); \ + } +#define mqttPrint(...) \ + { taosPrintLog("MQT ", 255, __VA_ARGS__); } + +#endif diff --git a/src/plugins/mqtt/inc/mqttSystem.h b/src/plugins/mqtt/inc/mqttSystem.h new file mode 100644 index 0000000000000000000000000000000000000000..c61318806d0adc65ce19eb42d36e42a53c015918 --- /dev/null +++ b/src/plugins/mqtt/inc/mqttSystem.h @@ -0,0 +1,34 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef TDENGINE_MQTT_SYSTEM_H +#define TDENGINE_MQTT_SYSTEM_H +#ifdef __cplusplus +extern "C" { +#endif + +#include + +int32_t mqttGetReqCount(); +int32_t mqttInitSystem(); +int32_t mqttStartSystem(); +void mqttStopSystem(); +void mqttCleanUpSystem(); + +#ifdef __cplusplus +} +#endif + +#endif diff --git a/src/plugins/mqtt/src/mqttSystem.c b/src/plugins/mqtt/src/mqttSystem.c new file mode 100644 index 0000000000000000000000000000000000000000..ccf6cfb3e34844eea95c48f774f9202794f6a3fa --- /dev/null +++ b/src/plugins/mqtt/src/mqttSystem.c @@ -0,0 +1,43 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#define _DEFAULT_SOURCE +#include "mqttSystem.h" +#include "mqtt.h" +#include "mqttLog.h" +#include "os.h" +#include "taos.h" +#include "tglobal.h" +#include "tsocket.h" +#include "ttimer.h" + +int32_t mqttGetReqCount() { return 0; } +int mqttInitSystem() { + mqttPrint("mqttInitSystem"); + return 0; +} + +int mqttStartSystem() { + mqttPrint("mqttStartSystem"); + return 0; +} + +void mqttStopSystem() { + mqttPrint("mqttStopSystem"); +} + +void mqttCleanUpSystem() { + mqttPrint("mqttCleanUpSystem"); +} diff --git a/src/rpc/src/rpcMain.c b/src/rpc/src/rpcMain.c index 0410281f0dee3c7f685417b08620aa8d2c9aec49..a8bb2fd65b2091e33eb26d3f738cbfe4f736f69e 100644 --- a/src/rpc/src/rpcMain.c +++ b/src/rpc/src/rpcMain.c @@ -867,9 +867,8 @@ static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) { // underlying UDP layer does not know it is server or client pRecv->connType = pRecv->connType | pRpc->connType; - if (pRecv->ip==0 && pConn) { - rpcProcessBrokenLink(pConn); - rpcFreeMsg(pRecv->msg); + if (pRecv->ip == 0 && pConn) { + rpcProcessBrokenLink(pConn); return NULL; } diff --git a/src/rpc/src/rpcTcp.c b/src/rpc/src/rpcTcp.c index a7e00027fb23aeefeb576295d06ab8d3fe62eeaa..a7312fadf1139541a41fa24c3809650b600ece18 100644 --- a/src/rpc/src/rpcTcp.c +++ b/src/rpc/src/rpcTcp.c @@ -215,7 +215,6 @@ static void* taosAcceptTcpConnection(void *arg) { continue; } - tTrace("%s TCP connection from ip:%s:%hu", pServerObj->label, inet_ntoa(caddr.sin_addr), caddr.sin_port); taosKeepTcpAlive(connFd); // pick up the thread to handle this connection @@ -229,7 +228,8 @@ static void* taosAcceptTcpConnection(void *arg) { inet_ntoa(caddr.sin_addr), pFdObj->port, pFdObj, pThreadObj->numOfFds); } else { close(connFd); - tError("%s failed to malloc FdObj(%s)", pServerObj->label, strerror(errno)); + tError("%s failed to malloc FdObj(%s) for connection from:%s:%hu", pServerObj->label, strerror(errno), + inet_ntoa(caddr.sin_addr), caddr.sin_port); } // pick up next thread for next connection @@ -341,7 +341,9 @@ static void taosReportBrokenLink(SFdObj *pFdObj) { recvInfo.chandle = NULL; recvInfo.connType = RPC_CONN_TCP; (*(pThreadObj->processData))(&recvInfo); - } + } else { + taosFreeFdObj(pFdObj); + } } #define maxEvents 10 @@ -352,7 +354,7 @@ static void *taosProcessTcpData(void *param) { struct epoll_event events[maxEvents]; SRecvInfo recvInfo; SRpcHead rpcHead; - + while (1) { int fdNum = epoll_wait(pThreadObj->pollFd, events, maxEvents, -1); if (pThreadObj->stop) { @@ -466,7 +468,7 @@ static void taosFreeFdObj(SFdObj *pFdObj) { pFdObj->signature = NULL; epoll_ctl(pThreadObj->pollFd, EPOLL_CTL_DEL, pFdObj->fd, NULL); - close(pFdObj->fd); + taosCloseSocket(pFdObj->fd); pThreadObj->numOfFds--; diff --git a/src/rpc/test/rserver.c b/src/rpc/test/rserver.c index 958d099027f2072b82aee45fe302f0042c1fd8aa..508f04fbc349ec5485360e70992c6e35b1f8728c 100644 --- a/src/rpc/test/rserver.c +++ b/src/rpc/test/rserver.c @@ -127,6 +127,8 @@ int main(int argc, char *argv[]) { SRpcInit rpcInit; char dataName[20] = "server.data"; + taosBlockSIGPIPE(); + memset(&rpcInit, 0, sizeof(rpcInit)); rpcInit.localPort = 7000; rpcInit.label = "SER"; diff --git a/src/util/inc/tsocket.h b/src/util/inc/tsocket.h index 309aa80ef6c0ae4e04d20e72ebb0d91835bcb66d..97abc16333ae08cec2387045d8ae458de43b3a06 100644 --- a/src/util/inc/tsocket.h +++ b/src/util/inc/tsocket.h @@ -31,7 +31,6 @@ int taosOpenUdpSocket(uint32_t localIp, uint16_t localPort); int taosOpenTcpClientSocket(uint32_t ip, uint16_t port, uint32_t localIp); int taosOpenTcpServerSocket(uint32_t ip, uint16_t port); int taosKeepTcpAlive(int sockFd); -void taosCloseTcpSocket(int sockFd); int taosGetFqdn(char *); uint32_t taosGetIpFromFqdn(const char *); diff --git a/src/util/src/tqueue.c b/src/util/src/tqueue.c index 1e248c9e459c8615fc61fc8368a794bd3a9ad3a1..475941dbdb9fb9dee3d656b841fc286cfd9043ef 100644 --- a/src/util/src/tqueue.c +++ b/src/util/src/tqueue.c @@ -65,6 +65,7 @@ taos_queue taosOpenQueue() { } void taosCloseQueue(taos_queue param) { + if (param == NULL) return; STaosQueue *queue = (STaosQueue *)param; STaosQnode *pTemp; STaosQnode *pNode = queue->head; @@ -224,6 +225,7 @@ taos_qset taosOpenQset() { } void taosCloseQset(taos_qset param) { + if (param == NULL) return; STaosQset *qset = (STaosQset *)param; pthread_mutex_destroy(&qset->mutex); tsem_destroy(&qset->sem); diff --git a/src/util/src/tsocket.c b/src/util/src/tsocket.c index efdf7529608b272230f804437d71377f1fc6feea..d92228a089785d3c327496a70fd72dd4bc2a7b37 100644 --- a/src/util/src/tsocket.c +++ b/src/util/src/tsocket.c @@ -305,18 +305,9 @@ int taosOpenTcpClientSocket(uint32_t destIp, uint16_t destPort, uint32_t clientI sockFd = -1; } - return sockFd; -} - -void taosCloseTcpSocket(int sockFd) { - struct linger linger; - linger.l_onoff = 1; - linger.l_linger = 0; - if (taosSetSockOpt(sockFd, SOL_SOCKET, SO_LINGER, (void *)&linger, sizeof(linger)) < 0) { - uError("setsockopt SO_LINGER failed: %d (%s)", errno, strerror(errno)); - } + // taosKeepTcpAlive(sockFd); - taosCloseSocket(sockFd); + return sockFd; } int taosKeepTcpAlive(int sockFd) { @@ -355,6 +346,15 @@ int taosKeepTcpAlive(int sockFd) { return -1; } + struct linger linger = {0}; + linger.l_onoff = 1; + //linger.l_linger = 0; + if (taosSetSockOpt(sockFd, SOL_SOCKET, SO_LINGER, (void *)&linger, sizeof(linger)) < 0) { + uError("setsockopt SO_LINGER failed: %d (%s)", errno, strerror(errno)); + close(sockFd); + return -1; + } + return 0; } diff --git a/src/util/src/ttimer.c b/src/util/src/ttimer.c index 42fd13b2cd780dc650cae0fbdfe0cba0c4d72afc..e6ef73ef57d3da8a8961c9673096e7f6e6a33e1a 100644 --- a/src/util/src/ttimer.c +++ b/src/util/src/ttimer.c @@ -405,19 +405,19 @@ static bool doStopTimer(tmr_obj_t* timer, uint8_t state) { tmrTrace(fmt, timer->ctrl->label, timer->id, timer->fp, timer->param); return reusable; } - + if (state != TIMER_STATE_EXPIRED) { // timer already stopped or cancelled, has nothing to do in this case return false; } - + if (timer->executedBy == taosGetPthreadId()) { // taosTmrReset is called in the timer callback, should do nothing in this // case to avoid dead lock. note taosTmrReset must be the last statement // of the callback funtion, will be a bug otherwise. return false; } - + // timer callback is executing in another thread, we SHOULD wait it to stop, // BUT this may result in dead lock if current thread are holding a lock which // the timer callback need to acquire. so, we HAVE TO return directly. @@ -501,6 +501,7 @@ static void taosTmrModuleInit(void) { tmr_ctrl_t* ctrl = tmrCtrls + i; ctrl->next = ctrl + 1; } + (tmrCtrls + taosMaxTmrCtrl - 1)->next = NULL; unusedTmrCtrl = tmrCtrls; pthread_mutex_init(&tmrCtrlMutex, NULL); @@ -574,12 +575,12 @@ void taosTmrCleanUp(void* handle) { if (numOfTmrCtrl <=0) { taosUninitTimer(); - + taosCleanUpScheduler(tmrQhandle); for (int i = 0; i < tListLen(wheels); i++) { time_wheel_t* wheel = wheels + i; - pthread_mutex_destroy(&wheel->mutex); + pthread_mutex_destroy(&wheel->mutex); free(wheel->slots); } diff --git a/src/vnode/src/vnodeMain.c b/src/vnode/src/vnodeMain.c index 6f0b19b0c62661826fe10905931a9904d950cf1b..b8bc29550e3ee810b850b12922debbe06dfbe104 100644 --- a/src/vnode/src/vnodeMain.c +++ b/src/vnode/src/vnodeMain.c @@ -35,7 +35,7 @@ static void vnodeCleanUp(SVnodeObj *pVnode); static int32_t vnodeSaveCfg(SMDCreateVnodeMsg *pVnodeCfg); static int32_t vnodeReadCfg(SVnodeObj *pVnode); static int32_t vnodeSaveVersion(SVnodeObj *pVnode); -static bool vnodeReadVersion(SVnodeObj *pVnode); +static int32_t vnodeReadVersion(SVnodeObj *pVnode); static int vnodeProcessTsdbStatus(void *arg, int status); static uint32_t vnodeGetFileInfo(void *ahandle, char *name, uint32_t *index, int32_t *size, uint64_t *fversion); static int vnodeGetWalInfo(void *ahandle, char *name, uint32_t *index); @@ -46,9 +46,9 @@ static pthread_once_t vnodeModuleInit = PTHREAD_ONCE_INIT; #ifndef _SYNC tsync_h syncStart(const SSyncInfo *info) { return NULL; } -int syncForwardToPeer(tsync_h shandle, void *pHead, void *mhandle, int qtype) { return 0; } +int32_t syncForwardToPeer(tsync_h shandle, void *pHead, void *mhandle, int qtype) { return 0; } void syncStop(tsync_h shandle) {} -int syncReconfig(tsync_h shandle, const SSyncCfg * cfg) { return 0; } +int32_t syncReconfig(tsync_h shandle, const SSyncCfg * cfg) { return 0; } int syncGetNodesRole(tsync_h shandle, SNodesRole * cfg) { return 0; } void syncConfirmForward(tsync_h shandle, uint64_t version, int32_t code) {} #endif @@ -148,35 +148,20 @@ int32_t vnodeAlter(void *param, SMDCreateVnodeMsg *pVnodeCfg) { pVnode->status = TAOS_VN_STATUS_UPDATING; int32_t code = vnodeSaveCfg(pVnodeCfg); - if (code != TSDB_CODE_SUCCESS) { - vError("vgId:%d, failed to save vnode cfg, reason:%s", pVnodeCfg->cfg.vgId, tstrerror(code)); - return code; - } + if (code != TSDB_CODE_SUCCESS) return code; code = vnodeReadCfg(pVnode); - if (code != TSDB_CODE_SUCCESS) { - vError("vgId:%d, failed to read cfg file", pVnode->vgId); - taosHashRemove(tsDnodeVnodesHash, (const char *)&pVnode->vgId, sizeof(int32_t)); - return code; - } + if (code != TSDB_CODE_SUCCESS) return code; code = syncReconfig(pVnode->sync, &pVnode->syncCfg); - if (code != TSDB_CODE_SUCCESS) { - vTrace("vgId:%d, failed to alter vnode, canot reconfig sync, result:%s", pVnode->vgId, - tstrerror(code)); - return code; - } + if (code != TSDB_CODE_SUCCESS) return code; code = tsdbConfigRepo(pVnode->tsdb, &pVnode->tsdbCfg); - if (code != TSDB_CODE_SUCCESS) { - vTrace("vgId:%d, failed to alter vnode, canot reconfig tsdb, result:%s", pVnode->vgId, - tstrerror(code)); - return code; - } + if (code != TSDB_CODE_SUCCESS) return code; pVnode->status = TAOS_VN_STATUS_READY; - vTrace("vgId:%d, vnode is altered", pVnode->vgId); + return TSDB_CODE_SUCCESS; } @@ -185,26 +170,40 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) { pthread_once(&vnodeModuleInit, vnodeInit); SVnodeObj *pVnode = calloc(sizeof(SVnodeObj), 1); + if (pVnode == NULL) { + vError("vgId:%d, failed to open vnode since no enough memory", vnode); + return TAOS_SYSTEM_ERROR(errno); + } + + atomic_add_fetch_32(&tsOpennedVnodes, 1); + atomic_add_fetch_32(&pVnode->refCount, 1); + pVnode->vgId = vnode; pVnode->status = TAOS_VN_STATUS_INIT; - pVnode->refCount = 1; pVnode->version = 0; pVnode->tsdbCfg.tsdbId = pVnode->vgId; pVnode->rootDir = strdup(rootDir); - taosHashPut(tsDnodeVnodesHash, (const char *)&pVnode->vgId, sizeof(int32_t), (char *)(&pVnode), sizeof(SVnodeObj *)); int32_t code = vnodeReadCfg(pVnode); if (code != TSDB_CODE_SUCCESS) { - vError("vgId:%d, failed to read cfg file", pVnode->vgId); - taosHashRemove(tsDnodeVnodesHash, (const char *)&pVnode->vgId, sizeof(int32_t)); + vnodeCleanUp(pVnode); + return code; + } + + code = vnodeReadVersion(pVnode); + if (code != TSDB_CODE_SUCCESS) { + vnodeCleanUp(pVnode); return code; } - vnodeReadVersion(pVnode); pVnode->fversion = pVnode->version; pVnode->wqueue = dnodeAllocateWqueue(pVnode); pVnode->rqueue = dnodeAllocateRqueue(pVnode); + if (pVnode->wqueue == NULL || pVnode->rqueue == NULL) { + vnodeCleanUp(pVnode); + return terrno; + } SCqCfg cqCfg = {0}; sprintf(cqCfg.user, "root"); @@ -212,22 +211,29 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) { cqCfg.vgId = vnode; cqCfg.cqWrite = vnodeWriteToQueue; pVnode->cq = cqOpen(pVnode, &cqCfg); + if (pVnode->cq == NULL) { + vnodeCleanUp(pVnode); + return terrno; + } STsdbAppH appH = {0}; appH.appH = (void *)pVnode; appH.notifyStatus = vnodeProcessTsdbStatus; appH.cqH = pVnode->cq; - sprintf(temp, "%s/tsdb", rootDir); pVnode->tsdb = tsdbOpenRepo(temp, &appH); if (pVnode->tsdb == NULL) { - vError("vgId:%d, failed to open tsdb at %s(%s)", pVnode->vgId, temp, tstrerror(terrno)); - taosHashRemove(tsDnodeVnodesHash, (const char *)&pVnode->vgId, sizeof(int32_t)); + vnodeCleanUp(pVnode); return terrno; } sprintf(temp, "%s/wal", rootDir); pVnode->wal = walOpen(temp, &pVnode->walCfg); + if (pVnode->wal == NULL) { + vnodeCleanUp(pVnode); + return terrno; + } + walRestore(pVnode->wal, pVnode, vnodeWriteToQueue); SSyncInfo syncInfo; @@ -246,6 +252,11 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) { #ifndef _SYNC pVnode->role = TAOS_SYNC_ROLE_MASTER; +#else + if (pVnode->sync == NULL) { + vnodeCleanUp(pVnode); + return terrno; + } #endif // start continuous query @@ -253,11 +264,11 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) { cqStart(pVnode->cq); pVnode->events = NULL; - pVnode->status = TAOS_VN_STATUS_READY; vTrace("vgId:%d, vnode is opened in %s, pVnode:%p", pVnode->vgId, rootDir, pVnode); - atomic_add_fetch_32(&tsOpennedVnodes, 1); + taosHashPut(tsDnodeVnodesHash, (const char *)&pVnode->vgId, sizeof(int32_t), (char *)(&pVnode), sizeof(SVnodeObj *)); + return TSDB_CODE_SUCCESS; } @@ -286,13 +297,6 @@ void vnodeRelease(void *pVnodeRaw) { } tfree(pVnode->rootDir); - // remove read queue - dnodeFreeRqueue(pVnode->rqueue); - pVnode->rqueue = NULL; - - // remove write queue - dnodeFreeWqueue(pVnode->wqueue); - pVnode->wqueue = NULL; if (pVnode->status == TAOS_VN_STATUS_DELETING) { char rootDir[TSDB_FILENAME_LEN] = {0}; @@ -387,15 +391,26 @@ static void vnodeCleanUp(SVnodeObj *pVnode) { pVnode->sync = NULL; } - cqClose(pVnode->cq); - pVnode->cq = NULL; + if (pVnode->wal) + walClose(pVnode->wal); + pVnode->wal = NULL; - tsdbCloseRepo(pVnode->tsdb, 1); + if (pVnode->tsdb) + tsdbCloseRepo(pVnode->tsdb, 1); pVnode->tsdb = NULL; - walClose(pVnode->wal); - pVnode->wal = NULL; + if (pVnode->cq) + cqClose(pVnode->cq); + pVnode->cq = NULL; + + if (pVnode->wqueue) + dnodeFreeWqueue(pVnode->wqueue); + pVnode->wqueue = NULL; + if (pVnode->rqueue) + dnodeFreeRqueue(pVnode->rqueue); + pVnode->rqueue = NULL; + vnodeRelease(pVnode); } @@ -462,7 +477,8 @@ static int32_t vnodeSaveCfg(SMDCreateVnodeMsg *pVnodeCfg) { if (!fp) { vError("vgId:%d, failed to open vnode cfg file for write, file:%s error:%s", pVnodeCfg->cfg.vgId, cfgFile, strerror(errno)); - return errno; + terrno = TAOS_SYSTEM_ERROR(errno); + return terrno; } int32_t len = 0; @@ -512,27 +528,30 @@ static int32_t vnodeSaveCfg(SMDCreateVnodeMsg *pVnodeCfg) { } static int32_t vnodeReadCfg(SVnodeObj *pVnode) { - char cfgFile[TSDB_FILENAME_LEN + 30] = {0}; + cJSON *root = NULL; + char *content = NULL; + char cfgFile[TSDB_FILENAME_LEN + 30] = {0}; + int maxLen = 1000; + + terrno = TSDB_CODE_OTHERS; sprintf(cfgFile, "%s/vnode%d/config.json", tsVnodeDir, pVnode->vgId); FILE *fp = fopen(cfgFile, "r"); if (!fp) { - vError("vgId:%d, failed to open vnode cfg file for read, file:%s, error:%s", pVnode->vgId, + vError("vgId:%d, failed to open vnode cfg file:%s to read, error:%s", pVnode->vgId, cfgFile, strerror(errno)); - return errno; + terrno = TAOS_SYSTEM_ERROR(errno); + goto PARSE_OVER; } - int ret = TSDB_CODE_OTHERS; - int maxLen = 1000; - char *content = calloc(1, maxLen + 1); + content = calloc(1, maxLen + 1); + if (content == NULL) goto PARSE_OVER; int len = fread(content, 1, maxLen, fp); if (len <= 0) { - free(content); - fclose(fp); vError("vgId:%d, failed to read vnode cfg, content is null", pVnode->vgId); - return false; + return errno; } - cJSON *root = cJSON_Parse(content); + root = cJSON_Parse(content); if (root == NULL) { vError("vgId:%d, failed to read vnode cfg, invalid json format", pVnode->vgId); goto PARSE_OVER; @@ -691,19 +710,19 @@ static int32_t vnodeReadCfg(SVnodeObj *pVnode) { pVnode->syncCfg.nodeInfo[i].nodePort += TSDB_PORT_SYNC; } - ret = 0; + terrno = TSDB_CODE_SUCCESS; - vPrint("vgId:%d, read vnode cfg successed, replcia:%d", pVnode->vgId, pVnode->syncCfg.replica); + vPrint("vgId:%d, read vnode cfg successfully, replcia:%d", pVnode->vgId, pVnode->syncCfg.replica); for (int32_t i = 0; i < pVnode->syncCfg.replica; i++) { vPrint("vgId:%d, dnode:%d, %s:%d", pVnode->vgId, pVnode->syncCfg.nodeInfo[i].nodeId, pVnode->syncCfg.nodeInfo[i].nodeFqdn, pVnode->syncCfg.nodeInfo[i].nodePort); } PARSE_OVER: - free(content); + tfree(content); cJSON_Delete(root); - fclose(fp); - return ret; + if (fp) fclose(fp); + return terrno; } static int32_t vnodeSaveVersion(SVnodeObj *pVnode) { @@ -713,7 +732,7 @@ static int32_t vnodeSaveVersion(SVnodeObj *pVnode) { if (!fp) { vError("vgId:%d, failed to open vnode version file for write, file:%s error:%s", pVnode->vgId, versionFile, strerror(errno)); - return errno; + return TAOS_SYSTEM_ERROR(errno); } int32_t len = 0; @@ -733,29 +752,33 @@ static int32_t vnodeSaveVersion(SVnodeObj *pVnode) { return 0; } -static bool vnodeReadVersion(SVnodeObj *pVnode) { - char versionFile[TSDB_FILENAME_LEN + 30] = {0}; +static int32_t vnodeReadVersion(SVnodeObj *pVnode) { + char versionFile[TSDB_FILENAME_LEN + 30] = {0}; + char *content = NULL; + cJSON *root = NULL; + int maxLen = 100; + + terrno = TSDB_CODE_OTHERS; sprintf(versionFile, "%s/vnode%d/version.json", tsVnodeDir, pVnode->vgId); FILE *fp = fopen(versionFile, "r"); if (!fp) { if (errno != ENOENT) { vError("vgId:%d, failed to open version file:%s error:%s", pVnode->vgId, versionFile, strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(errno); + } else { + terrno = TSDB_CODE_SUCCESS; } - return false; + goto PARSE_OVER; } - bool ret = false; - int maxLen = 100; - char *content = calloc(1, maxLen + 1); + content = calloc(1, maxLen + 1); int len = fread(content, 1, maxLen, fp); if (len <= 0) { - free(content); - fclose(fp); - vPrint("vgId:%d, failed to read vnode version, content is null", pVnode->vgId); - return false; + vError("vgId:%d, failed to read vnode version, content is null", pVnode->vgId); + goto PARSE_OVER; } - cJSON *root = cJSON_Parse(content); + root = cJSON_Parse(content); if (root == NULL) { vError("vgId:%d, failed to read vnode version, invalid json format", pVnode->vgId); goto PARSE_OVER; @@ -768,13 +791,12 @@ static bool vnodeReadVersion(SVnodeObj *pVnode) { } pVnode->version = version->valueint; - ret = true; - - vPrint("vgId:%d, read vnode version succeed, version:%" PRId64, pVnode->vgId, pVnode->version); + terrno = TSDB_CODE_SUCCESS; + vPrint("vgId:%d, read vnode version successfully, version:%" PRId64, pVnode->vgId, pVnode->version); PARSE_OVER: - free(content); + tfree(content); cJSON_Delete(root); - fclose(fp); - return ret; + if(fp) fclose(fp); + return terrno; } diff --git a/src/wal/src/walMain.c b/src/wal/src/walMain.c index 8d92fac926b2d54ecb7a022376d7ffc441ba454a..ebfc9d98bb159c0eb5929d9326be7b579db4c875 100644 --- a/src/wal/src/walMain.c +++ b/src/wal/src/walMain.c @@ -25,6 +25,7 @@ #include "tlog.h" #include "tchecksum.h" #include "tutil.h" +#include "taoserror.h" #include "twal.h" #include "tqueue.h" @@ -56,7 +57,10 @@ static int walRemoveWalFiles(const char *path); void *walOpen(const char *path, const SWalCfg *pCfg) { SWal *pWal = calloc(sizeof(SWal), 1); - if (pWal == NULL) return NULL; + if (pWal == NULL) { + terrno = TAOS_SYSTEM_ERROR(errno); + return NULL; + } pWal->fd = -1; pWal->max = pCfg->wals; @@ -75,6 +79,7 @@ void *walOpen(const char *path, const SWalCfg *pCfg) { walRenew(pWal); if (pWal->fd <0) { + terrno = TAOS_SYSTEM_ERROR(errno); wError("wal:%s, failed to open", path); pthread_mutex_destroy(&pWal->mutex); free(pWal); @@ -112,9 +117,10 @@ void walClose(void *handle) { } int walRenew(void *handle) { + if (handle == NULL) return 0; SWal *pWal = handle; int code = 0; - + pthread_mutex_lock(&pWal->mutex); if (pWal->fd >=0) { @@ -156,6 +162,7 @@ int walRenew(void *handle) { int walWrite(void *handle, SWalHead *pHead) { SWal *pWal = handle; int code = 0; + if (pWal == NULL) return -1; // no wal if (pWal->level == TAOS_WAL_NOLOG) return 0; @@ -178,6 +185,7 @@ int walWrite(void *handle, SWalHead *pHead) { void walFsync(void *handle) { SWal *pWal = handle; + if (pWal == NULL) return; if (pWal->level == TAOS_WAL_FSYNC && pWal->fd >=0) { if (fsync(pWal->fd) < 0) { diff --git a/tests/pytest/fulltest.sh b/tests/pytest/fulltest.sh index 65820db6e1a306a76949a88da42a25280ade9cd8..999332d33bdc838ad77b9276b022ee27b8f47d50 100755 --- a/tests/pytest/fulltest.sh +++ b/tests/pytest/fulltest.sh @@ -13,6 +13,7 @@ python3 ./test.py -f insert/nchar.py python3 ./test.py -f insert/nchar-boundary.py python3 ./test.py -f insert/nchar-unicode.py python3 ./test.py -f insert/multi.py +python3 ./test.py -f insert/randomNullCommit.py python3 ./test.py -f table/column_name.py python3 ./test.py -f table/column_num.py diff --git a/tests/pytest/insert/randomNullCommit.py b/tests/pytest/insert/randomNullCommit.py new file mode 100644 index 0000000000000000000000000000000000000000..f90bb9b3e5e985aa1777899c6e53b261072193aa --- /dev/null +++ b/tests/pytest/insert/randomNullCommit.py @@ -0,0 +1,64 @@ +################################################################### +# Copyright (c) 2016 by TAOS Technologies, Inc. +# All rights reserved. +# +# This file is proprietary and confidential to TAOS Technologies. +# No part of this file may be reproduced, stored, transmitted, +# disclosed or used in any form or by any means other than as +# expressly provided by the written permission from Jianhui Tao +# +################################################################### + +# -*- coding: utf-8 -*- + +import sys +import random + +from util.log import * +from util.cases import * +from util.sql import * +from util.dnodes import * + + +class TDTestCase: + def init(self, conn, logSql): + tdLog.debug("start to execute %s" % __file__) + tdSql.init(conn.cursor(), logSql) + + def run(self): + tdSql.prepare() + + tdLog.info("=============== step1") + tdSql.execute('create table tb (ts timestamp, speed int, temp float, note binary(5), flag bool)') + + numOfRecords = 0 + randomList = [10, 50, 100, 500, 1000, 5000] + for i in range(0, 10): + num = random.choice(randomList) + tdLog.info("will insert %d records" % num) + for x in range(0, num): + tdLog.info( + 'insert into tb values (now + %da, NULL, NULL, NULL, TRUE)' % x) + tdSql.execute( + 'insert into tb values (now + %da, NULL, NULL, NULL, TRUE)' % x) + + numOfRecords = numOfRecords + num + + tdSql.query("select * from tb") + tdSql.checkRows(numOfRecords) + tdSql.checkData(numOfRecords-num, 1, None) + tdSql.checkData(numOfRecords-1, 2, None) + + tdLog.info("stop dnode to commit data to disk") + tdDnodes.stop(1) + tdDnodes.start(1) + tdLog.sleep(5) + + + def stop(self): + tdSql.close() + tdLog.success("%s successfully executed" % __file__) + + +tdCases.addWindows(__file__, TDTestCase()) +tdCases.addLinux(__file__, TDTestCase()) diff --git a/tests/pytest/table/boundary.py b/tests/pytest/table/boundary.py index b68671c61a8c3e8f36372a3692732cbaec0635a7..29fdd5c475b39a375071ab1d24b42ef098e901f3 100644 --- a/tests/pytest/table/boundary.py +++ b/tests/pytest/table/boundary.py @@ -10,7 +10,7 @@ from util.sql import * class TDTestCase: - def init(self, conn): + def init(self, conn, logSql): tdLog.debug("start to execute %s" % __file__) tdSql.init(conn.cursor(), logSql) @@ -95,18 +95,43 @@ class TDTestCase: maxTableNameLen = self.getLimitFromSourceCode('TSDB_TABLE_NAME_LEN') tdLog.notice("table name max length is %d" % maxTableNameLen) - name = self.generateString(maxTableNameLen - 1) - tdLog.info("table name is '%s'" % name) + # create a super table with name exceed max length + sname = self.generateString(maxTableNameLen + 1) + tdLog.info("create a super table with length %d" % len(sname)) + tdSql.error("create table %s (ts timestamp, value int) tags(id int)" % sname) - tdSql.execute("create table %s (ts timestamp, value int)" % name) - tdSql.execute("insert into %s values(now, 0)" % name) + # create a super table with name of max length + sname = self.generateString(maxTableNameLen) + tdLog.info("create a super table with length %d" % len(sname)) + tdSql.execute("create table %s (ts timestamp, value int) tags(id int)" % sname) + tdLog.info("check table count, should be one") + tdSql.query('show stables') + tdSql.checkRows(1) + + # create a child table with name exceed max length + name = self.generateString(maxTableNameLen + 1) + tdLog.info("create a child table with length %d" % len(name)) + tdSql.error("create table %s using %s tags(0)" % (name, sname)) + # create a child table with name of max length + name = self.generateString(maxTableNameLen) + tdLog.info("create a child table with length %d" % len(name)) + tdSql.execute("create table %s using %s tags(0)" % (name, sname)) tdSql.query('show tables') tdSql.checkRows(1) - tdSql.query('select * from %s' % name) + # insert one row + tdLog.info("insert one row of data") + tdSql.execute("insert into %s values(now, 0)" % name) + tdSql.query("select * from " + name) + tdSql.checkRows(1) + tdSql.query("select * from " + sname) tdSql.checkRows(1) + name = name[:len(name) - 1] + tdSql.error("select * from " + name) + tdSql.checkRows(0) + def checkRowBoundaries(self): tdLog.debug("checking row boundaries") tdSql.prepare() diff --git a/tests/pytest/util/sql.py b/tests/pytest/util/sql.py index 245e4b0945181762d0dc7994d3bcf0d2d46b1fd4..ec7ac117c07ee6c34ea91ffbb148729ab4c55119 100644 --- a/tests/pytest/util/sql.py +++ b/tests/pytest/util/sql.py @@ -58,6 +58,9 @@ class TDSql: "%s failed: sql:%s, expect error not occured" % (callerFilename, sql)) else: + self.queryRows = 0 + self.queryCols = 0 + self.queryResult = None tdLog.info("sql:%s, expect error occured" % (sql)) def query(self, sql): diff --git a/tests/script/jenkins/sync.txt b/tests/script/jenkins/sync.txt index 15685f99e9c3fea4e754d6d28593dec41c388c1b..934680b7c0c42974a40025afd6f0226e1a6ccf9c 100644 --- a/tests/script/jenkins/sync.txt +++ b/tests/script/jenkins/sync.txt @@ -3,15 +3,64 @@ cd ../../debug; make cd ../../../debug; cmake .. cd ../../../debug; make +./test.sh -u -f unique/account/account_create.sim +./test.sh -u -f unique/account/account_delete.sim +./test.sh -u -f unique/account/account_len.sim +./test.sh -u -f unique/account/authority.sim +./test.sh -u -f unique/account/basic.sim +./test.sh -u -f unique/account/paras.sim +./test.sh -u -f unique/account/pass_alter.sim +./test.sh -u -f unique/account/pass_len.sim +./test.sh -u -f unique/account/usage.sim +./test.sh -u -f unique/account/user_create.sim +./test.sh -u -f unique/account/user_len.sim + +./test.sh -u -f unique/big/balance.sim +./test.sh -u -f unique/big/maxvnodes.sim +./test.sh -u -f unique/big/tcp.sim + ./test.sh -u -f unique/cluster/balance1.sim ./test.sh -u -f unique/cluster/balance2.sim ./test.sh -u -f unique/cluster/balance3.sim ./test.sh -u -f unique/cluster/cache.sim +./test.sh -u -f unique/column/replica3.sim + +./test.sh -u -f unique/db/commit.sim +./test.sh -u -f unique/db/delete.sim +./test.sh -u -f unique/db/delete_part.sim +./test.sh -u -f unique/db/replica_add12.sim +./test.sh -u -f unique/db/replica_add13.sim +./test.sh -u -f unique/db/replica_add23.sim +./test.sh -u -f unique/db/replica_reduce21.sim +./test.sh -u -f unique/db/replica_reduce32.sim +./test.sh -u -f unique/db/replica_reduce31.sim +./test.sh -u -f unique/db/replica_part.sim + ./test.sh -u -f unique/dnode/balance1.sim ./test.sh -u -f unique/dnode/balance2.sim ./test.sh -u -f unique/dnode/balance3.sim ./test.sh -u -f unique/dnode/balancex.sim +./test.sh -u -f unique/dnode/offline1.sim +./test.sh -u -f unique/dnode/offline2.sim +./test.sh -u -f unique/dnode/remove1.sim +./test.sh -u -f unique/dnode/remove2.sim +./test.sh -u -f unique/dnode/vnode_clean.sim + +./test.sh -u -f unique/http/admin.sim +./test.sh -u -f unique/http/opentsdb.sim + +./test.sh -u -f unique/import/replica2.sim +./test.sh -u -f unique/import/replica3.sim + +./test.sh -u -f unique/stable/balance_replica1.sim +./test.sh -u -f unique/stable/dnode2_stop.sim +./test.sh -u -f unique/stable/dnode2.sim +./test.sh -u -f unique/stable/dnode3.sim +./test.sh -u -f unique/stable/replica2_dnode4.sim +./test.sh -u -f unique/stable/replica2_vnode3.sim +./test.sh -u -f unique/stable/replica3_dnode6.sim +./test.sh -u -f unique/stable/replica3_vnode3.sim ./test.sh -u -f unique/mnode/mgmt22.sim ./test.sh -u -f unique/mnode/mgmt23.sim @@ -21,3 +70,10 @@ cd ../../../debug; make ./test.sh -u -f unique/mnode/mgmt33.sim ./test.sh -u -f unique/mnode/mgmt34.sim ./test.sh -u -f unique/mnode/mgmtr2.sim + +./test.sh -u -f unique/vnode/many.sim +./test.sh -u -f unique/vnode/replica2_basic2.sim +./test.sh -u -f unique/vnode/replica2_repeat.sim +./test.sh -u -f unique/vnode/replica3_basic.sim +./test.sh -u -f unique/vnode/replica3_repeat.sim +./test.sh -u -f unique/vnode/replica3_vgroup.sim diff --git a/tests/script/tmp/prepare.sim b/tests/script/tmp/prepare.sim index 68a05a33e75e0716d83c21da3aade66a16ca472a..1db643c5c9c198e103aa0429b06cf6b5848ea960 100644 --- a/tests/script/tmp/prepare.sim +++ b/tests/script/tmp/prepare.sim @@ -23,4 +23,8 @@ system sh/cfg.sh -n dnode4 -c mgmtEqualVnodeNum -v 4 system sh/cfg.sh -n dnode1 -c numOfTotalVnodes -v 4 system sh/cfg.sh -n dnode2 -c numOfTotalVnodes -v 4 system sh/cfg.sh -n dnode3 -c numOfTotalVnodes -v 4 -system sh/cfg.sh -n dnode4 -c numOfTotalVnodes -v 4 \ No newline at end of file +system sh/cfg.sh -n dnode4 -c numOfTotalVnodes -v 4 + +system sh/cfg.sh -n dnode1 -c http -v 1 +system sh/cfg.sh -n dnode2 -c http -v 1 +system sh/cfg.sh -n dnode3 -c http -v 1 \ No newline at end of file diff --git a/tests/test-all.sh b/tests/test-all.sh index f54d094649bb417d2a3d37edac81a64209543182..a56b5eeef7fcb7c306318a81655c8656a78e4843 100755 --- a/tests/test-all.sh +++ b/tests/test-all.sh @@ -40,7 +40,6 @@ fi totalPySuccess=`grep 'successfully executed' pytest-out.txt | wc -l` if [ "$totalPySuccess" -gt "0" ]; then - grep 'successfully executed' pytest-out.txt echo -e "${GREEN} ### Total $totalPySuccess python case(s) succeed! ### ${NC}" fi