提交 244b8e35 编写于 作者: S Shuaiqiang Chang

Merge branch 'develop' into hotfix/taos-tools

* develop:
  [TD-439]
  fix definite lost while show command failed
  fix cq coredump
  fix resource leak in mnodeShow.c
  [TD-444] fix alter native role error
  fix resource leak in dnodeVWrite.c
  [TD-440] fix resource leak in mnodeTable.c
  change some interface
  fix commit coredump data lost bug
  TD-354
...@@ -427,7 +427,7 @@ static void doInitGlobalConfig() { ...@@ -427,7 +427,7 @@ static void doInitGlobalConfig() {
cfg.unitType = TAOS_CFG_UTYPE_NONE; cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosInitConfigOption(cfg); taosInitConfigOption(cfg);
// 0-any; 1-mnode; 2-dnode // 0-any; 1-mnode; 2-vnode
cfg.option = "alternativeRole"; cfg.option = "alternativeRole";
cfg.ptr = &tsAlternativeRole; cfg.ptr = &tsAlternativeRole;
cfg.valType = TAOS_CFG_VTYPE_INT32; cfg.valType = TAOS_CFG_VTYPE_INT32;
......
...@@ -15,17 +15,19 @@ ...@@ -15,17 +15,19 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include <errno.h>
#include <pthread.h>
#include <stdlib.h> #include <stdlib.h>
#include <string.h> #include <string.h>
#include <pthread.h>
#include <errno.h> #include "taos.h"
#include "taosdef.h" #include "taosdef.h"
#include "taosmsg.h" #include "taosmsg.h"
#include "tcq.h"
#include "tdataformat.h"
#include "tglobal.h" #include "tglobal.h"
#include "tlog.h" #include "tlog.h"
#include "twal.h" #include "twal.h"
#include "tcq.h"
#include "taos.h"
#define cError(...) { if (cqDebugFlag & DEBUG_ERROR) { taosPrintLog("ERROR CQ ", cqDebugFlag, __VA_ARGS__); }} #define cError(...) { if (cqDebugFlag & DEBUG_ERROR) { taosPrintLog("ERROR CQ ", cqDebugFlag, __VA_ARGS__); }}
#define cWarn(...) { if (cqDebugFlag & DEBUG_WARN) { taosPrintLog("WARN CQ ", cqDebugFlag, __VA_ARGS__); }} #define cWarn(...) { if (cqDebugFlag & DEBUG_WARN) { taosPrintLog("WARN CQ ", cqDebugFlag, __VA_ARGS__); }}
...@@ -46,15 +48,14 @@ typedef struct { ...@@ -46,15 +48,14 @@ typedef struct {
} SCqContext; } SCqContext;
typedef struct SCqObj { typedef struct SCqObj {
int tid; // table ID int tid; // table ID
int rowSize; // bytes of a row int rowSize; // bytes of a row
char *sqlStr; // SQL string char * sqlStr; // SQL string
int columns; // number of columns STSchema * pSchema; // pointer to schema array
SSchema *pSchema; // pointer to schema array void * pStream;
void *pStream; struct SCqObj *prev;
struct SCqObj *prev; struct SCqObj *next;
struct SCqObj *next; SCqContext * pContext;
SCqContext *pContext;
} SCqObj; } SCqObj;
int cqDebugFlag = 135; int cqDebugFlag = 135;
...@@ -152,7 +153,7 @@ void cqStop(void *handle) { ...@@ -152,7 +153,7 @@ void cqStop(void *handle) {
pthread_mutex_unlock(&pContext->mutex); pthread_mutex_unlock(&pContext->mutex);
} }
void *cqCreate(void *handle, int tid, char *sqlStr, SSchema *pSchema, int columns) { void *cqCreate(void *handle, int tid, char *sqlStr, STSchema *pSchema) {
SCqContext *pContext = handle; SCqContext *pContext = handle;
SCqObj *pObj = calloc(sizeof(SCqObj), 1); SCqObj *pObj = calloc(sizeof(SCqObj), 1);
...@@ -162,11 +163,7 @@ void *cqCreate(void *handle, int tid, char *sqlStr, SSchema *pSchema, int column ...@@ -162,11 +163,7 @@ void *cqCreate(void *handle, int tid, char *sqlStr, SSchema *pSchema, int column
pObj->sqlStr = malloc(strlen(sqlStr)+1); pObj->sqlStr = malloc(strlen(sqlStr)+1);
strcpy(pObj->sqlStr, sqlStr); strcpy(pObj->sqlStr, sqlStr);
pObj->columns = columns; pObj->pSchema = tdDupSchema(pSchema);
int size = sizeof(SSchema) * columns;
pObj->pSchema = malloc(size);
memcpy(pObj->pSchema, pSchema, size);
cTrace("vgId:%d, id:%d CQ:%s is created", pContext->vgId, pObj->tid, pObj->sqlStr); cTrace("vgId:%d, id:%d CQ:%s is created", pContext->vgId, pObj->tid, pObj->sqlStr);
......
...@@ -59,21 +59,16 @@ int main(int argc, char *argv[]) { ...@@ -59,21 +59,16 @@ int main(int argc, char *argv[]) {
exit(-1); exit(-1);
} }
SSchema schema[2]; STSchema *pSchema = tdNewSchema(2);
schema[0].type = TSDB_DATA_TYPE_TIMESTAMP; tdSchemaAddCol(pSchema, TSDB_DATA_TYPE_TIMESTAMP, 0, 8);
strcpy(schema[0].name, "ts"); tdSchemaAddCol(pSchema, TSDB_DATA_TYPE_INT, 1, 4);
schema[0].colId = 0;
schema[0].bytes = 8;
schema[1].type = TSDB_DATA_TYPE_INT;
strcpy(schema[1].name, "avgspeed");
schema[1].colId = 1;
schema[1].bytes = 4;
for (int sid =1; sid<10; ++sid) { for (int sid =1; sid<10; ++sid) {
cqCreate(pCq, sid, "select avg(speed) from demo.t1 sliding(1s) interval(5s)", schema, 2); cqCreate(pCq, sid, "select avg(speed) from demo.t1 sliding(1s) interval(5s)", pSchema);
} }
tdFreeSchema(pSchema);
while (1) { while (1) {
char c = getchar(); char c = getchar();
......
...@@ -369,7 +369,8 @@ static void dnodeUpdateMnodeInfos(SDMMnodeInfos *pMnodes) { ...@@ -369,7 +369,8 @@ static void dnodeUpdateMnodeInfos(SDMMnodeInfos *pMnodes) {
} }
static bool dnodeReadMnodeInfos() { static bool dnodeReadMnodeInfos() {
char ipFile[TSDB_FILENAME_LEN] = {0}; char ipFile[TSDB_FILENAME_LEN*2] = {0};
sprintf(ipFile, "%s/mnodeIpList.json", tsDnodeDir); sprintf(ipFile, "%s/mnodeIpList.json", tsDnodeDir);
FILE *fp = fopen(ipFile, "r"); FILE *fp = fopen(ipFile, "r");
if (!fp) { if (!fp) {
...@@ -537,7 +538,8 @@ static void dnodeSendStatusMsg(void *handle, void *tmrId) { ...@@ -537,7 +538,8 @@ static void dnodeSendStatusMsg(void *handle, void *tmrId) {
} }
static bool dnodeReadDnodeCfg() { static bool dnodeReadDnodeCfg() {
char dnodeCfgFile[TSDB_FILENAME_LEN] = {0}; char dnodeCfgFile[TSDB_FILENAME_LEN*2] = {0};
sprintf(dnodeCfgFile, "%s/dnodeCfg.json", tsDnodeDir); sprintf(dnodeCfgFile, "%s/dnodeCfg.json", tsDnodeDir);
FILE *fp = fopen(dnodeCfgFile, "r"); FILE *fp = fopen(dnodeCfgFile, "r");
......
...@@ -28,8 +28,12 @@ int32_t main(int32_t argc, char *argv[]) { ...@@ -28,8 +28,12 @@ int32_t main(int32_t argc, char *argv[]) {
// Set global configuration file // Set global configuration file
for (int32_t i = 1; i < argc; ++i) { for (int32_t i = 1; i < argc; ++i) {
if (strcmp(argv[i], "-c") == 0) { if (strcmp(argv[i], "-c") == 0) {
if (i < argc - 1) { if (i < argc - 1) {
strcpy(configDir, argv[++i]); if (strlen(argv[++i]) > TSDB_FILENAME_LEN - 1) {
printf("config file path overflow");
exit(EXIT_FAILURE);
}
strcpy(configDir, argv[i]);
} else { } else {
printf("'-c' requires a parameter, default:%s\n", configDir); printf("'-c' requires a parameter, default:%s\n", configDir);
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
......
...@@ -129,7 +129,10 @@ void *dnodeAllocateVnodeWqueue(void *pVnode) { ...@@ -129,7 +129,10 @@ void *dnodeAllocateVnodeWqueue(void *pVnode) {
if (pWorker->qset == NULL) { if (pWorker->qset == NULL) {
pWorker->qset = taosOpenQset(); pWorker->qset = taosOpenQset();
if (pWorker->qset == NULL) return NULL; if (pWorker->qset == NULL) {
taosCloseQueue(queue);
return NULL;
}
taosAddIntoQset(pWorker->qset, queue, pVnode); taosAddIntoQset(pWorker->qset, queue, pVnode);
pWorker->qall = taosAllocateQall(); pWorker->qall = taosAllocateQall();
......
...@@ -19,6 +19,7 @@ ...@@ -19,6 +19,7 @@
extern "C" { extern "C" {
#endif #endif
#include "tdataformat.h"
typedef int (*FCqWrite)(void *ahandle, void *pHead, int type); typedef int (*FCqWrite)(void *ahandle, void *pHead, int type);
...@@ -40,7 +41,7 @@ void cqStart(void *handle); ...@@ -40,7 +41,7 @@ void cqStart(void *handle);
void cqStop(void *handle); void cqStop(void *handle);
// cqCreate is called by TSDB to start an instance of CQ // cqCreate is called by TSDB to start an instance of CQ
void *cqCreate(void *handle, int sid, char *sqlStr, SSchema *pSchema, int columns); void *cqCreate(void *handle, int sid, char *sqlStr, STSchema *pSchema);
// cqDrop is called by TSDB to stop an instance of CQ, handle is the return value of cqCreate // cqDrop is called by TSDB to stop an instance of CQ, handle is the return value of cqCreate
void cqDrop(void *handle); void cqDrop(void *handle);
......
...@@ -43,6 +43,8 @@ typedef struct { ...@@ -43,6 +43,8 @@ typedef struct {
void *cqH; void *cqH;
int (*notifyStatus)(void *, int status); int (*notifyStatus)(void *, int status);
int (*eventCallBack)(void *); int (*eventCallBack)(void *);
void *(*cqCreateFunc)(void *handle, int sid, char *sqlStr, STSchema *pSchema);
void (*cqDropFunc)(void *handle);
} STsdbAppH; } STsdbAppH;
// --------- TSDB REPOSITORY CONFIGURATION DEFINITION // --------- TSDB REPOSITORY CONFIGURATION DEFINITION
...@@ -71,7 +73,7 @@ typedef void TsdbRepoT; // use void to hide implementation details from outside ...@@ -71,7 +73,7 @@ typedef void TsdbRepoT; // use void to hide implementation details from outside
int tsdbCreateRepo(char *rootDir, STsdbCfg *pCfg, void *limiter); int tsdbCreateRepo(char *rootDir, STsdbCfg *pCfg, void *limiter);
int32_t tsdbDropRepo(TsdbRepoT *repo); int32_t tsdbDropRepo(TsdbRepoT *repo);
TsdbRepoT *tsdbOpenRepo(char *tsdbDir, STsdbAppH *pAppH); TsdbRepoT *tsdbOpenRepo(char *rootDir, STsdbAppH *pAppH);
int32_t tsdbCloseRepo(TsdbRepoT *repo, int toCommit); int32_t tsdbCloseRepo(TsdbRepoT *repo, int toCommit);
int32_t tsdbConfigRepo(TsdbRepoT *repo, STsdbCfg *pCfg); int32_t tsdbConfigRepo(TsdbRepoT *repo, STsdbCfg *pCfg);
......
...@@ -96,8 +96,12 @@ void shellParseArgument(int argc, char *argv[], struct arguments *arguments) { ...@@ -96,8 +96,12 @@ void shellParseArgument(int argc, char *argv[], struct arguments *arguments) {
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
} }
} else if (strcmp(argv[i], "-c") == 0) { } else if (strcmp(argv[i], "-c") == 0) {
if (i < argc - 1) { if (i < argc - 1) {
strcpy(configDir, argv[++i]); if (strlen(argv[++i]) > TSDB_FILENAME_LEN - 1) {
fprintf(stderr, "config file path: %s overflow max len %d\n", argv[i], TSDB_FILENAME_LEN - 1);
exit(EXIT_FAILURE);
}
strcpy(configDir, argv[i]);
} else { } else {
fprintf(stderr, "Option -c requires an argument\n"); fprintf(stderr, "Option -c requires an argument\n");
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
......
...@@ -80,6 +80,11 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) { ...@@ -80,6 +80,11 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) {
if (wordexp(arg, &full_path, 0) != 0) { if (wordexp(arg, &full_path, 0) != 0) {
fprintf(stderr, "Invalid path %s\n", arg); fprintf(stderr, "Invalid path %s\n", arg);
return -1; return -1;
}
if (strlen(full_path.we_wordv[0]) > TSDB_FILENAME_LEN - 1) {
fprintf(stderr, "config file path: %s overflow max len %d\n", full_path.we_wordv[0], TSDB_FILENAME_LEN - 1);
wordfree(&full_path);
return -1;
} }
strcpy(configDir, full_path.we_wordv[0]); strcpy(configDir, full_path.we_wordv[0]);
wordfree(&full_path); wordfree(&full_path);
......
...@@ -76,8 +76,12 @@ void shellParseArgument(int argc, char *argv[], struct arguments *arguments) { ...@@ -76,8 +76,12 @@ void shellParseArgument(int argc, char *argv[], struct arguments *arguments) {
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
} }
} else if (strcmp(argv[i], "-c") == 0) { } else if (strcmp(argv[i], "-c") == 0) {
if (i < argc - 1) { if (i < argc - 1) {
strcpy(configDir, argv[++i]); if (strlen(argv[++i]) > TSDB_FILENAME_LEN - 1) {
fprintf(stderr, "config file path: %s overflow max len %d\n", argv[i], TSDB_FILENAME_LEN - 1);
exit(EXIT_FAILURE);
}
strcpy(configDir, argv[i]);
} else { } else {
fprintf(stderr, "Option -c requires an argument\n"); fprintf(stderr, "Option -c requires an argument\n");
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
......
...@@ -27,6 +27,12 @@ typedef enum { ...@@ -27,6 +27,12 @@ typedef enum {
TAOS_DN_STATUS_READY TAOS_DN_STATUS_READY
} EDnodeStatus; } EDnodeStatus;
typedef enum {
TAOS_DN_ALTERNATIVE_ROLE_ANY,
TAOS_DN_ALTERNATIVE_ROLE_MNODE,
TAOS_DN_ALTERNATIVE_ROLE_VNODE
} EDnodeAlternativeRole;
int32_t mnodeInitDnodes(); int32_t mnodeInitDnodes();
void mnodeCleanupDnodes(); void mnodeCleanupDnodes();
......
...@@ -58,6 +58,7 @@ static int32_t mnodeGetVnodeMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pC ...@@ -58,6 +58,7 @@ static int32_t mnodeGetVnodeMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pC
static int32_t mnodeRetrieveVnodes(SShowObj *pShow, char *data, int32_t rows, void *pConn); static int32_t mnodeRetrieveVnodes(SShowObj *pShow, char *data, int32_t rows, void *pConn);
static int32_t mnodeGetDnodeMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn); static int32_t mnodeGetDnodeMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn);
static int32_t mnodeRetrieveDnodes(SShowObj *pShow, char *data, int32_t rows, void *pConn); static int32_t mnodeRetrieveDnodes(SShowObj *pShow, char *data, int32_t rows, void *pConn);
static char* mnodeGetDnodeAlternativeRoleStr(int32_t alternativeRole);
static int32_t mnodeDnodeActionDestroy(SSdbOper *pOper) { static int32_t mnodeDnodeActionDestroy(SSdbOper *pOper) {
tfree(pOper->pObj); tfree(pOper->pObj);
...@@ -521,6 +522,12 @@ static int32_t mnodeGetDnodeMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pC ...@@ -521,6 +522,12 @@ static int32_t mnodeGetDnodeMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pC
pSchema[cols].bytes = htons(pShow->bytes[cols]); pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++; cols++;
pShow->bytes[cols] = 6 + VARSTR_HEADER_SIZE;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "alternativeRole");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = 8; pShow->bytes[cols] = 8;
pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP; pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP;
strcpy(pSchema[cols].name, "create_time"); strcpy(pSchema[cols].name, "create_time");
...@@ -572,12 +579,16 @@ static int32_t mnodeRetrieveDnodes(SShowObj *pShow, char *data, int32_t rows, vo ...@@ -572,12 +579,16 @@ static int32_t mnodeRetrieveDnodes(SShowObj *pShow, char *data, int32_t rows, vo
*(int16_t *)pWrite = pDnode->totalVnodes; *(int16_t *)pWrite = pDnode->totalVnodes;
cols++; cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
char* status = mnodeGetDnodeStatusStr(pDnode->status); char* status = mnodeGetDnodeStatusStr(pDnode->status);
STR_TO_VARSTR(pWrite, status); STR_TO_VARSTR(pWrite, status);
cols++; cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
char* role = mnodeGetDnodeAlternativeRoleStr(pDnode->alternativeRole);
STR_TO_VARSTR(pWrite, role);
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int64_t *)pWrite = pDnode->createdTime; *(int64_t *)pWrite = pDnode->createdTime;
cols++; cols++;
...@@ -895,3 +906,13 @@ char* mnodeGetDnodeStatusStr(int32_t dnodeStatus) { ...@@ -895,3 +906,13 @@ char* mnodeGetDnodeStatusStr(int32_t dnodeStatus) {
default: return "undefined"; default: return "undefined";
} }
} }
static char* mnodeGetDnodeAlternativeRoleStr(int32_t alternativeRole) {
switch (alternativeRole) {
case TAOS_DN_ALTERNATIVE_ROLE_ANY: return "any";
case TAOS_DN_ALTERNATIVE_ROLE_MNODE: return "mnode";
case TAOS_DN_ALTERNATIVE_ROLE_VNODE: return "vnode";
default:return "any";
}
}
...@@ -116,12 +116,6 @@ static int32_t mnodeProcessShowMsg(SMnodeMsg *pMsg) { ...@@ -116,12 +116,6 @@ static int32_t mnodeProcessShowMsg(SMnodeMsg *pMsg) {
return TSDB_CODE_OPS_NOT_SUPPORT; return TSDB_CODE_OPS_NOT_SUPPORT;
} }
int32_t size = sizeof(SCMShowRsp) + sizeof(SSchema) * TSDB_MAX_COLUMNS + TSDB_EXTRA_PAYLOAD_SIZE;
SCMShowRsp *pShowRsp = rpcMallocCont(size);
if (pShowRsp == NULL) {
return TSDB_CODE_SERV_OUT_OF_MEMORY;
}
int32_t showObjSize = sizeof(SShowObj) + htons(pShowMsg->payloadLen); int32_t showObjSize = sizeof(SShowObj) + htons(pShowMsg->payloadLen);
SShowObj *pShow = (SShowObj *) calloc(1, showObjSize); SShowObj *pShow = (SShowObj *) calloc(1, showObjSize);
pShow->signature = pShow; pShow->signature = pShow;
...@@ -131,7 +125,14 @@ static int32_t mnodeProcessShowMsg(SMnodeMsg *pMsg) { ...@@ -131,7 +125,14 @@ static int32_t mnodeProcessShowMsg(SMnodeMsg *pMsg) {
memcpy(pShow->payload, pShowMsg->payload, pShow->payloadLen); memcpy(pShow->payload, pShowMsg->payload, pShow->payloadLen);
pShow = mnodeSaveShowObj(pShow, showObjSize); pShow = mnodeSaveShowObj(pShow, showObjSize);
if (pShow == NULL) { if (pShow == NULL) {
return TSDB_CODE_SERV_OUT_OF_MEMORY;
}
int32_t size = sizeof(SCMShowRsp) + sizeof(SSchema) * TSDB_MAX_COLUMNS + TSDB_EXTRA_PAYLOAD_SIZE;
SCMShowRsp *pShowRsp = rpcMallocCont(size);
if (pShowRsp == NULL) {
mnodeFreeShowObj(pShow);
return TSDB_CODE_SERV_OUT_OF_MEMORY; return TSDB_CODE_SERV_OUT_OF_MEMORY;
} }
pShowRsp->qhandle = htobe64((uint64_t) pShow); pShowRsp->qhandle = htobe64((uint64_t) pShow);
...@@ -144,6 +145,7 @@ static int32_t mnodeProcessShowMsg(SMnodeMsg *pMsg) { ...@@ -144,6 +145,7 @@ static int32_t mnodeProcessShowMsg(SMnodeMsg *pMsg) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} else { } else {
mError("show:%p, type:%s, failed to get meta, reason:%s", pShow, mnodeGetShowType(pShowMsg->type), tstrerror(code)); mError("show:%p, type:%s, failed to get meta, reason:%s", pShow, mnodeGetShowType(pShowMsg->type), tstrerror(code));
rpcFreeCont(pShowRsp);
mnodeCleanupShowObj(pShow, true); mnodeCleanupShowObj(pShow, true);
return code; return code;
} }
......
...@@ -484,7 +484,10 @@ static int32_t mnodeSuperTableActionDecode(SSdbOper *pOper) { ...@@ -484,7 +484,10 @@ static int32_t mnodeSuperTableActionDecode(SSdbOper *pOper) {
if (pStable == NULL) return TSDB_CODE_SERV_OUT_OF_MEMORY; if (pStable == NULL) return TSDB_CODE_SERV_OUT_OF_MEMORY;
int32_t len = strlen(pOper->rowData); int32_t len = strlen(pOper->rowData);
if (len > TSDB_TABLE_ID_LEN) return TSDB_CODE_INVALID_TABLE_ID; if (len > TSDB_TABLE_ID_LEN){
free(pStable);
return TSDB_CODE_INVALID_TABLE_ID;
}
pStable->info.tableId = strdup(pOper->rowData); pStable->info.tableId = strdup(pOper->rowData);
len++; len++;
...@@ -2369,4 +2372,4 @@ static int32_t mnodeRetrieveStreams(SShowObj *pShow, char *data, int32_t rows, v ...@@ -2369,4 +2372,4 @@ static int32_t mnodeRetrieveStreams(SShowObj *pShow, char *data, int32_t rows, v
mnodeDecDbRef(pDb); mnodeDecDbRef(pDb);
return numOfRows; return numOfRows;
} }
\ No newline at end of file
...@@ -281,7 +281,7 @@ int tgReadSchema(char *fileName) { ...@@ -281,7 +281,7 @@ int tgReadSchema(char *fileName) {
} }
void tgInitHandle(HttpServer *pServer) { void tgInitHandle(HttpServer *pServer) {
char fileName[256] = {0}; char fileName[TSDB_FILENAME_LEN*2] = {0};
sprintf(fileName, "%s/taos.telegraf.cfg", configDir); sprintf(fileName, "%s/taos.telegraf.cfg", configDir);
if (tgReadSchema(fileName) <= 0) { if (tgReadSchema(fileName) <= 0) {
tgFreeSchemas(); tgFreeSchemas();
......
...@@ -87,6 +87,7 @@ typedef struct STable { ...@@ -87,6 +87,7 @@ typedef struct STable {
struct STable *prev; struct STable *prev;
tstr * name; // NOTE: there a flexible string here tstr * name; // NOTE: there a flexible string here
char * sql; char * sql;
void * cqhandle;
} STable; } STable;
#define TSDB_GET_TABLE_LAST_KEY(tb) ((tb)->lastKey) #define TSDB_GET_TABLE_LAST_KEY(tb) ((tb)->lastKey)
...@@ -110,6 +111,7 @@ typedef struct { ...@@ -110,6 +111,7 @@ typedef struct {
SMetaFile *mfh; // meta file handle SMetaFile *mfh; // meta file handle
int maxRowBytes; int maxRowBytes;
int maxCols; int maxCols;
void * pRepo;
} STsdbMeta; } STsdbMeta;
// element put in skiplist for each table // element put in skiplist for each table
...@@ -118,7 +120,7 @@ typedef struct STableIndexElem { ...@@ -118,7 +120,7 @@ typedef struct STableIndexElem {
STable* pTable; STable* pTable;
} STableIndexElem; } STableIndexElem;
STsdbMeta *tsdbInitMeta(char *rootDir, int32_t maxTables); STsdbMeta *tsdbInitMeta(char *rootDir, int32_t maxTables, void *pRepo);
int32_t tsdbFreeMeta(STsdbMeta *pMeta); int32_t tsdbFreeMeta(STsdbMeta *pMeta);
STSchema * tsdbGetTableSchema(STsdbMeta *pMeta, STable *pTable); STSchema * tsdbGetTableSchema(STsdbMeta *pMeta, STable *pTable);
STSchema * tsdbGetTableTagSchema(STsdbMeta *pMeta, STable *pTable); STSchema * tsdbGetTableTagSchema(STsdbMeta *pMeta, STable *pTable);
......
...@@ -189,9 +189,9 @@ _err: ...@@ -189,9 +189,9 @@ _err:
* *
* @return a TSDB repository handle on success, NULL for failure and the error number is set * @return a TSDB repository handle on success, NULL for failure and the error number is set
*/ */
TsdbRepoT *tsdbOpenRepo(char *tsdbDir, STsdbAppH *pAppH) { TsdbRepoT *tsdbOpenRepo(char *rootDir, STsdbAppH *pAppH) {
char dataDir[128] = "\0"; char dataDir[128] = "\0";
if (access(tsdbDir, F_OK | W_OK | R_OK) < 0) { if (access(rootDir, F_OK | W_OK | R_OK) < 0) {
return NULL; return NULL;
} }
...@@ -200,12 +200,12 @@ TsdbRepoT *tsdbOpenRepo(char *tsdbDir, STsdbAppH *pAppH) { ...@@ -200,12 +200,12 @@ TsdbRepoT *tsdbOpenRepo(char *tsdbDir, STsdbAppH *pAppH) {
return NULL; return NULL;
} }
pRepo->rootDir = strdup(tsdbDir); pRepo->rootDir = strdup(rootDir);
tsdbRestoreCfg(pRepo, &(pRepo->config)); tsdbRestoreCfg(pRepo, &(pRepo->config));
if (pAppH) pRepo->appH = *pAppH; if (pAppH) pRepo->appH = *pAppH;
pRepo->tsdbMeta = tsdbInitMeta(tsdbDir, pRepo->config.maxTables); pRepo->tsdbMeta = tsdbInitMeta(rootDir, pRepo->config.maxTables, pRepo);
if (pRepo->tsdbMeta == NULL) { if (pRepo->tsdbMeta == NULL) {
free(pRepo->rootDir); free(pRepo->rootDir);
free(pRepo); free(pRepo);
......
...@@ -142,6 +142,7 @@ int tsdbRestoreTable(void *pHandle, void *cont, int contLen) { ...@@ -142,6 +142,7 @@ int tsdbRestoreTable(void *pHandle, void *cont, int contLen) {
void tsdbOrgMeta(void *pHandle) { void tsdbOrgMeta(void *pHandle) {
STsdbMeta *pMeta = (STsdbMeta *)pHandle; STsdbMeta *pMeta = (STsdbMeta *)pHandle;
STsdbRepo *pRepo = (STsdbRepo *)pMeta->pRepo;
for (int i = 1; i < pMeta->maxTables; i++) { for (int i = 1; i < pMeta->maxTables; i++) {
STable *pTable = pMeta->tables[i]; STable *pTable = pMeta->tables[i];
...@@ -149,13 +150,20 @@ void tsdbOrgMeta(void *pHandle) { ...@@ -149,13 +150,20 @@ void tsdbOrgMeta(void *pHandle) {
tsdbAddTableIntoIndex(pMeta, pTable); tsdbAddTableIntoIndex(pMeta, pTable);
} }
} }
for (int i = 0; i < pMeta->maxTables; i++) {
STable *pTable = pMeta->tables[i];
if (pTable && pTable->type == TSDB_STREAM_TABLE) {
pTable->cqhandle = (*pRepo->appH.cqCreateFunc)(pRepo->appH.cqH, i, pTable->sql, tsdbGetTableSchema(pMeta, pTable));
}
}
} }
/** /**
* Initialize the meta handle * Initialize the meta handle
* ASSUMPTIONS: VALID PARAMETER * ASSUMPTIONS: VALID PARAMETER
*/ */
STsdbMeta *tsdbInitMeta(char *rootDir, int32_t maxTables) { STsdbMeta *tsdbInitMeta(char *rootDir, int32_t maxTables, void *pRepo) {
STsdbMeta *pMeta = (STsdbMeta *)malloc(sizeof(STsdbMeta)); STsdbMeta *pMeta = (STsdbMeta *)malloc(sizeof(STsdbMeta));
if (pMeta == NULL) return NULL; if (pMeta == NULL) return NULL;
...@@ -165,6 +173,7 @@ STsdbMeta *tsdbInitMeta(char *rootDir, int32_t maxTables) { ...@@ -165,6 +173,7 @@ STsdbMeta *tsdbInitMeta(char *rootDir, int32_t maxTables) {
pMeta->tables = (STable **)calloc(maxTables, sizeof(STable *)); pMeta->tables = (STable **)calloc(maxTables, sizeof(STable *));
pMeta->maxRowBytes = 0; pMeta->maxRowBytes = 0;
pMeta->maxCols = 0; pMeta->maxCols = 0;
pMeta->pRepo = pRepo;
if (pMeta->tables == NULL) { if (pMeta->tables == NULL) {
free(pMeta); free(pMeta);
return NULL; return NULL;
...@@ -189,13 +198,16 @@ STsdbMeta *tsdbInitMeta(char *rootDir, int32_t maxTables) { ...@@ -189,13 +198,16 @@ STsdbMeta *tsdbInitMeta(char *rootDir, int32_t maxTables) {
} }
int32_t tsdbFreeMeta(STsdbMeta *pMeta) { int32_t tsdbFreeMeta(STsdbMeta *pMeta) {
STsdbRepo *pRepo = (STsdbRepo *)pMeta->pRepo;
if (pMeta == NULL) return 0; if (pMeta == NULL) return 0;
tsdbCloseMetaFile(pMeta->mfh); tsdbCloseMetaFile(pMeta->mfh);
for (int i = 1; i < pMeta->maxTables; i++) { for (int i = 1; i < pMeta->maxTables; i++) {
if (pMeta->tables[i] != NULL) { if (pMeta->tables[i] != NULL) {
tsdbFreeTable(pMeta->tables[i]); STable *pTable = pMeta->tables[i];
if (pTable->type == TSDB_STREAM_TABLE) (*pRepo->appH.cqDropFunc)(pTable->cqhandle);
tsdbFreeTable(pTable);
} }
} }
...@@ -512,6 +524,7 @@ STable *tsdbGetTableByUid(STsdbMeta *pMeta, uint64_t uid) { ...@@ -512,6 +524,7 @@ STable *tsdbGetTableByUid(STsdbMeta *pMeta, uint64_t uid) {
} }
static int tsdbAddTableToMeta(STsdbMeta *pMeta, STable *pTable, bool addIdx) { static int tsdbAddTableToMeta(STsdbMeta *pMeta, STable *pTable, bool addIdx) {
STsdbRepo *pRepo = (STsdbRepo *)pMeta->pRepo;
if (pTable->type == TSDB_SUPER_TABLE) { if (pTable->type == TSDB_SUPER_TABLE) {
// add super table to the linked list // add super table to the linked list
if (pMeta->superList == NULL) { if (pMeta->superList == NULL) {
...@@ -531,7 +544,7 @@ static int tsdbAddTableToMeta(STsdbMeta *pMeta, STable *pTable, bool addIdx) { ...@@ -531,7 +544,7 @@ static int tsdbAddTableToMeta(STsdbMeta *pMeta, STable *pTable, bool addIdx) {
tsdbAddTableIntoIndex(pMeta, pTable); tsdbAddTableIntoIndex(pMeta, pTable);
} }
if (pTable->type == TSDB_STREAM_TABLE && addIdx) { if (pTable->type == TSDB_STREAM_TABLE && addIdx) {
// TODO pTable->cqhandle = (*pRepo->appH.cqCreateFunc)(pRepo->appH.cqH, pTable->tableId.tid, pTable->sql, tsdbGetTableSchema(pMeta, pTable));
} }
pMeta->nTables++; pMeta->nTables++;
......
...@@ -241,7 +241,12 @@ void taosReadGlobalLogCfg() { ...@@ -241,7 +241,12 @@ void taosReadGlobalLogCfg() {
wordexp_t full_path; wordexp_t full_path;
wordexp(configDir, &full_path, 0); wordexp(configDir, &full_path, 0);
if (full_path.we_wordv != NULL && full_path.we_wordv[0] != NULL) { if (full_path.we_wordv != NULL && full_path.we_wordv[0] != NULL) {
if (strlen(full_path.we_wordv[0]) > TSDB_FILENAME_LEN - 1) {
printf("\nconfig file: %s path overflow max len %d, all variables are set to default\n", full_path.we_wordv[0], TSDB_FILENAME_LEN - 1);
wordfree(&full_path);
return;
}
strcpy(configDir, full_path.we_wordv[0]); strcpy(configDir, full_path.we_wordv[0]);
} else { } else {
printf("configDir:%s not there, use default value: /etc/taos", configDir); printf("configDir:%s not there, use default value: /etc/taos", configDir);
......
...@@ -220,6 +220,8 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) { ...@@ -220,6 +220,8 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
appH.appH = (void *)pVnode; appH.appH = (void *)pVnode;
appH.notifyStatus = vnodeProcessTsdbStatus; appH.notifyStatus = vnodeProcessTsdbStatus;
appH.cqH = pVnode->cq; appH.cqH = pVnode->cq;
appH.cqCreateFunc = cqCreate;
appH.cqDropFunc = cqDrop;
sprintf(temp, "%s/tsdb", rootDir); sprintf(temp, "%s/tsdb", rootDir);
pVnode->tsdb = tsdbOpenRepo(temp, &appH); pVnode->tsdb = tsdbOpenRepo(temp, &appH);
if (pVnode->tsdb == NULL) { if (pVnode->tsdb == NULL) {
...@@ -391,14 +393,14 @@ static void vnodeCleanUp(SVnodeObj *pVnode) { ...@@ -391,14 +393,14 @@ static void vnodeCleanUp(SVnodeObj *pVnode) {
pVnode->sync = NULL; pVnode->sync = NULL;
} }
if (pVnode->wal)
walClose(pVnode->wal);
pVnode->wal = NULL;
if (pVnode->tsdb) if (pVnode->tsdb)
tsdbCloseRepo(pVnode->tsdb, 1); tsdbCloseRepo(pVnode->tsdb, 1);
pVnode->tsdb = NULL; pVnode->tsdb = NULL;
if (pVnode->wal)
walClose(pVnode->wal);
pVnode->wal = NULL;
if (pVnode->cq) if (pVnode->cq)
cqClose(pVnode->cq); cqClose(pVnode->cq);
pVnode->cq = NULL; pVnode->cq = NULL;
...@@ -467,6 +469,8 @@ static void vnodeNotifyFileSynced(void *ahandle, uint64_t fversion) { ...@@ -467,6 +469,8 @@ static void vnodeNotifyFileSynced(void *ahandle, uint64_t fversion) {
appH.appH = (void *)pVnode; appH.appH = (void *)pVnode;
appH.notifyStatus = vnodeProcessTsdbStatus; appH.notifyStatus = vnodeProcessTsdbStatus;
appH.cqH = pVnode->cq; appH.cqH = pVnode->cq;
appH.cqCreateFunc = cqCreate;
appH.cqDropFunc = cqDrop;
pVnode->tsdb = tsdbOpenRepo(rootDir, &appH); pVnode->tsdb = tsdbOpenRepo(rootDir, &appH);
} }
......
...@@ -269,6 +269,7 @@ cd ../../../debug; make ...@@ -269,6 +269,7 @@ cd ../../../debug; make
./test.sh -u -f unique/db/replica_reduce31.sim ./test.sh -u -f unique/db/replica_reduce31.sim
./test.sh -u -f unique/db/replica_part.sim ./test.sh -u -f unique/db/replica_part.sim
./test.sh -u -f unique/dnode/alternativeRole.sim
./test.sh -u -f unique/dnode/balance1.sim ./test.sh -u -f unique/dnode/balance1.sim
./test.sh -u -f unique/dnode/balance2.sim ./test.sh -u -f unique/dnode/balance2.sim
./test.sh -u -f unique/dnode/balance3.sim ./test.sh -u -f unique/dnode/balance3.sim
......
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/deploy.sh -n dnode2 -i 2
system sh/deploy.sh -n dnode3 -i 3
system sh/cfg.sh -n dnode1 -c alternativeRole -v 1
system sh/cfg.sh -n dnode2 -c alternativeRole -v 2
system sh/cfg.sh -n dnode3 -c alternativeRole -v 0
system sh/cfg.sh -n dnode1 -c wallevel -v 1
system sh/cfg.sh -n dnode2 -c wallevel -v 1
system sh/cfg.sh -n dnode3 -c wallevel -v 1
system sh/cfg.sh -n dnode1 -c numOfMpeers -v 3
system sh/cfg.sh -n dnode2 -c numOfMpeers -v 3
system sh/cfg.sh -n dnode3 -c numOfMpeers -v 3
print ========== step1
system sh/exec_up.sh -n dnode1 -s start
sql connect
sql create dnode $hostname2
system sh/exec_up.sh -n dnode2 -s start
sleep 3000
sql create dnode $hostname3
system sh/exec_up.sh -n dnode3 -s start
sleep 3000
sql show dnodes
print dnode1 $data5_1
print dnode1 $data5_2
print dnode1 $data5_3
if $data5_1 != mnode then
return -1
endi
if $data5_2 != vnode then
return -1
endi
if $data5_3 != any then
return -1
endi
sql show mnodes
print dnode1 ==> $data2_1
print dnode2 ==> $data2_2
print dnode3 ==> $data2_3
if $data2_1 != master then
return -1
endi
if $data2_2 != null then
return -1
endi
if $data2_3 != slave then
return -1
endi
print ========== step2
sql create database d1 maxTables 4
sql create table d1.t1 (ts timestamp, i int)
sql create table d1.t2 (ts timestamp, i int)
sql create table d1.t3 (ts timestamp, i int)
sql create table d1.t4 (ts timestamp, i int)
sql create table d1.t5 (ts timestamp, i int)
sql create table d1.t6 (ts timestamp, i int)
sql create table d1.t7 (ts timestamp, i int)
sql create table d1.t8 (ts timestamp, i int)
sql show dnodes
print dnode1 $data2_1
print dnode2 $data2_2
print dnode3 $data2_3
if $data2_1 != 0 then
return -1
endi
if $data2_2 != 1 then
return -1
endi
if $data2_3 != 1 then
return -1
endi
system sh/exec_up.sh -n dnode1 -s stop -x SIGINT
system sh/exec_up.sh -n dnode2 -s stop -x SIGINT
system sh/exec_up.sh -n dnode3 -s stop -x SIGINT
system sh/exec_up.sh -n dnode4 -s stop -x SIGINT
system sh/exec_up.sh -n dnode5 -s stop -x SIGINT
system sh/exec_up.sh -n dnode6 -s stop -x SIGINT
system sh/exec_up.sh -n dnode7 -s stop -x SIGINT
system sh/exec_up.sh -n dnode8 -s stop -x SIGINT
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册