提交 8eb30f62 编写于 作者: S Steven Li

Merge remote-tracking branch 'origin/develop' into feature/crash_gen

......@@ -24,3 +24,11 @@ ENDIF ()
IF (TD_MEM_CHECK)
ADD_DEFINITIONS(-DTAOS_MEM_CHECK)
ENDIF ()
IF (TD_RANDOM_FILE_FAIL)
ADD_DEFINITIONS(-DTAOS_RANDOM_FILE_FAIL)
ENDIF ()
IF (TD_RANDOM_NETWORK_FAIL)
ADD_DEFINITIONS(-DTAOS_RANDOM_NETWORK_FAIL)
ENDIF ()
......@@ -30,4 +30,14 @@ ENDIF ()
IF (${MEM_CHECK} MATCHES "true")
SET(TD_MEM_CHECK TRUE)
MESSAGE(STATUS "build with memory check")
ENDIF ()
\ No newline at end of file
ENDIF ()
IF (${RANDOM_FILE_FAIL} MATCHES "true")
SET(TD_RANDOM_FILE_FAIL TRUE)
MESSAGE(STATUS "build with random-file-fail enabled")
ENDIF ()
IF (${RANDOM_NETWORK_FAIL} MATCHES "true")
SET(TD_RANDOM_NETWORK_FAIL TRUE)
MESSAGE(STATUS "build with random-network-fail enabled")
ENDIF ()
......@@ -176,8 +176,6 @@ SColumn* tscColumnListInsert(SArray* pColList, SColumnIndex* colIndex);
SArray* tscColumnListClone(const SArray* src, int16_t tableIndex);
void tscColumnListDestroy(SArray* pColList);
SColumnFilterInfo* tscFilterInfoClone(const SColumnFilterInfo* src, int32_t numOfFilters);
int32_t tscValidateName(SSQLToken* pToken);
void tscIncStreamExecutionCount(void* pStream);
......
......@@ -32,8 +32,8 @@ extern "C" {
#include "qExecutor.h"
#include "qsqlparser.h"
#include "qsqltype.h"
#include "qtsbuf.h"
#include "tcmdtype.h"
// forward declaration
struct SSqlInfo;
......@@ -395,7 +395,6 @@ TAOS *taos_connect_a(char *ip, char *user, char *pass, char *db, uint16_t port,
void *param, void **taos);
void waitForQueryRsp(void *param, TAOS_RES *tres, int code) ;
int doAsyncParseSql(SSqlObj* pSql);
void doAsyncQuery(STscObj *pObj, SSqlObj *pSql, void (*fp)(), void *param, const char *sqlstr, size_t sqlLen);
void tscProcessMultiVnodesImportFromFile(SSqlObj *pSql);
......@@ -403,13 +402,14 @@ void tscKillSTableQuery(SSqlObj *pSql);
void tscInitResObjForLocalQuery(SSqlObj *pObj, int32_t numOfRes, int32_t rowLen);
bool tscIsUpdateQuery(SSqlObj* pSql);
bool tscHasReachLimitation(SQueryInfo *pQueryInfo, SSqlRes *pRes);
// todo remove this function.
bool tscResultsetFetchCompleted(TAOS_RES *result);
char *tscGetErrorMsgPayload(SSqlCmd *pCmd);
int32_t tscInvalidSQLErrMsg(char *msg, const char *additionalInfo, const char *sql);
void tscQueueAsyncFreeResult(SSqlObj *pSql);
int32_t tscToSQLCmd(SSqlObj *pSql, struct SSqlInfo *pInfo);
void tscGetResultColumnChr(SSqlRes *pRes, SFieldInfo* pFieldInfo, int32_t column);
......
......@@ -213,27 +213,34 @@ void taos_fetch_rows_a(TAOS_RES *taosa, void (*fp)(void *, TAOS_RES *, int), voi
// handle the sub queries of join query
if (pCmd->command == TSDB_SQL_TABLE_JOIN_RETRIEVE) {
tscFetchDatablockFromSubquery(pSql);
} else if (pRes->completed && pCmd->command == TSDB_SQL_FETCH) {
if (hasMoreVnodesToTry(pSql)) { // sequentially retrieve data from remain vnodes.
tscTryQueryNextVnode(pSql, tscAsyncQueryRowsForNextVnode);
return;
} else {
/*
} else if (pRes->completed) {
if(pCmd->command == TSDB_SQL_FETCH) {
if (hasMoreVnodesToTry(pSql)) { // sequentially retrieve data from remain vnodes.
tscTryQueryNextVnode(pSql, tscAsyncQueryRowsForNextVnode);
return;
} else {
/*
* all available virtual node has been checked already, now we need to check
* for the next subclause queries
*/
if (pCmd->clauseIndex < pCmd->numOfClause - 1) {
tscTryQueryNextClause(pSql, tscAsyncQueryRowsForNextVnode);
return;
}
/*
*/
if (pCmd->clauseIndex < pCmd->numOfClause - 1) {
tscTryQueryNextClause(pSql, tscAsyncQueryRowsForNextVnode);
return;
}
/*
* 1. has reach the limitation
* 2. no remain virtual nodes to be retrieved anymore
*/
*/
(*pSql->fetchFp)(param, pSql, 0);
}
return;
} else if (pCmd->command == TSDB_SQL_RETRIEVE || pCmd->command == TSDB_SQL_RETRIEVE_LOCALMERGE) {
// in case of show command, return no data
(*pSql->fetchFp)(param, pSql, 0);
} else {
assert(0);
}
return;
} else { // current query is not completed, continue retrieve from node
if (pCmd->command != TSDB_SQL_RETRIEVE_LOCALMERGE && pCmd->command < TSDB_SQL_LOCAL) {
pCmd->command = (pCmd->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH;
......@@ -405,17 +412,6 @@ void tscProcessAsyncFree(SSchedMsg *pMsg) {
taos_free_result(pSql);
}
void tscQueueAsyncFreeResult(SSqlObj *pSql) {
tscDebug("%p sqlObj put in queue to async free", pSql);
SSchedMsg schedMsg = { 0 };
schedMsg.fp = tscProcessAsyncFree;
schedMsg.ahandle = pSql;
schedMsg.thandle = (void *)1;
schedMsg.msg = NULL;
taosScheduleTask(tscQhandle, &schedMsg);
}
int tscSendMsgToServer(SSqlObj *pSql);
void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
......
......@@ -406,7 +406,7 @@ int tscProcessLocalCmd(SSqlObj *pSql) {
pSql->res.qhandle = 0x1;
pSql->res.numOfRows = 0;
} else if (pCmd->command == TSDB_SQL_RESET_CACHE) {
taosCacheEmpty(tscCacheHandle);
taosCacheEmpty(tscCacheHandle,false);
} else if (pCmd->command == TSDB_SQL_SERV_VERSION) {
tscProcessServerVer(pSql);
} else if (pCmd->command == TSDB_SQL_CLI_VERSION) {
......
此差异已折叠。
......@@ -14,8 +14,8 @@
*/
#include "os.h"
#include "qsqltype.h"
#include "tcache.h"
#include "tcmdtype.h"
#include "trpc.h"
#include "tscLocalMerge.h"
#include "tscLog.h"
......@@ -46,10 +46,13 @@ static int32_t minMsgSize() { return tsRpcHeadSize + 100; }
static void tscSetDnodeIpList(SSqlObj* pSql, SCMVgroupInfo* pVgroupInfo) {
SRpcIpSet* pIpList = &pSql->ipList;
pIpList->numOfIps = pVgroupInfo->numOfIps;
pIpList->inUse = 0;
if (pVgroupInfo == NULL) {
pIpList->numOfIps = 0;
return;
}
pIpList->numOfIps = pVgroupInfo->numOfIps;
for(int32_t i = 0; i < pVgroupInfo->numOfIps; ++i) {
strcpy(pIpList->fqdn[i], pVgroupInfo->ipAddr[i].fqdn);
pIpList->port[i] = pVgroupInfo->ipAddr[i].port;
......@@ -539,14 +542,18 @@ static char *doSerializeTableInfo(SQueryTableMsg* pQueryMsg, SSqlObj *pSql, char
int32_t index = pTableMetaInfo->vgroupIndex;
assert(index >= 0);
pVgroupInfo = &pTableMetaInfo->vgroupList->vgroups[index];
if (pTableMetaInfo->vgroupList->numOfVgroups > 0) {
pVgroupInfo = &pTableMetaInfo->vgroupList->vgroups[index];
}
tscDebug("%p query on stable, vgIndex:%d, numOfVgroups:%d", pSql, index, pTableMetaInfo->vgroupList->numOfVgroups);
} else {
pVgroupInfo = &pTableMeta->vgroupInfo;
}
tscSetDnodeIpList(pSql, pVgroupInfo);
pQueryMsg->head.vgId = htonl(pVgroupInfo->vgId);
if (pVgroupInfo != NULL) {
pQueryMsg->head.vgId = htonl(pVgroupInfo->vgId);
}
STableIdInfo *pTableIdInfo = (STableIdInfo *)pMsg;
pTableIdInfo->tid = htonl(pTableMeta->sid);
......@@ -1675,8 +1682,8 @@ int tscProcessTableMetaRsp(SSqlObj *pSql) {
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);
assert(pTableMetaInfo->pTableMeta == NULL);
pTableMetaInfo->pTableMeta =
(STableMeta *) taosCachePut(tscCacheHandle, pTableMetaInfo->name, pTableMeta, size, tsTableMetaKeepTimer);
pTableMetaInfo->pTableMeta = (STableMeta *) taosCachePut(tscCacheHandle, pTableMetaInfo->name,
strlen(pTableMetaInfo->name), pTableMeta, size, tsTableMetaKeepTimer);
// todo handle out of memory case
if (pTableMetaInfo->pTableMeta == NULL) {
......@@ -1879,7 +1886,8 @@ int tscProcessShowRsp(SSqlObj *pSql) {
size_t size = 0;
STableMeta* pTableMeta = tscCreateTableMetaFromMsg(pMetaMsg, &size);
pTableMetaInfo->pTableMeta = taosCachePut(tscCacheHandle, key, (char *)pTableMeta, size, tsTableMetaKeepTimer);
pTableMetaInfo->pTableMeta = taosCachePut(tscCacheHandle, key, strlen(key), (char *)pTableMeta, size,
tsTableMetaKeepTimer);
SSchema *pTableSchema = tscGetTableSchema(pTableMetaInfo->pTableMeta);
if (pQueryInfo->colList == NULL) {
......@@ -1942,16 +1950,15 @@ int tscProcessUseDbRsp(SSqlObj *pSql) {
}
int tscProcessDropDbRsp(SSqlObj *UNUSED_PARAM(pSql)) {
taosCacheEmpty(tscCacheHandle);
taosCacheEmpty(tscCacheHandle, false);
return 0;
}
int tscProcessDropTableRsp(SSqlObj *pSql) {
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);
STableMeta *pTableMeta = taosCacheAcquireByName(tscCacheHandle, pTableMetaInfo->name);
if (pTableMeta == NULL) {
/* not in cache, abort */
STableMeta *pTableMeta = taosCacheAcquireByKey(tscCacheHandle, pTableMetaInfo->name, strlen(pTableMetaInfo->name));
if (pTableMeta == NULL) { /* not in cache, abort */
return 0;
}
......@@ -1975,7 +1982,7 @@ int tscProcessDropTableRsp(SSqlObj *pSql) {
int tscProcessAlterTableMsgRsp(SSqlObj *pSql) {
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);
STableMeta *pTableMeta = taosCacheAcquireByName(tscCacheHandle, pTableMetaInfo->name);
STableMeta *pTableMeta = taosCacheAcquireByKey(tscCacheHandle, pTableMetaInfo->name, strlen(pTableMetaInfo->name));
if (pTableMeta == NULL) { /* not in cache, abort */
return 0;
}
......@@ -1989,7 +1996,7 @@ int tscProcessAlterTableMsgRsp(SSqlObj *pSql) {
if (isSuperTable) { // if it is a super table, reset whole query cache
tscDebug("%p reset query cache since table:%s is stable", pSql, pTableMetaInfo->name);
taosCacheEmpty(tscCacheHandle);
taosCacheEmpty(tscCacheHandle, false);
}
}
......@@ -2125,7 +2132,7 @@ int32_t tscGetTableMeta(SSqlObj *pSql, STableMetaInfo *pTableMetaInfo) {
taosCacheRelease(tscCacheHandle, (void **)&(pTableMetaInfo->pTableMeta), false);
}
pTableMetaInfo->pTableMeta = (STableMeta *)taosCacheAcquireByName(tscCacheHandle, pTableMetaInfo->name);
pTableMetaInfo->pTableMeta = (STableMeta *)taosCacheAcquireByKey(tscCacheHandle, pTableMetaInfo->name, strlen(pTableMetaInfo->name));
if (pTableMetaInfo->pTableMeta != NULL) {
STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta);
tscDebug("%p retrieve table Meta from cache, the number of columns:%d, numOfTags:%d, %p", pSql, tinfo.numOfColumns,
......
......@@ -148,7 +148,7 @@ void taos_init_imp() {
refreshTime = refreshTime < 10 ? 10 : refreshTime;
if (tscCacheHandle == NULL) {
tscCacheHandle = taosCacheInit(refreshTime);
tscCacheHandle = taosCacheInit(TSDB_DATA_TYPE_BINARY, refreshTime, false, NULL, "client");
}
tscDebug("client is initialized successfully");
......
......@@ -1115,31 +1115,6 @@ SColumn* tscColumnListInsert(SArray* pColumnList, SColumnIndex* pColIndex) {
return taosArrayGetP(pColumnList, i);
}
SColumnFilterInfo* tscFilterInfoClone(const SColumnFilterInfo* src, int32_t numOfFilters) {
if (numOfFilters == 0) {
assert(src == NULL);
return NULL;
}
SColumnFilterInfo* pFilter = calloc(1, numOfFilters * sizeof(SColumnFilterInfo));
memcpy(pFilter, src, sizeof(SColumnFilterInfo) * numOfFilters);
for (int32_t j = 0; j < numOfFilters; ++j) {
if (pFilter[j].filterstr) {
size_t len = (size_t) pFilter[j].len + 1 * TSDB_NCHAR_SIZE;
pFilter[j].pz = (int64_t) calloc(1, len);
memcpy((char*)pFilter[j].pz, (char*)src[j].pz, (size_t)len);
}
}
assert(src->filterstr == 0 || src->filterstr == 1);
assert(!(src->lowerRelOptr == TSDB_RELATION_INVALID && src->upperRelOptr == TSDB_RELATION_INVALID));
return pFilter;
}
static void destroyFilterInfo(SColumnFilterInfo* pFilterInfo, int32_t numOfFilters) {
for(int32_t i = 0; i < numOfFilters; ++i) {
if (pFilterInfo[i].filterstr) {
......
......@@ -13,8 +13,8 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_QSQLCMD_H
#define TDENGINE_QSQLCMD_H
#ifndef TDENGINE_TSQLMSGTYPE_H
#define TDENGINE_TSQLMSGTYPE_H
#ifdef __cplusplus
extern "C" {
......@@ -109,4 +109,4 @@ extern char *sqlCmd[];
}
#endif
#endif // TDENGINE_QSQLCMD_H
#endif // TDENGINE_TSQLMSGTYPE_H
......@@ -50,8 +50,8 @@ extern "C" {
typedef struct {
int8_t type; // Column type
int16_t colId; // column ID
int32_t bytes; // column bytes
int32_t offset; // point offset in SDataRow after the header part
int16_t bytes; // column bytes
int16_t offset; // point offset in SDataRow after the header part
} STColumn;
#define colType(col) ((col)->type)
......@@ -116,7 +116,7 @@ typedef struct {
int tdInitTSchemaBuilder(STSchemaBuilder *pBuilder, int32_t version);
void tdDestroyTSchemaBuilder(STSchemaBuilder *pBuilder);
void tdResetTSchemaBuilder(STSchemaBuilder *pBuilder, int32_t version);
int tdAddColToSchema(STSchemaBuilder *pBuilder, int8_t type, int16_t colId, int32_t bytes);
int tdAddColToSchema(STSchemaBuilder *pBuilder, int8_t type, int16_t colId, int16_t bytes);
STSchema *tdGetSchemaFromBuilder(STSchemaBuilder *pBuilder);
// ----------------- Data row structure
......
......@@ -33,6 +33,7 @@ extern int32_t tsStatusInterval;
extern int16_t tsNumOfVnodesPerCore;
extern int16_t tsNumOfTotalVnodes;
extern int32_t tsNumOfMnodes;
extern int32_t tsEnableVnodeBak;
// common
extern int tsRpcTimer;
......
......@@ -27,4 +27,6 @@ SSchema tGetTableNameColumnSchema();
bool tscValidateTableNameLength(size_t len);
SColumnFilterInfo* tscFilterInfoClone(const SColumnFilterInfo* src, int32_t numOfFilters);
#endif // TDENGINE_NAME_H
......@@ -15,4 +15,4 @@
#define TSDB_SQL_C
#include "qsqltype.h"
#include "tcmdtype.h"
......@@ -43,7 +43,7 @@ int tdEncodeSchema(void **buf, STSchema *pSchema) {
STColumn *pCol = schemaColAt(pSchema, i);
tlen += taosEncodeFixedI8(buf, colType(pCol));
tlen += taosEncodeFixedI16(buf, colColId(pCol));
tlen += taosEncodeFixedI32(buf, colBytes(pCol));
tlen += taosEncodeFixedI16(buf, colBytes(pCol));
}
return tlen;
......@@ -65,10 +65,10 @@ void *tdDecodeSchema(void *buf, STSchema **pRSchema) {
for (int i = 0; i < numOfCols; i++) {
int8_t type = 0;
int16_t colId = 0;
int32_t bytes = 0;
int16_t bytes = 0;
buf = taosDecodeFixedI8(buf, &type);
buf = taosDecodeFixedI16(buf, &colId);
buf = taosDecodeFixedI32(buf, &bytes);
buf = taosDecodeFixedI16(buf, &bytes);
if (tdAddColToSchema(&schemaBuilder, type, colId, bytes) < 0) {
tdDestroyTSchemaBuilder(&schemaBuilder);
return NULL;
......@@ -105,7 +105,7 @@ void tdResetTSchemaBuilder(STSchemaBuilder *pBuilder, int32_t version) {
pBuilder->version = version;
}
int tdAddColToSchema(STSchemaBuilder *pBuilder, int8_t type, int16_t colId, int32_t bytes) {
int tdAddColToSchema(STSchemaBuilder *pBuilder, int8_t type, int16_t colId, int16_t bytes) {
if (!isValidDataType(type)) return -1;
if (pBuilder->nCols >= pBuilder->tCols) {
......
......@@ -41,6 +41,8 @@ int32_t tsStatusInterval = 1; // second
int16_t tsNumOfVnodesPerCore = 8;
int16_t tsNumOfTotalVnodes = TSDB_INVALID_VNODE_NUM;
int32_t tsNumOfMnodes = 3;
int32_t tsEnableVnodeBak = 1;
// common
int32_t tsRpcTimer = 1000;
......@@ -422,6 +424,16 @@ static void doInitGlobalConfig() {
cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosInitConfigOption(cfg);
cfg.option = "vnodeBak";
cfg.ptr = &tsEnableVnodeBak;
cfg.valType = TAOS_CFG_VTYPE_INT32;
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW;
cfg.minValue = 0;
cfg.maxValue = 1;
cfg.ptrLength = 1;
cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosInitConfigOption(cfg);
cfg.option = "balance";
cfg.ptr = &tsEnableBalance;
cfg.valType = TAOS_CFG_VTYPE_INT32;
......
......@@ -49,4 +49,29 @@ SSchema tGetTableNameColumnSchema() {
bool tscValidateTableNameLength(size_t len) {
return len < TSDB_TABLE_NAME_LEN;
}
\ No newline at end of file
}
SColumnFilterInfo* tscFilterInfoClone(const SColumnFilterInfo* src, int32_t numOfFilters) {
if (numOfFilters == 0) {
assert(src == NULL);
return NULL;
}
SColumnFilterInfo* pFilter = calloc(1, numOfFilters * sizeof(SColumnFilterInfo));
memcpy(pFilter, src, sizeof(SColumnFilterInfo) * numOfFilters);
for (int32_t j = 0; j < numOfFilters; ++j) {
if (pFilter[j].filterstr) {
size_t len = (size_t) pFilter[j].len + 1 * TSDB_NCHAR_SIZE;
pFilter[j].pz = (int64_t) calloc(1, len);
memcpy((char*)pFilter[j].pz, (char*)src[j].pz, (size_t)len);
}
}
assert(src->filterstr == 0 || src->filterstr == 1);
assert(!(src->lowerRelOptr == TSDB_RELATION_INVALID && src->upperRelOptr == TSDB_RELATION_INVALID));
return pFilter;
}
......@@ -213,6 +213,8 @@ void cqDrop(void *handle) {
pObj->pStream = NULL;
cTrace("vgId:%d, id:%d CQ:%s is dropped", pContext->vgId, pObj->tid, pObj->sqlStr);
tdFreeSchema(pObj->pSchema);
free(pObj->sqlStr);
free(pObj);
pthread_mutex_unlock(&pContext->mutex);
......
......@@ -176,6 +176,7 @@ void dnodeCleanupMgmt() {
tsMgmtQset = NULL;
tsMgmtQueue = NULL;
vnodeCleanupResources();
}
void dnodeDispatchToMgmtQueue(SRpcMsg *pMsg) {
......@@ -242,8 +243,14 @@ static int32_t dnodeGetVnodeList(int32_t vnodeList[], int32_t *numOfVnodes) {
int32_t vnode = atoi(de->d_name + 5);
if (vnode == 0) continue;
vnodeList[*numOfVnodes] = vnode;
(*numOfVnodes)++;
if (*numOfVnodes >= TSDB_MAX_VNODES) {
dError("vgId:%d, too many vnode directory in disk, exist:%d max:%d", vnode, *numOfVnodes, TSDB_MAX_VNODES);
continue;
} else {
vnodeList[*numOfVnodes - 1] = vnode;
}
}
}
closedir(dir);
......@@ -276,7 +283,7 @@ static void *dnodeOpenVnode(void *param) {
static int32_t dnodeOpenVnodes() {
int32_t *vnodeList = calloc(TSDB_MAX_VNODES, sizeof(int32_t));
int32_t numOfVnodes;
int32_t numOfVnodes = 0;
int32_t status = dnodeGetVnodeList(vnodeList, &numOfVnodes);
if (status != TSDB_CODE_SUCCESS) {
......@@ -337,7 +344,7 @@ static int32_t dnodeOpenVnodes() {
void dnodeStartStream() {
int32_t vnodeList[TSDB_MAX_VNODES];
int32_t numOfVnodes = 0;
int32_t status = dnodeGetVnodeList(vnodeList, &numOfVnodes);
int32_t status = vnodeGetVnodeList(vnodeList, &numOfVnodes);
if (status != TSDB_CODE_SUCCESS) {
dInfo("get dnode list failed");
......@@ -352,15 +359,14 @@ void dnodeStartStream() {
}
static void dnodeCloseVnodes() {
int32_t *vnodeList = (int32_t *)malloc(sizeof(int32_t) * TSDB_MAX_VNODES);
int32_t numOfVnodes;
int32_t vnodeList[TSDB_MAX_VNODES];
int32_t numOfVnodes = 0;
int32_t status;
status = dnodeGetVnodeList(vnodeList, &numOfVnodes);
status = vnodeGetVnodeList(vnodeList, &numOfVnodes);
if (status != TSDB_CODE_SUCCESS) {
dInfo("get dnode list failed");
free(vnodeList);
return;
}
......@@ -368,7 +374,6 @@ static void dnodeCloseVnodes() {
vnodeClose(vnodeList[i]);
}
free(vnodeList);
dInfo("total vnodes:%d are all closed", numOfVnodes);
}
......@@ -391,7 +396,7 @@ static int32_t dnodeProcessCreateVnodeMsg(SRpcMsg *rpcMsg) {
pCreate->nodes[j].nodeId = htonl(pCreate->nodes[j].nodeId);
}
void *pVnode = vnodeAccquireVnode(pCreate->cfg.vgId);
void *pVnode = vnodeAcquireVnode(pCreate->cfg.vgId);
if (pVnode != NULL) {
int32_t code = vnodeAlter(pVnode, pCreate);
vnodeRelease(pVnode);
......
......@@ -98,11 +98,7 @@ void dnodeDispatchToVnodeReadQueue(SRpcMsg *pMsg) {
pHead->vgId = htonl(pHead->vgId);
pHead->contLen = htonl(pHead->contLen);
if (pMsg->msgType == TSDB_MSG_TYPE_FETCH) {
pVnode = vnodeGetVnode(pHead->vgId);
} else {
pVnode = vnodeAccquireVnode(pHead->vgId);
}
pVnode = vnodeAcquireVnode(pHead->vgId);
if (pVnode == NULL) {
leftLen -= pHead->contLen;
......@@ -179,24 +175,19 @@ void dnodeFreeVnodeRqueue(void *rqueue) {
// dynamically adjust the number of threads
}
static void dnodeContinueExecuteQuery(void* pVnode, void* qhandle, SReadMsg *pMsg) {
void dnodePutItemIntoReadQueue(void *pVnode, void *qhandle) {
SReadMsg *pRead = (SReadMsg *)taosAllocateQitem(sizeof(SReadMsg));
pRead->rpcMsg = pMsg->rpcMsg;
pRead->pCont = qhandle;
pRead->contLen = 0;
pRead->rpcMsg.msgType = TSDB_MSG_TYPE_QUERY;
pRead->pCont = qhandle;
pRead->contLen = 0;
assert(pVnode != NULL);
taos_queue queue = vnodeAcquireRqueue(pVnode);
taos_queue queue = vnodeGetRqueue(pVnode);
taosWriteQitem(queue, TAOS_QTYPE_RPC, pRead);
taosWriteQitem(queue, TAOS_QTYPE_QUERY, pRead);
}
void dnodeSendRpcReadRsp(void *pVnode, SReadMsg *pRead, int32_t code) {
if (code == TSDB_CODE_VND_ACTION_IN_PROGRESS) return;
if (code == TSDB_CODE_VND_ACTION_NEED_REPROCESSED) {
dnodeContinueExecuteQuery(pVnode, pRead->rspRet.qhandle, pRead);
code = TSDB_CODE_SUCCESS;
}
SRpcMsg rpcRsp = {
.handle = pRead->rpcMsg.handle,
.pCont = pRead->rspRet.rsp,
......@@ -206,6 +197,12 @@ void dnodeSendRpcReadRsp(void *pVnode, SReadMsg *pRead, int32_t code) {
rpcSendResponse(&rpcRsp);
rpcFreeCont(pRead->rpcMsg.pCont);
vnodeRelease(pVnode);
}
void dnodeDispatchNonRspMsg(void *pVnode, SReadMsg *pRead, int32_t code) {
vnodeRelease(pVnode);
return;
}
static void *dnodeProcessReadQueue(void *param) {
......@@ -219,9 +216,16 @@ static void *dnodeProcessReadQueue(void *param) {
break;
}
dDebug("%p, msg:%s will be processed in vread queue", pReadMsg->rpcMsg.ahandle, taosMsg[pReadMsg->rpcMsg.msgType]);
dDebug("%p, msg:%s will be processed in vread queue, qtype:%d", pReadMsg->rpcMsg.ahandle,
taosMsg[pReadMsg->rpcMsg.msgType], type);
int32_t code = vnodeProcessRead(pVnode, pReadMsg);
dnodeSendRpcReadRsp(pVnode, pReadMsg, code);
if (type == TAOS_QTYPE_RPC) {
dnodeSendRpcReadRsp(pVnode, pReadMsg, code);
} else {
dnodeDispatchNonRspMsg(pVnode, pReadMsg, code);
}
taosFreeQitem(pReadMsg);
}
......
......@@ -53,6 +53,7 @@ void *dnodeAllocateVnodeWqueue(void *pVnode);
void dnodeFreeVnodeWqueue(void *queue);
void *dnodeAllocateVnodeRqueue(void *pVnode);
void dnodeFreeVnodeRqueue(void *rqueue);
void dnodePutItemIntoReadQueue(void *pVnode, void *qhandle);
void dnodeSendRpcVnodeWriteRsp(void *pVnode, void *param, int32_t code);
int32_t dnodeAllocateMnodePqueue();
......
......@@ -20,6 +20,7 @@ extern "C" {
#endif
typedef void* qinfo_t;
typedef void (*_qinfo_free_fn_t)(void*);
/**
* create the qinfo object according to QueryTableMsg
......@@ -28,15 +29,13 @@ typedef void* qinfo_t;
* @param qinfo
* @return
*/
int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryTableMsg, qinfo_t* qinfo);
int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryTableMsg, void* param, _qinfo_free_fn_t fn, qinfo_t* qinfo);
/**
* Destroy QInfo object
* @param qinfo qhandle
* @param fp destroy callback function, while the qhandle is destoried, invoke the fp
* @param param free callback params
*/
void qDestroyQueryInfo(qinfo_t qinfo, void (*fp)(void*), void* param);
void qDestroyQueryInfo(qinfo_t qinfo);
/**
* the main query execution function, including query on both table and multitables,
......@@ -45,7 +44,7 @@ void qDestroyQueryInfo(qinfo_t qinfo, void (*fp)(void*), void* param);
* @param qinfo
* @return
*/
void qTableQuery(qinfo_t qinfo, void (*fp)(void*), void* param);
void qTableQuery(qinfo_t qinfo);
/**
* Retrieve the produced results information, if current query is not paused or completed,
......@@ -81,11 +80,16 @@ bool qHasMoreResultsToRetrieve(qinfo_t qinfo);
/**
* kill current ongoing query and free query handle automatically
* @param qinfo qhandle
* @param fp destroy callback function, while the qhandle is destoried, invoke the fp
* @param param free callback params
* @return
*/
int32_t qKillQuery(qinfo_t qinfo, void (*fp)(void*), void* param);
int32_t qKillQuery(qinfo_t qinfo);
void* qOpenQueryMgmt(int32_t vgId);
void qSetQueryMgmtClosed(void* pExecutor);
void qCleanupQueryMgmt(void* pExecutor);
void** qRegisterQInfo(void* pMgmt, void* qInfo);
void** qAcquireQInfo(void* pMgmt, void** key);
void** qReleaseQInfo(void* pMgmt, void* pQInfo, bool needFree);
#ifdef __cplusplus
}
......
......@@ -365,6 +365,7 @@ void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size);
#define TAOS_QTYPE_FWD 1
#define TAOS_QTYPE_WAL 2
#define TAOS_QTYPE_CQ 3
#define TAOS_QTYPE_QUERY 4
typedef enum {
TSDB_SUPER_TABLE = 0, // super table
......
......@@ -285,9 +285,9 @@ typedef struct {
int32_t tid;
int16_t tversion;
int16_t colId;
int16_t type;
int16_t bytes;
int32_t tagValLen;
int16_t numOfTags;
int32_t schemaLen;
char data[];
} SUpdateTableTagValMsg;
......
......@@ -49,16 +49,19 @@ int32_t vnodeAlter(void *pVnode, SMDCreateVnodeMsg *pVnodeCfg);
int32_t vnodeClose(int32_t vgId);
void vnodeRelease(void *pVnode);
void* vnodeAccquireVnode(int32_t vgId); // add refcount
void* vnodeAcquireVnode(int32_t vgId); // add refcount
void* vnodeGetVnode(int32_t vgId); // keep refcount unchanged
void* vnodeAcquireRqueue(void *);
void* vnodeGetRqueue(void *);
void* vnodeGetWqueue(int32_t vgId);
void* vnodeGetWal(void *pVnode);
int32_t vnodeProcessWrite(void *pVnode, int qtype, void *pHead, void *item);
void vnodeBuildStatusMsg(void * param);
int32_t vnodeGetVnodeList(int32_t vnodeList[], int32_t *numOfVnodes);
void vnodeBuildStatusMsg(void *param);
void vnodeSetAccess(SDMVgroupAccess *pAccess, int32_t numOfVnodes);
void vnodeCleanupResources();
int32_t vnodeProcessRead(void *pVnode, SReadMsg *pReadMsg);
......
......@@ -4,3 +4,4 @@ PROJECT(TDengine)
ADD_SUBDIRECTORY(shell)
ADD_SUBDIRECTORY(taosdemo)
ADD_SUBDIRECTORY(taosdump)
ADD_SUBDIRECTORY(taosmigrate)
\ No newline at end of file
......@@ -32,6 +32,7 @@
#include <time.h>
#include <unistd.h>
#include <wordexp.h>
#include <regex.h>
#include "taos.h"
#include "tutil.h"
......@@ -54,6 +55,7 @@ static struct argp_option options[] = {
{0, 'P', "password", 0, "The password to use when connecting to the server. Default is 'taosdata'.", 3},
{0, 'd', "database", 0, "Destination database. Default is 'test'.", 3},
{0, 'm', "table_prefix", 0, "Table prefix name. Default is 't'.", 3},
{0, 's', "sql file", 0, "The select sql file.", 3},
{0, 'M', 0, 0, "Use metric flag.", 13},
{0, 'o', "outputfile", 0, "Direct output to the named file. Default is './output.txt'.", 14},
{0, 'q', "query_mode", 0, "Query mode--0: SYNC, 1: ASYNC. Default is SYNC.", 6},
......@@ -79,6 +81,7 @@ typedef struct DemoArguments {
char *password;
char *database;
char *tb_prefix;
char *sqlFile;
bool use_metric;
bool insert_only;
char *output_file;
......@@ -120,6 +123,9 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) {
case 'o':
arguments->output_file = arg;
break;
case 's':
arguments->sqlFile = arg;
break;
case 'q':
arguments->mode = atoi(arg);
break;
......@@ -179,10 +185,10 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) {
arguments->tb_prefix = arg;
break;
case 'M':
arguments->use_metric = true;
arguments->use_metric = false;
break;
case 'x':
arguments->insert_only = true;
arguments->insert_only = false;
break;
case 'c':
if (wordexp(arg, &full_path, 0) != 0) {
......@@ -253,6 +259,9 @@ typedef struct {
int data_of_rate;
int64_t start_time;
bool do_aggreFunc;
char* cols;
bool use_metric;
sem_t mutex_sem;
int notFinished;
......@@ -305,6 +314,8 @@ void rand_string(char *str, int size);
double getCurrentTime();
void callBack(void *param, TAOS_RES *res, int code);
void multiThreadCreateTable(char* cols, bool use_metric, int threads, int ntables, char* db_name, char* tb_prefix, char *ip_addr, uint16_t port, char *user, char *pass);
void querySqlFile(TAOS* taos, char* sqlFile);
int main(int argc, char *argv[]) {
SDemoArguments arguments = { NULL, // host
......@@ -313,6 +324,7 @@ int main(int argc, char *argv[]) {
"taosdata", // password
"test", // database
"t", // tb_prefix
NULL,
false, // use_metric
false, // insert_only
"./output.txt", // output_file
......@@ -361,7 +373,7 @@ int main(int argc, char *argv[]) {
abort();
#endif
}
enum MODE query_mode = arguments.mode;
char *ip_addr = arguments.host;
uint16_t port = arguments.port;
......@@ -385,6 +397,13 @@ int main(int argc, char *argv[]) {
char dataString[STRING_LEN];
bool do_aggreFunc = true;
if (NULL != arguments.sqlFile) {
TAOS* qtaos = taos_connect(ip_addr, user, pass, db_name, port);
querySqlFile(qtaos, arguments.sqlFile);
taos_close(qtaos);
return 0;
}
memset(dataString, 0, STRING_LEN);
int len = 0;
......@@ -495,47 +514,19 @@ int main(int argc, char *argv[]) {
len += snprintf(cols + len, STRING_LEN - len, ",f%d %s(%d))", colIndex + 1, data_type[colIndex % count_data_type], len_of_binary);
}
if (!use_metric) {
/* Create all the tables; */
printf("Creating %d table(s)......\n", ntables);
for (int i = 0; i < ntables; i++) {
snprintf(command, BUFFER_SIZE, "create table if not exists %s.%s%d (ts timestamp%s;", db_name, tb_prefix, i, cols);
queryDB(taos, command);
}
printf("Table(s) created!\n");
taos_close(taos);
} else {
if (use_metric) {
/* Create metric table */
printf("Creating meters super table...\n");
snprintf(command, BUFFER_SIZE, "create table if not exists %s.meters (ts timestamp%s tags (areaid int, loc binary(10))", db_name, cols);
queryDB(taos, command);
printf("meters created!\n");
/* Create all the tables; */
printf("Creating %d table(s)......\n", ntables);
for (int i = 0; i < ntables; i++) {
int j;
if (i % 10 == 0) {
j = 10;
} else {
j = i % 10;
}
if (j % 2 == 0) {
snprintf(command, BUFFER_SIZE, "create table if not exists %s.%s%d using %s.meters tags (%d,\"%s\");", db_name, tb_prefix, i, db_name, j, "shanghai");
} else {
snprintf(command, BUFFER_SIZE, "create table if not exists %s.%s%d using %s.meters tags (%d,\"%s\");", db_name, tb_prefix, i, db_name, j, "beijing");
}
queryDB(taos, command);
}
printf("Table(s) created!\n");
taos_close(taos);
}
/* Wait for table to create */
multiThreadCreateTable(cols, use_metric, threads, ntables, db_name, tb_prefix, ip_addr, port, user, pass);
/* Insert data */
double ts = getCurrentTime();
printf("Inserting data......\n");
......@@ -685,6 +676,198 @@ int main(int argc, char *argv[]) {
return 0;
}
#define MAX_SQL_SIZE 65536
void selectSql(TAOS* taos, char* sqlcmd)
{
TAOS_RES *pSql = taos_query(taos, sqlcmd);
int32_t code = taos_errno(pSql);
if (code != 0) {
printf("Failed to sqlcmd:%s, reason:%s\n", sqlcmd, taos_errstr(pSql));
taos_free_result(pSql);
exit(1);
}
int count = 0;
while (taos_fetch_row(pSql) != NULL) {
count++;
}
taos_free_result(pSql);
return;
}
/* Function to do regular expression check */
static int regexMatch(const char *s, const char *reg, int cflags) {
regex_t regex;
char msgbuf[100] = {0};
/* Compile regular expression */
if (regcomp(&regex, reg, cflags) != 0) {
printf("Fail to compile regex\n");
exit(-1);
}
/* Execute regular expression */
int reti = regexec(&regex, s, 0, NULL, 0);
if (!reti) {
regfree(&regex);
return 1;
} else if (reti == REG_NOMATCH) {
regfree(&regex);
return 0;
} else {
regerror(reti, &regex, msgbuf, sizeof(msgbuf));
printf("Regex match failed: %s\n", msgbuf);
regfree(&regex);
exit(-1);
}
return 0;
}
static int isCommentLine(char *line) {
if (line == NULL) return 1;
return regexMatch(line, "^\\s*#.*", REG_EXTENDED);
}
void querySqlFile(TAOS* taos, char* sqlFile)
{
FILE *fp = fopen(sqlFile, "r");
if (fp == NULL) {
printf("failed to open file %s, reason:%s\n", sqlFile, strerror(errno));
exit(-1);
}
int read_len = 0;
char * cmd = calloc(1, MAX_SQL_SIZE);
size_t cmd_len = 0;
char * line = NULL;
size_t line_len = 0;
double t = getCurrentTime();
while ((read_len = getline(&line, &line_len, fp)) != -1) {
if (read_len >= MAX_SQL_SIZE) continue;
line[--read_len] = '\0';
if (read_len == 0 || isCommentLine(line)) { // line starts with #
continue;
}
if (line[read_len - 1] == '\\') {
line[read_len - 1] = ' ';
memcpy(cmd + cmd_len, line, read_len);
cmd_len += read_len;
continue;
}
memcpy(cmd + cmd_len, line, read_len);
selectSql(taos, cmd);
memset(cmd, 0, MAX_SQL_SIZE);
cmd_len = 0;
}
t = getCurrentTime() - t;
printf("run %s took %.6f second(s)\n\n", sqlFile, t);
free(cmd);
if (line) free(line);
fclose(fp);
return;
}
void * createTable(void *sarg)
{
char command[BUFFER_SIZE] = "\0";
info *winfo = (info *)sarg;
if (!winfo->use_metric) {
/* Create all the tables; */
printf("Creating table from %d to %d\n", winfo->start_table_id, winfo->end_table_id);
for (int i = winfo->start_table_id; i <= winfo->end_table_id; i++) {
snprintf(command, BUFFER_SIZE, "create table if not exists %s.%s%d (ts timestamp%s;", winfo->db_name, winfo->tb_prefix, i, winfo->cols);
queryDB(winfo->taos, command);
}
taos_close(winfo->taos);
} else {
/* Create all the tables; */
printf("Creating table from %d to %d\n", winfo->start_table_id, winfo->end_table_id);
for (int i = winfo->start_table_id; i <= winfo->end_table_id; i++) {
int j;
if (i % 10 == 0) {
j = 10;
} else {
j = i % 10;
}
if (j % 2 == 0) {
snprintf(command, BUFFER_SIZE, "create table if not exists %s.%s%d using %s.meters tags (%d,\"%s\");", winfo->db_name, winfo->tb_prefix, i, winfo->db_name, j, "shanghai");
} else {
snprintf(command, BUFFER_SIZE, "create table if not exists %s.%s%d using %s.meters tags (%d,\"%s\");", winfo->db_name, winfo->tb_prefix, i, winfo->db_name, j, "beijing");
}
queryDB(winfo->taos, command);
}
taos_close(winfo->taos);
}
return NULL;
}
void multiThreadCreateTable(char* cols, bool use_metric, int threads, int ntables, char* db_name, char* tb_prefix, char *ip_addr, uint16_t port, char *user, char *pass) {
double ts = getCurrentTime();
printf("create table......\n");
pthread_t *pids = malloc(threads * sizeof(pthread_t));
info *infos = malloc(threads * sizeof(info));
int a = ntables / threads;
if (a < 1) {
threads = ntables;
a = 1;
}
int b = 0;
if (threads != 0)
b = ntables % threads;
int last = 0;
for (int i = 0; i < threads; i++) {
info *t_info = infos + i;
t_info->threadID = i;
tstrncpy(t_info->db_name, db_name, MAX_DB_NAME_SIZE);
tstrncpy(t_info->tb_prefix, tb_prefix, MAX_TB_NAME_SIZE);
t_info->taos = taos_connect(ip_addr, user, pass, db_name, port);
t_info->start_table_id = last;
t_info->end_table_id = i < b ? last + a : last + a - 1;
last = t_info->end_table_id + 1;
t_info->use_metric = use_metric;
t_info->cols = cols;
pthread_create(pids + i, NULL, createTable, t_info);
}
for (int i = 0; i < threads; i++) {
pthread_join(pids[i], NULL);
}
double t = getCurrentTime() - ts;
printf("Spent %.4f seconds to create %d tables with %d connections\n", t, ntables, threads);
for (int i = 0; i < threads; i++) {
info *t_info = infos + i;
taos_close(t_info->taos);
sem_destroy(&(t_info->mutex_sem));
sem_destroy(&(t_info->lock_sem));
}
free(pids);
free(infos);
return ;
}
void *readTable(void *sarg) {
info *rinfo = (info *)sarg;
TAOS *taos = rinfo->taos;
......
CMAKE_MINIMUM_REQUIRED(VERSION 2.8)
PROJECT(TDengine)
IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM))
INCLUDE_DIRECTORIES(${TD_OS_DIR}/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/util/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/mnode/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/vnode/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/common/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/deps/cJson/inc)
AUX_SOURCE_DIRECTORY(. SRC)
ADD_EXECUTABLE(taosmigrate ${SRC})
TARGET_LINK_LIBRARIES(taosmigrate common tutil cJson)
ENDIF ()
SET_SOURCE_FILES_PROPERTIES(./taosmigrate.c PROPERTIES COMPILE_FLAGS -w)
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "taosmigrate.h"
/* The options we understand. */
static struct argp_option options[] = {
{0, 'r', "data dir", 0, "data dir", 0},
{0, 'd', "dnodeId", 0, "dnode id", 1},
{0, 'p', "port", 0, "dnode port", 1},
{0, 'f', "fqdn", 0, "dnode fqdn", 1},
{0, 'g', "multi dnodes", 0, "multi dnode info, e.g. \"2 7030 fqdn1, 3 8030 fqdn2\"", 2},
{0}};
/* Used by main to communicate with parse_opt. */
struct arguments {
char* dataDir;
int32_t dnodeId;
uint16_t port;
char* fqdn;
char* dnodeGroups;
char** arg_list;
int arg_list_len;
};
/* Parse a single option. */
static error_t parse_opt(int key, char *arg, struct argp_state *state) {
struct arguments *arguments = state->input;
switch (key) {
case 'w':
arguments->dataDir = arg;
break;
case 'd':
arguments->dnodeId = atoi(arg);
break;
case 'p':
arguments->port = atoi(arg);
break;
case 'f':
arguments->fqdn = arg;
case 'g':
arguments->dnodeGroups = arg;
break;
case ARGP_KEY_ARG:
arguments->arg_list = &state->argv[state->next - 1];
arguments->arg_list_len = state->argc - state->next + 1;
state->next = state->argc;
argp_usage(state);
break;
default:
return ARGP_ERR_UNKNOWN;
}
return 0;
}
static struct argp argp = {options, parse_opt, 0, 0};
struct arguments arguments = {NULL, 0, 0, NULL, NULL, NULL, 0};
SdnodeGroup tsDnodeGroup = {0};
int tSystemShell(const char * cmd)
{
FILE * fp;
int res;
char buf[1024];
if (cmd == NULL) {
printf("tSystem cmd is NULL!\n");
return -1;
}
if ((fp = popen(cmd, "r") ) == NULL) {
printf("popen cmd:%s error: %s/n", cmd, strerror(errno));
return -1;
} else {
while(fgets(buf, sizeof(buf), fp)) {
printf("popen result:%s", buf);
}
if ((res = pclose(fp)) == -1) {
printf("close popen file pointer fp error!\n");
} else {
printf("popen res is :%d\n", res);
}
return res;
}
}
void taosMvFile(char* destFile, char *srcFile) {
char shellCmd[1024+1] = {0};
//(void)snprintf(shellCmd, 1024, "cp -rf %s %s", srcDir, destDir);
(void)snprintf(shellCmd, 1024, "mv -f %s %s", srcFile, destFile);
tSystemShell(shellCmd);
}
SdnodeIfo* getDnodeInfo(int32_t dnodeId)
{
for (int32_t i = 0; i < tsDnodeGroup.dnodeNum; i++) {
if (dnodeId == tsDnodeGroup.dnodeArray[i].dnodeId) {
return &(tsDnodeGroup.dnodeArray[i]);
}
}
return NULL;
}
void parseOneDnodeInfo(char* buf, SdnodeIfo* pDnodeInfo)
{
char *ptr;
char *p;
int32_t i = 0;
ptr = strtok_r(buf, " ", &p);
while(ptr != NULL) {
if (0 == i) {
pDnodeInfo->dnodeId = atoi(ptr);
} else if (1 == i) {
pDnodeInfo->port = atoi(ptr);
} else if (2 == i) {
tstrncpy(pDnodeInfo->fqdn, ptr, TSDB_FQDN_LEN);
} else {
printf("input parameter error near:%s\n", buf);
exit(-1);
}
i++;
ptr = strtok_r(NULL, " ", &p);
}
snprintf(pDnodeInfo->ep, TSDB_EP_LEN, "%s:%d", pDnodeInfo->fqdn, pDnodeInfo->port);
}
void saveDnodeGroups()
{
if ((NULL != arguments.fqdn) && (arguments.dnodeId > 0) && (0 != arguments.port)) {
//printf("dnodeId:%d port:%d fqdn:%s ep:%s\n", arguments.dnodeId, arguments.port, arguments.fqdn, arguments.ep);
tsDnodeGroup.dnodeArray[tsDnodeGroup.dnodeNum].dnodeId = arguments.dnodeId;
tsDnodeGroup.dnodeArray[tsDnodeGroup.dnodeNum].port = arguments.port;
tstrncpy(tsDnodeGroup.dnodeArray[tsDnodeGroup.dnodeNum].fqdn, arguments.fqdn, TSDB_FQDN_LEN);
snprintf(tsDnodeGroup.dnodeArray[tsDnodeGroup.dnodeNum].ep, TSDB_EP_LEN, "%s:%d", tsDnodeGroup.dnodeArray[tsDnodeGroup.dnodeNum].fqdn, tsDnodeGroup.dnodeArray[tsDnodeGroup.dnodeNum].port);
tsDnodeGroup.dnodeNum++;
}
if (NULL == arguments.dnodeGroups) {
return;
}
//printf("dnodeGroups:%s\n", arguments.dnodeGroups);
char buf[1024];
char* str = NULL;
char* start = arguments.dnodeGroups;
while (NULL != (str = strstr(start, ","))) {
memcpy(buf, start, str - start);
// parse one dnode info: dnodeId port fqdn ep
parseOneDnodeInfo(buf, &(tsDnodeGroup.dnodeArray[tsDnodeGroup.dnodeNum]));
tsDnodeGroup.dnodeNum++;
// next
start = str + 1;
str = NULL;
}
if (strlen(start)) {
parseOneDnodeInfo(start, &(tsDnodeGroup.dnodeArray[tsDnodeGroup.dnodeNum]));
tsDnodeGroup.dnodeNum++;
}
}
int32_t main(int32_t argc, char *argv[]) {
memset(&tsDnodeGroup, 0, sizeof(SdnodeGroup));
argp_parse(&argp, argc, argv, 0, 0, &arguments);
if ((NULL == arguments.dataDir) || ((NULL == arguments.dnodeGroups)
&& (NULL == arguments.fqdn || arguments.dnodeId < 1 || 0 == arguments.port))) {
printf("input parameter error!\n");
return -1;
}
saveDnodeGroups();
printf("===================arguments:==================\n");
printf("oldWal:%s\n", arguments.dataDir);
for (int32_t i = 0; i < tsDnodeGroup.dnodeNum; i++) {
printf("dnodeId:%d port:%d fqdn:%s ep:%s\n", tsDnodeGroup.dnodeArray[i].dnodeId,
tsDnodeGroup.dnodeArray[i].port,
tsDnodeGroup.dnodeArray[i].fqdn,
tsDnodeGroup.dnodeArray[i].ep);
}
printf("===========================\n");
// 1. modify wal for mnode
char mnodeWal[TSDB_FILENAME_LEN*2] = {0};
(void)snprintf(mnodeWal, TSDB_FILENAME_LEN*2, "%s/mnode/wal/wal0", arguments.dataDir);
walModWalFile(mnodeWal);
// 2. modfiy dnode config: mnodeIpList.json
char dnodeIpList[TSDB_FILENAME_LEN*2] = {0};
(void)snprintf(dnodeIpList, TSDB_FILENAME_LEN*2, "%s/dnode/mnodeIpList.json", arguments.dataDir);
modDnodeIpList(dnodeIpList);
// 3. modify vnode config: config.json
char vnodeDir[TSDB_FILENAME_LEN*2] = {0};
(void)snprintf(vnodeDir, TSDB_FILENAME_LEN*2, "%s/vnode", arguments.dataDir);
modAllVnode(vnodeDir);
return 0;
}
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TAOS_MIGRATE_H
#define TAOS_MIGRATE_H
#ifdef __cplusplus
extern "C" {
#endif
#define _GNU_SOURCE
#ifndef _ALPINE
#include <error.h>
#endif
#include <argp.h>
#include <assert.h>
#include <inttypes.h>
#include <pthread.h>
#include <semaphore.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/time.h>
#include <time.h>
#include <unistd.h>
#include "taosdef.h"
#include "tutil.h"
#include "twal.h"
#include "tchecksum.h"
#include "mnodeDef.h"
#include "mnodeSdb.h"
#include "cJSON.h"
#include "taosmsg.h"
#include "tglobal.h"
#include "tsdb.h"
//#include "vnode.h"
#include "vnodeInt.h"
#define MAX_DNODE_NUM 128
typedef struct _SdnodeIfo {
int32_t dnodeId;
uint16_t port;
char fqdn[TSDB_FQDN_LEN+1];
char ep[TSDB_EP_LEN+1];
} SdnodeIfo;
typedef struct _SdnodeGroup {
int32_t dnodeNum;
SdnodeIfo dnodeArray[MAX_DNODE_NUM];
} SdnodeGroup;
int tSystemShell(const char * cmd);
void taosMvFile(char* destFile, char *srcFile) ;
void walModWalFile(char* walfile);
SdnodeIfo* getDnodeInfo(int32_t dnodeId);
void modDnodeIpList(char* dnodeIpList);
void modAllVnode(char *vnodeDir);
#endif
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "taosmigrate.h"
//#include "dnodeInt.h"
//#include "dnodeMgmt.h"
//#include "dnodeVRead.h"
//#include "dnodeVWrite.h"
//#include "dnodeModule.h"
static SDMMnodeInfos tsDnodeIpInfos = {0};
static bool dnodeReadMnodeInfos(char* dnodeIpList) {
FILE *fp = fopen(dnodeIpList, "r");
if (!fp) {
printf("failed to read mnodeIpList.json, file not exist\n");
return false;
}
bool ret = false;
int maxLen = 2000;
char *content = calloc(1, maxLen + 1);
int len = fread(content, 1, maxLen, fp);
if (len <= 0) {
free(content);
fclose(fp);
printf("failed to read mnodeIpList.json, content is null\n");
return false;
}
content[len] = 0;
cJSON* root = cJSON_Parse(content);
if (root == NULL) {
printf("failed to read mnodeIpList.json, invalid json format\n");
goto PARSE_OVER;
}
cJSON* inUse = cJSON_GetObjectItem(root, "inUse");
if (!inUse || inUse->type != cJSON_Number) {
printf("failed to read mnodeIpList.json, inUse not found\n");
goto PARSE_OVER;
}
tsDnodeIpInfos.inUse = inUse->valueint;
cJSON* nodeNum = cJSON_GetObjectItem(root, "nodeNum");
if (!nodeNum || nodeNum->type != cJSON_Number) {
printf("failed to read mnodeIpList.json, nodeNum not found\n");
goto PARSE_OVER;
}
tsDnodeIpInfos.nodeNum = nodeNum->valueint;
cJSON* nodeInfos = cJSON_GetObjectItem(root, "nodeInfos");
if (!nodeInfos || nodeInfos->type != cJSON_Array) {
printf("failed to read mnodeIpList.json, nodeInfos not found\n");
goto PARSE_OVER;
}
int size = cJSON_GetArraySize(nodeInfos);
if (size != tsDnodeIpInfos.nodeNum) {
printf("failed to read mnodeIpList.json, nodeInfos size not matched\n");
goto PARSE_OVER;
}
for (int i = 0; i < size; ++i) {
cJSON* nodeInfo = cJSON_GetArrayItem(nodeInfos, i);
if (nodeInfo == NULL) continue;
cJSON *nodeId = cJSON_GetObjectItem(nodeInfo, "nodeId");
if (!nodeId || nodeId->type != cJSON_Number) {
printf("failed to read mnodeIpList.json, nodeId not found\n");
goto PARSE_OVER;
}
tsDnodeIpInfos.nodeInfos[i].nodeId = nodeId->valueint;
cJSON *nodeEp = cJSON_GetObjectItem(nodeInfo, "nodeEp");
if (!nodeEp || nodeEp->type != cJSON_String || nodeEp->valuestring == NULL) {
printf("failed to read mnodeIpList.json, nodeName not found\n");
goto PARSE_OVER;
}
strncpy(tsDnodeIpInfos.nodeInfos[i].nodeEp, nodeEp->valuestring, TSDB_EP_LEN);
SdnodeIfo* pDnodeInfo = getDnodeInfo(tsDnodeIpInfos.nodeInfos[i].nodeId);
if (NULL == pDnodeInfo) {
continue;
}
tstrncpy(tsDnodeIpInfos.nodeInfos[i].nodeEp, pDnodeInfo->ep, TSDB_EP_LEN);
}
ret = true;
//printf("read mnode iplist successed, numOfIps:%d inUse:%d\n", tsDnodeIpInfos.nodeNum, tsDnodeIpInfos.inUse);
//for (int32_t i = 0; i < tsDnodeIpInfos.nodeNum; i++) {
// printf("mnode:%d, %s\n", tsDnodeIpInfos.nodeInfos[i].nodeId, tsDnodeIpInfos.nodeInfos[i].nodeEp);
//}
PARSE_OVER:
free(content);
cJSON_Delete(root);
fclose(fp);
return ret;
}
static void dnodeSaveMnodeInfos(char* dnodeIpList) {
FILE *fp = fopen(dnodeIpList, "w");
if (!fp) return;
int32_t len = 0;
int32_t maxLen = 2000;
char * content = calloc(1, maxLen + 1);
len += snprintf(content + len, maxLen - len, "{\n");
len += snprintf(content + len, maxLen - len, " \"inUse\": %d,\n", tsDnodeIpInfos.inUse);
len += snprintf(content + len, maxLen - len, " \"nodeNum\": %d,\n", tsDnodeIpInfos.nodeNum);
len += snprintf(content + len, maxLen - len, " \"nodeInfos\": [{\n");
for (int32_t i = 0; i < tsDnodeIpInfos.nodeNum; i++) {
len += snprintf(content + len, maxLen - len, " \"nodeId\": %d,\n", tsDnodeIpInfos.nodeInfos[i].nodeId);
len += snprintf(content + len, maxLen - len, " \"nodeEp\": \"%s\"\n", tsDnodeIpInfos.nodeInfos[i].nodeEp);
if (i < tsDnodeIpInfos.nodeNum -1) {
len += snprintf(content + len, maxLen - len, " },{\n");
} else {
len += snprintf(content + len, maxLen - len, " }]\n");
}
}
len += snprintf(content + len, maxLen - len, "}\n");
fwrite(content, 1, len, fp);
fflush(fp);
fclose(fp);
free(content);
printf("mod mnode iplist successed\n");
}
void modDnodeIpList(char* dnodeIpList)
{
(void)dnodeReadMnodeInfos(dnodeIpList);
dnodeSaveMnodeInfos(dnodeIpList);
return;
}
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "taosmigrate.h"
static void recordWrite(int fd, SWalHead *pHead) {
taosCalcChecksumAppend(0, (uint8_t *)pHead, sizeof(SWalHead));
int contLen = pHead->len + sizeof(SWalHead);
if(write(fd, pHead, contLen) != contLen) {
printf("failed to write(%s)", strerror(errno));
exit(-1);
}
}
static void recordMod(SWalHead* pHead)
{
SDnodeObj *pDnode;
ESdbTable tableId = (ESdbTable)(pHead->msgType / 10);
switch (tableId) {
case SDB_TABLE_DNODE:
case SDB_TABLE_MNODE:
pDnode = (SDnodeObj *)pHead->cont;
printf("dnodeId:%d port:%d fqdn:%s ep:%s\n", pDnode->dnodeId, pDnode->dnodePort, pDnode->dnodeFqdn, pDnode->dnodeEp);
SdnodeIfo* pDnodeInfo = getDnodeInfo(pDnode->dnodeId);
if (NULL == pDnodeInfo) {
break;
}
pDnode->dnodePort = pDnodeInfo->port;
tstrncpy(pDnode->dnodeFqdn, pDnodeInfo->fqdn, sizeof(pDnode->dnodeFqdn));
tstrncpy(pDnode->dnodeEp, pDnodeInfo->ep, sizeof(pDnode->dnodeEp));
break;
#if 0
case SDB_TABLE_ACCOUNT:
SAcctObj *pAcct = (SDnodeObj *)pHead->cont;
break;
case SDB_TABLE_USER:
SUserObj *pUser = (SDnodeObj *)pHead->cont;
break;
case SDB_TABLE_DB:
SDbObj *pDb = (SDnodeObj *)pHead->cont;
break;
case SDB_TABLE_VGROUP:
SVgObj *pVgroup = (SDnodeObj *)pHead->cont;
break;
case SDB_TABLE_STABLE:
SSuperTableObj *pStable = (SDnodeObj *)pHead->cont;
break;
case SDB_TABLE_CTABLE:
SChildTableObj *pCTable = (SDnodeObj *)pHead->cont;
break;
#endif
default:
break;
}
}
void walModWalFile(char* walfile) {
char *buffer = malloc(1024000); // size for one record
if (buffer == NULL) {
printf("failed to malloc:%s\n", strerror(errno));
return ;
}
SWalHead *pHead = (SWalHead *)buffer;
int rfd = open(walfile, O_RDONLY);
if (rfd < 0) {
printf("failed to open %s failed:%s\n", walfile, strerror(errno));
free(buffer);
return ;
}
char newWalFile[32] = "wal0";
int wfd = open(newWalFile, O_WRONLY | O_CREAT, S_IRWXU | S_IRWXG | S_IRWXO);
if (wfd < 0) {
printf("wal:%s, failed to open(%s)\n", newWalFile, strerror(errno));
free(buffer);
return ;
}
printf("start to mod %s into %s\n", walfile, newWalFile);
while (1) {
memset(buffer, 0, 1024000);
int ret = read(rfd, pHead, sizeof(SWalHead));
if ( ret == 0) break;
if (ret != sizeof(SWalHead)) {
printf("wal:%s, failed to read head, skip, ret:%d(%s)\n", walfile, ret, strerror(errno));
break;
}
if (!taosCheckChecksumWhole((uint8_t *)pHead, sizeof(SWalHead))) {
printf("wal:%s, cksum is messed up, skip the rest of file\n", walfile);
break;
}
ret = read(rfd, pHead->cont, pHead->len);
if ( ret != pHead->len) {
printf("wal:%s, failed to read body, skip, len:%d ret:%d\n", walfile, pHead->len, ret);
break;
}
recordMod(pHead);
recordWrite(wfd, pHead);
}
close(rfd);
close(wfd);
free(buffer);
taosMvFile(walfile, newWalFile);
return ;
}
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "taosmigrate.h"
static int32_t saveVnodeCfg(SVnodeObj *pVnode, char* cfgFile)
{
FILE *fp = fopen(cfgFile, "w");
if (!fp) {
printf("failed to open vnode cfg file for write, file:%s error:%s\n", cfgFile, strerror(errno));
return errno;
}
int32_t len = 0;
int32_t maxLen = 1000;
char * content = calloc(1, maxLen + 1);
if (content == NULL) {
fclose(fp);
return -1;
}
len += snprintf(content + len, maxLen - len, "{\n");
len += snprintf(content + len, maxLen - len, " \"db\": \"%s\",\n", pVnode->db);
len += snprintf(content + len, maxLen - len, " \"cfgVersion\": %d,\n", pVnode->cfgVersion);
len += snprintf(content + len, maxLen - len, " \"cacheBlockSize\": %d,\n", pVnode->tsdbCfg.cacheBlockSize);
len += snprintf(content + len, maxLen - len, " \"totalBlocks\": %d,\n", pVnode->tsdbCfg.totalBlocks);
len += snprintf(content + len, maxLen - len, " \"maxTables\": %d,\n", pVnode->tsdbCfg.maxTables);
len += snprintf(content + len, maxLen - len, " \"daysPerFile\": %d,\n", pVnode->tsdbCfg.daysPerFile);
len += snprintf(content + len, maxLen - len, " \"daysToKeep\": %d,\n", pVnode->tsdbCfg.keep);
len += snprintf(content + len, maxLen - len, " \"daysToKeep1\": %d,\n", pVnode->tsdbCfg.keep1);
len += snprintf(content + len, maxLen - len, " \"daysToKeep2\": %d,\n", pVnode->tsdbCfg.keep2);
len += snprintf(content + len, maxLen - len, " \"minRowsPerFileBlock\": %d,\n", pVnode->tsdbCfg.minRowsPerFileBlock);
len += snprintf(content + len, maxLen - len, " \"maxRowsPerFileBlock\": %d,\n", pVnode->tsdbCfg.maxRowsPerFileBlock);
len += snprintf(content + len, maxLen - len, " \"commitTime\": %d,\n", pVnode->tsdbCfg.commitTime);
len += snprintf(content + len, maxLen - len, " \"precision\": %d,\n", pVnode->tsdbCfg.precision);
len += snprintf(content + len, maxLen - len, " \"compression\": %d,\n", pVnode->tsdbCfg.compression);
len += snprintf(content + len, maxLen - len, " \"walLevel\": %d,\n", pVnode->walCfg.walLevel);
len += snprintf(content + len, maxLen - len, " \"replica\": %d,\n", pVnode->syncCfg.replica);
len += snprintf(content + len, maxLen - len, " \"wals\": %d,\n", pVnode->walCfg.wals);
len += snprintf(content + len, maxLen - len, " \"quorum\": %d,\n", pVnode->syncCfg.quorum);
len += snprintf(content + len, maxLen - len, " \"nodeInfos\": [{\n");
for (int32_t i = 0; i < pVnode->syncCfg.replica; i++) {
len += snprintf(content + len, maxLen - len, " \"nodeId\": %d,\n", pVnode->syncCfg.nodeInfo[i].nodeId);
len += snprintf(content + len, maxLen - len, " \"nodeEp\": \"%s:%d\"\n", pVnode->syncCfg.nodeInfo[i].nodeFqdn, pVnode->syncCfg.nodeInfo[i].nodePort);
if (i < pVnode->syncCfg.replica - 1) {
len += snprintf(content + len, maxLen - len, " },{\n");
} else {
len += snprintf(content + len, maxLen - len, " }]\n");
}
}
len += snprintf(content + len, maxLen - len, "}\n");
fwrite(content, 1, len, fp);
fflush(fp);
fclose(fp);
free(content);
printf("mod vnode cfg %s successed\n", cfgFile);
return 0;
}
static int32_t readVnodeCfg(SVnodeObj *pVnode, char* cfgFile)
{
cJSON *root = NULL;
char *content = NULL;
int maxLen = 1000;
int32_t ret = -1;
FILE *fp = fopen(cfgFile, "r");
if (!fp) {
printf("failed to open vnode cfg file:%s to read, error:%s\n", cfgFile, strerror(errno));
goto PARSE_OVER;
}
content = calloc(1, maxLen + 1);
if (content == NULL) {
goto PARSE_OVER;
}
int len = fread(content, 1, maxLen, fp);
if (len <= 0) {
printf("failed to read vnode cfg, content is null, error:%s\n", strerror(errno));
goto PARSE_OVER;
}
root = cJSON_Parse(content);
if (root == NULL) {
printf("failed to json parse %s, invalid json format\n", cfgFile);
goto PARSE_OVER;
}
cJSON *db = cJSON_GetObjectItem(root, "db");
if (!db || db->type != cJSON_String || db->valuestring == NULL) {
printf("vgId:%d, failed to read vnode cfg, db not found\n", pVnode->vgId);
goto PARSE_OVER;
}
strcpy(pVnode->db, db->valuestring);
cJSON *cfgVersion = cJSON_GetObjectItem(root, "cfgVersion");
if (!cfgVersion || cfgVersion->type != cJSON_Number) {
printf("vgId:%d, failed to read vnode cfg, cfgVersion not found\n", pVnode->vgId);
goto PARSE_OVER;
}
pVnode->cfgVersion = cfgVersion->valueint;
cJSON *cacheBlockSize = cJSON_GetObjectItem(root, "cacheBlockSize");
if (!cacheBlockSize || cacheBlockSize->type != cJSON_Number) {
printf("vgId:%d, failed to read vnode cfg, cacheBlockSize not found\n", pVnode->vgId);
goto PARSE_OVER;
}
pVnode->tsdbCfg.cacheBlockSize = cacheBlockSize->valueint;
cJSON *totalBlocks = cJSON_GetObjectItem(root, "totalBlocks");
if (!totalBlocks || totalBlocks->type != cJSON_Number) {
printf("vgId:%d, failed to read vnode cfg, totalBlocks not found\n", pVnode->vgId);
goto PARSE_OVER;
}
pVnode->tsdbCfg.totalBlocks = totalBlocks->valueint;
cJSON *maxTables = cJSON_GetObjectItem(root, "maxTables");
if (!maxTables || maxTables->type != cJSON_Number) {
printf("vgId:%d, failed to read vnode cfg, maxTables not found\n", pVnode->vgId);
goto PARSE_OVER;
}
pVnode->tsdbCfg.maxTables = maxTables->valueint;
cJSON *daysPerFile = cJSON_GetObjectItem(root, "daysPerFile");
if (!daysPerFile || daysPerFile->type != cJSON_Number) {
printf("vgId:%d, failed to read vnode cfg, daysPerFile not found\n", pVnode->vgId);
goto PARSE_OVER;
}
pVnode->tsdbCfg.daysPerFile = daysPerFile->valueint;
cJSON *daysToKeep = cJSON_GetObjectItem(root, "daysToKeep");
if (!daysToKeep || daysToKeep->type != cJSON_Number) {
printf("vgId:%d, failed to read vnode cfg, daysToKeep not found\n", pVnode->vgId);
goto PARSE_OVER;
}
pVnode->tsdbCfg.keep = daysToKeep->valueint;
cJSON *daysToKeep1 = cJSON_GetObjectItem(root, "daysToKeep1");
if (!daysToKeep1 || daysToKeep1->type != cJSON_Number) {
printf("vgId:%d, failed to read vnode cfg, daysToKeep1 not found\n", pVnode->vgId);
goto PARSE_OVER;
}
pVnode->tsdbCfg.keep1 = daysToKeep1->valueint;
cJSON *daysToKeep2 = cJSON_GetObjectItem(root, "daysToKeep2");
if (!daysToKeep2 || daysToKeep2->type != cJSON_Number) {
printf("vgId:%d, failed to read vnode cfg, daysToKeep2 not found\n", pVnode->vgId);
goto PARSE_OVER;
}
pVnode->tsdbCfg.keep2 = daysToKeep2->valueint;
cJSON *minRowsPerFileBlock = cJSON_GetObjectItem(root, "minRowsPerFileBlock");
if (!minRowsPerFileBlock || minRowsPerFileBlock->type != cJSON_Number) {
printf("vgId:%d, failed to read vnode cfg, minRowsPerFileBlock not found\n", pVnode->vgId);
goto PARSE_OVER;
}
pVnode->tsdbCfg.minRowsPerFileBlock = minRowsPerFileBlock->valueint;
cJSON *maxRowsPerFileBlock = cJSON_GetObjectItem(root, "maxRowsPerFileBlock");
if (!maxRowsPerFileBlock || maxRowsPerFileBlock->type != cJSON_Number) {
printf("vgId:%d, failed to read vnode cfg, maxRowsPerFileBlock not found\n", pVnode->vgId);
goto PARSE_OVER;
}
pVnode->tsdbCfg.maxRowsPerFileBlock = maxRowsPerFileBlock->valueint;
cJSON *commitTime = cJSON_GetObjectItem(root, "commitTime");
if (!commitTime || commitTime->type != cJSON_Number) {
printf("vgId:%d, failed to read vnode cfg, commitTime not found\n", pVnode->vgId);
goto PARSE_OVER;
}
pVnode->tsdbCfg.commitTime = (int8_t)commitTime->valueint;
cJSON *precision = cJSON_GetObjectItem(root, "precision");
if (!precision || precision->type != cJSON_Number) {
printf("vgId:%d, failed to read vnode cfg, precision not found\n", pVnode->vgId);
goto PARSE_OVER;
}
pVnode->tsdbCfg.precision = (int8_t)precision->valueint;
cJSON *compression = cJSON_GetObjectItem(root, "compression");
if (!compression || compression->type != cJSON_Number) {
printf("vgId:%d, failed to read vnode cfg, compression not found\n", pVnode->vgId);
goto PARSE_OVER;
}
pVnode->tsdbCfg.compression = (int8_t)compression->valueint;
cJSON *walLevel = cJSON_GetObjectItem(root, "walLevel");
if (!walLevel || walLevel->type != cJSON_Number) {
printf("vgId:%d, failed to read vnode cfg, walLevel not found\n", pVnode->vgId);
goto PARSE_OVER;
}
pVnode->walCfg.walLevel = (int8_t) walLevel->valueint;
cJSON *wals = cJSON_GetObjectItem(root, "wals");
if (!wals || wals->type != cJSON_Number) {
printf("vgId:%d, failed to read vnode cfg, wals not found\n", pVnode->vgId);
goto PARSE_OVER;
}
pVnode->walCfg.wals = (int8_t)wals->valueint;
pVnode->walCfg.keep = 0;
cJSON *replica = cJSON_GetObjectItem(root, "replica");
if (!replica || replica->type != cJSON_Number) {
printf("vgId:%d, failed to read vnode cfg, replica not found\n", pVnode->vgId);
goto PARSE_OVER;
}
pVnode->syncCfg.replica = (int8_t)replica->valueint;
cJSON *quorum = cJSON_GetObjectItem(root, "quorum");
if (!quorum || quorum->type != cJSON_Number) {
printf("vgId: %d, failed to read vnode cfg, quorum not found\n", pVnode->vgId);
goto PARSE_OVER;
}
pVnode->syncCfg.quorum = (int8_t)quorum->valueint;
cJSON *nodeInfos = cJSON_GetObjectItem(root, "nodeInfos");
if (!nodeInfos || nodeInfos->type != cJSON_Array) {
printf("vgId:%d, failed to read vnode cfg, nodeInfos not found\n", pVnode->vgId);
goto PARSE_OVER;
}
int size = cJSON_GetArraySize(nodeInfos);
if (size != pVnode->syncCfg.replica) {
printf("vgId:%d, failed to read vnode cfg, nodeInfos size not matched\n", pVnode->vgId);
goto PARSE_OVER;
}
for (int i = 0; i < size; ++i) {
cJSON *nodeInfo = cJSON_GetArrayItem(nodeInfos, i);
if (nodeInfo == NULL) continue;
cJSON *nodeId = cJSON_GetObjectItem(nodeInfo, "nodeId");
if (!nodeId || nodeId->type != cJSON_Number) {
printf("vgId:%d, failed to read vnode cfg, nodeId not found\n", pVnode->vgId);
goto PARSE_OVER;
}
pVnode->syncCfg.nodeInfo[i].nodeId = nodeId->valueint;
cJSON *nodeEp = cJSON_GetObjectItem(nodeInfo, "nodeEp");
if (!nodeEp || nodeEp->type != cJSON_String || nodeEp->valuestring == NULL) {
printf("vgId:%d, failed to read vnode cfg, nodeFqdn not found\n", pVnode->vgId);
goto PARSE_OVER;
}
taosGetFqdnPortFromEp(nodeEp->valuestring, pVnode->syncCfg.nodeInfo[i].nodeFqdn, &pVnode->syncCfg.nodeInfo[i].nodePort);
//pVnode->syncCfg.nodeInfo[i].nodePort += TSDB_PORT_SYNC;
SdnodeIfo* pDnodeInfo = getDnodeInfo(pVnode->syncCfg.nodeInfo[i].nodeId);
if (NULL == pDnodeInfo) {
continue;
}
pVnode->syncCfg.nodeInfo[i].nodePort = pDnodeInfo->port;
tstrncpy(pVnode->syncCfg.nodeInfo[i].nodeFqdn, pDnodeInfo->fqdn, TSDB_FQDN_LEN);
}
ret = 0;
//printf("read vnode cfg successfully, replcia:%d\n", pVnode->syncCfg.replica);
//for (int32_t i = 0; i < pVnode->syncCfg.replica; i++) {
// printf("dnode:%d, %s:%d\n", pVnode->syncCfg.nodeInfo[i].nodeId, pVnode->syncCfg.nodeInfo[i].nodeFqdn, pVnode->syncCfg.nodeInfo[i].nodePort);
//}
PARSE_OVER:
tfree(content);
cJSON_Delete(root);
if (fp) fclose(fp);
return ret;
}
static void modVnodeCfg(char* vnodeCfg)
{
int32_t ret;
SVnodeObj vnodeObj = {0};
ret = readVnodeCfg(&vnodeObj, vnodeCfg);
if (0 != ret) {
printf("read vnode cfg %s fail!\n", vnodeCfg);
return ;
}
(void)saveVnodeCfg(&vnodeObj, vnodeCfg);
return ;
}
void modAllVnode(char *vnodeDir)
{
DIR *dir = opendir(vnodeDir);
if (dir == NULL) return;
char filename[1024];
struct dirent *de = NULL;
while ((de = readdir(dir)) != NULL) {
if (strcmp(de->d_name, ".") == 0 || strcmp(de->d_name, "..") == 0) continue;
if ((de->d_type & DT_DIR) && (strncmp(de->d_name, "vnode", 5) == 0)) {
memset(filename, 0, 1024);
snprintf(filename, 1023, "%s/%s/config.json", vnodeDir, de->d_name);
modVnodeCfg(filename);
}
}
closedir(dir);
}
......@@ -135,7 +135,8 @@ typedef struct SVgObj {
char dbName[TSDB_ACCT_LEN + TSDB_DB_NAME_LEN];
int8_t inUse;
int8_t accessState;
int8_t reserved0[5];
int8_t status;
int8_t reserved0[4];
SVnodeGid vnodeGid[TSDB_MAX_REPLICA];
int8_t reserved1[7];
int8_t updateEnd[1];
......
......@@ -41,7 +41,7 @@ int32_t mnodeInitProfile();
void mnodeCleanupProfile();
SConnObj *mnodeCreateConn(char *user, uint32_t ip, uint16_t port);
SConnObj *mnodeAccquireConn(uint32_t connId, char *user, uint32_t ip, uint16_t port);
SConnObj *mnodeAccquireConn(int32_t connId, char *user, uint32_t ip, uint16_t port);
void mnodeReleaseConn(SConnObj *pConn);
int32_t mnodeSaveQueryStreamList(SConnObj *pConn, SCMHeartBeatMsg *pHBMsg);
......
......@@ -367,7 +367,6 @@ static int32_t mnodeProcessDnodeStatusMsg(SMnodeMsg *pMsg) {
pAccess++;
mnodeDecVgroupRef(pVgroup);
}
}
if (pDnode->status == TAOS_DN_STATUS_OFFLINE) {
......
......@@ -43,7 +43,7 @@
extern void *tsMnodeTmr;
static SCacheObj *tsMnodeConnCache = NULL;
static uint32_t tsConnIndex = 0;
static int32_t tsConnIndex = 0;
static int32_t mnodeGetQueryMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn);
static int32_t mnodeRetrieveQueries(SShowObj *pShow, char *data, int32_t rows, void *pConn);
......@@ -68,7 +68,7 @@ int32_t mnodeInitProfile() {
mnodeAddWriteMsgHandle(TSDB_MSG_TYPE_CM_KILL_STREAM, mnodeProcessKillStreamMsg);
mnodeAddWriteMsgHandle(TSDB_MSG_TYPE_CM_KILL_CONN, mnodeProcessKillConnectionMsg);
tsMnodeConnCache = taosCacheInitWithCb(CONN_CHECK_TIME, mnodeFreeConn);
tsMnodeConnCache = taosCacheInit(TSDB_DATA_TYPE_INT, CONN_CHECK_TIME, false, mnodeFreeConn, "conn");
return 0;
}
......@@ -89,7 +89,7 @@ SConnObj *mnodeCreateConn(char *user, uint32_t ip, uint16_t port) {
return NULL;
}
uint32_t connId = atomic_add_fetch_32(&tsConnIndex, 1);
int32_t connId = atomic_add_fetch_32(&tsConnIndex, 1);
if (connId == 0) atomic_add_fetch_32(&tsConnIndex, 1);
SConnObj connObj = {
......@@ -100,9 +100,7 @@ SConnObj *mnodeCreateConn(char *user, uint32_t ip, uint16_t port) {
};
tstrncpy(connObj.user, user, sizeof(connObj.user));
char key[10];
sprintf(key, "%u", connId);
SConnObj *pConn = taosCachePut(tsMnodeConnCache, key, &connObj, sizeof(connObj), CONN_KEEP_TIME);
SConnObj *pConn = taosCachePut(tsMnodeConnCache, &connId, sizeof(int32_t), &connObj, sizeof(connObj), CONN_KEEP_TIME);
mDebug("connId:%d, is created, user:%s ip:%s:%u", connId, user, taosIpStr(ip), port);
return pConn;
......@@ -113,12 +111,9 @@ void mnodeReleaseConn(SConnObj *pConn) {
taosCacheRelease(tsMnodeConnCache, (void **)&pConn, false);
}
SConnObj *mnodeAccquireConn(uint32_t connId, char *user, uint32_t ip, uint16_t port) {
char key[10];
sprintf(key, "%u", connId);
SConnObj *mnodeAccquireConn(int32_t connId, char *user, uint32_t ip, uint16_t port) {
uint64_t expireTime = CONN_KEEP_TIME * 1000 + (uint64_t)taosGetTimestampMs();
SConnObj *pConn = taosCacheUpdateExpireTimeByName(tsMnodeConnCache, key, expireTime);
SConnObj *pConn = taosCacheUpdateExpireTimeByName(tsMnodeConnCache, &connId, sizeof(int32_t), expireTime);
if (pConn == NULL) {
mError("connId:%d, is already destroyed, user:%s ip:%s:%u", connId, user, taosIpStr(ip), port);
return NULL;
......@@ -547,7 +542,8 @@ static int32_t mnodeProcessKillQueryMsg(SMnodeMsg *pMsg) {
int32_t queryId = (int32_t)strtol(queryIdStr, NULL, 10);
SConnObj *pConn = taosCacheAcquireByName(tsMnodeConnCache, connIdStr);
int32_t connId = atoi(connIdStr);
SConnObj *pConn = taosCacheAcquireByKey(tsMnodeConnCache, &connId, sizeof(int32_t));
if (pConn == NULL) {
mError("connId:%s, failed to kill queryId:%d, conn not exist", connIdStr, queryId);
return TSDB_CODE_MND_INVALID_CONN_ID;
......@@ -576,8 +572,9 @@ static int32_t mnodeProcessKillStreamMsg(SMnodeMsg *pMsg) {
}
int32_t streamId = (int32_t)strtol(streamIdStr, NULL, 10);
int32_t connId = atoi(connIdStr);
SConnObj *pConn = taosCacheAcquireByName(tsMnodeConnCache, connIdStr);
SConnObj *pConn = taosCacheAcquireByKey(tsMnodeConnCache, &connId, sizeof(int32_t));
if (pConn == NULL) {
mError("connId:%s, failed to kill streamId:%d, conn not exist", connIdStr, streamId);
return TSDB_CODE_MND_INVALID_CONN_ID;
......@@ -594,7 +591,8 @@ static int32_t mnodeProcessKillConnectionMsg(SMnodeMsg *pMsg) {
if (strcmp(pUser->user, TSDB_DEFAULT_USER) != 0) return TSDB_CODE_MND_NO_RIGHTS;
SCMKillConnMsg *pKill = pMsg->rpcMsg.pCont;
SConnObj * pConn = taosCacheAcquireByName(tsMnodeConnCache, pKill->queryId);
int32_t connId = atoi(pKill->queryId);
SConnObj * pConn = taosCacheAcquireByKey(tsMnodeConnCache, &connId, sizeof(int32_t));
if (pConn == NULL) {
mError("connId:%s, failed to kill, conn not exist", pKill->queryId);
return TSDB_CODE_MND_INVALID_CONN_ID;
......
......@@ -65,7 +65,7 @@ int32_t mnodeInitShow() {
mnodeAddReadMsgHandle(TSDB_MSG_TYPE_CM_CONNECT, mnodeProcessConnectMsg);
mnodeAddReadMsgHandle(TSDB_MSG_TYPE_CM_USE_DB, mnodeProcessUseMsg);
tsMnodeShowCache = taosCacheInitWithCb(5, mnodeFreeShowObj);
tsMnodeShowCache = taosCacheInit(TSDB_DATA_TYPE_INT, 5, false, mnodeFreeShowObj, "show");
return 0;
}
......@@ -364,10 +364,7 @@ static bool mnodeCheckShowFinished(SShowObj *pShow) {
}
static bool mnodeAccquireShowObj(SShowObj *pShow) {
char key[10];
sprintf(key, "%d", pShow->index);
SShowObj *pSaved = taosCacheAcquireByName(tsMnodeShowCache, key);
SShowObj *pSaved = taosCacheAcquireByKey(tsMnodeShowCache, &pShow->index, sizeof(int32_t));
if (pSaved == pShow) {
mDebug("%p, show is accquired from cache", pShow);
return true;
......@@ -378,14 +375,11 @@ static bool mnodeAccquireShowObj(SShowObj *pShow) {
static void *mnodePutShowObj(SShowObj *pShow, int32_t size) {
if (tsMnodeShowCache != NULL) {
char key[10];
pShow->index = atomic_add_fetch_32(&tsShowObjIndex, 1);
sprintf(key, "%d", pShow->index);
SShowObj *newQhandle = taosCachePut(tsMnodeShowCache, key, pShow, size, 6);
SShowObj *newQhandle = taosCachePut(tsMnodeShowCache, &pShow->index, sizeof(int32_t), pShow, size, 6);
mDebug("%p, show is put into cache, index:%d", newQhandle, pShow->index);
free(pShow);
mDebug("%p, show is put into cache, index:%s", newQhandle, key);
return newQhandle;
}
......
......@@ -38,6 +38,11 @@
#include "mnodeVgroup.h"
#include "mnodePeer.h"
typedef enum {
TAOS_VG_STATUS_READY,
TAOS_VG_STATUS_DROPPING
} EVgroupStatus;
static void *tsVgroupSdb = NULL;
static int32_t tsVgUpdateSize = 0;
......@@ -279,7 +284,7 @@ void mnodeCheckUnCreatedVgroup(SDnodeObj *pDnode, SVnodeLoad *pVloads, int32_t o
pNextV++;
}
if (i == openVnodes) {
if (i == openVnodes && pVgroup->status == TAOS_VG_STATUS_READY) {
mnodeSendCreateVgroupMsg(pVgroup, NULL);
}
......@@ -728,6 +733,7 @@ void mnodeSendDropVnodeMsg(int32_t vgId, SRpcIpSet *ipSet, void *ahandle) {
}
static void mnodeSendDropVgroupMsg(SVgObj *pVgroup, void *ahandle) {
pVgroup->status = TAOS_VG_STATUS_DROPPING; // deleting
mDebug("vgId:%d, send drop all vnodes msg, ahandle:%p", pVgroup->vgId, ahandle);
for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) {
SRpcIpSet ipSet = mnodeGetIpSetFromIp(pVgroup->vnodeGid[i].pDnode->dnodeEp);
......
......@@ -86,9 +86,28 @@ extern "C" {
} \
}
#ifdef TAOS_RANDOM_NETWORK_FAIL
ssize_t taos_send_random_fail(int sockfd, const void *buf, size_t len, int flags);
ssize_t taos_sendto_random_fail(int sockfd, const void *buf, size_t len, int flags,
const struct sockaddr *dest_addr, socklen_t addrlen);
ssize_t taos_read_random_fail(int fd, void *buf, size_t count);
ssize_t taos_write_random_fail(int fd, const void *buf, size_t count);
#define send(sockfd, buf, len, flags) taos_send_random_fail(sockfd, buf, len, flags)
#define sendto(sockfd, buf, len, flags, dest_addr, addrlen) \
taos_sendto_random_fail(sockfd, buf, len, flags, dest_addr, addrlen)
#define taosWriteSocket(fd, buf, len) taos_write_random_fail(fd, buf, len)
#define taosReadSocket(fd, buf, len) taos_read_random_fail(fd, buf, len)
#else
#define taosWriteSocket(fd, buf, len) write(fd, buf, len)
#define taosReadSocket(fd, buf, len) read(fd, buf, len)
#endif /* TAOS_RANDOM_NETWORK_FAIL */
#define atomic_load_8(ptr) __atomic_load_n((ptr), __ATOMIC_SEQ_CST)
#define atomic_load_16(ptr) __atomic_load_n((ptr), __ATOMIC_SEQ_CST)
#define atomic_load_32(ptr) __atomic_load_n((ptr), __ATOMIC_SEQ_CST)
......
......@@ -270,3 +270,49 @@ int tSystem(const char * cmd)
}
}
#ifdef TAOS_RANDOM_NETWORK_FAIL
#define RANDOM_NETWORK_FAIL_FACTOR 20
ssize_t taos_send_random_fail(int sockfd, const void *buf, size_t len, int flags)
{
if (rand() % RANDOM_NETWORK_FAIL_FACTOR == 0) {
errno = ECONNRESET;
return -1;
}
return send(sockfd, buf, len, flags);
}
ssize_t taos_sendto_random_fail(int sockfd, const void *buf, size_t len, int flags,
const struct sockaddr *dest_addr, socklen_t addrlen)
{
if (rand() % RANDOM_NETWORK_FAIL_FACTOR == 0) {
errno = ECONNRESET;
return -1;
}
return sendto(sockfd, buf, len, flags, dest_addr, addrlen);
}
ssize_t taos_read_random_fail(int fd, void *buf, size_t count)
{
if (rand() % RANDOM_NETWORK_FAIL_FACTOR == 0) {
errno = ECONNRESET;
return -1;
}
return read(fd, buf, count);
}
ssize_t taos_write_random_fail(int fd, const void *buf, size_t count)
{
if (rand() % RANDOM_NETWORK_FAIL_FACTOR == 0) {
errno = EINTR;
return -1;
}
return write(fd, buf, count);
}
#endif /* TAOS_RANDOM_NETWORK_FAIL */
......@@ -160,7 +160,7 @@ static void taosGetSystemTimezone() {
/* load time zone string from /etc/timezone */
FILE *f = fopen("/etc/timezone", "r");
char buf[65] = {0};
char buf[68] = {0};
if (f != NULL) {
int len = fread(buf, 64, 1, f);
if(len < 64 && ferror(f)) {
......@@ -170,18 +170,17 @@ static void taosGetSystemTimezone() {
}
fclose(f);
}
char *lineEnd = strstr(buf, "\n");
if (lineEnd != NULL) {
*lineEnd = 0;
}
char *lineEnd = strstr(buf, "\n");
if (lineEnd != NULL) {
*lineEnd = 0;
}
// for CentOS system, /etc/timezone does not exist. Ignore the TZ environment variables
if (strlen(buf) > 0) {
setenv("TZ", buf, 1);
// for CentOS system, /etc/timezone does not exist. Ignore the TZ environment variables
if (strlen(buf) > 0) {
setenv("TZ", buf, 1);
}
}
// get and set default timezone
tzset();
......
......@@ -53,12 +53,12 @@ static void httpDestroyContext(void *data) {
httpFreeJsonBuf(pContext);
httpFreeMultiCmds(pContext);
httpDebug("context:%p, is destroyed, refCount:%d", pContext, pContext->refCount);
httpDebug("context:%p, is destroyed, refCount:%d data:%p", pContext, pContext->refCount, data);
tfree(pContext);
}
bool httpInitContexts() {
tsHttpServer.contextCache = taosCacheInitWithCb(2, httpDestroyContext);
tsHttpServer.contextCache = taosCacheInit(TSDB_DATA_TYPE_BIGINT, 2, false, httpDestroyContext, "restc");
if (tsHttpServer.contextCache == NULL) {
httpError("failed to init context cache");
return false;
......@@ -103,17 +103,14 @@ HttpContext *httpCreateContext(int32_t fd) {
HttpContext *pContext = calloc(1, sizeof(HttpContext));
if (pContext == NULL) return NULL;
char contextStr[16] = {0};
snprintf(contextStr, sizeof(contextStr), "%p", pContext);
pContext->fd = fd;
pContext->httpVersion = HTTP_VERSION_10;
pContext->lastAccessTime = taosGetTimestampSec();
pContext->state = HTTP_CONTEXT_STATE_READY;
HttpContext **ppContext = taosCachePut(tsHttpServer.contextCache, contextStr, &pContext, sizeof(HttpContext *), 3);
HttpContext **ppContext = taosCachePut(tsHttpServer.contextCache, &pContext, sizeof(void *), &pContext, sizeof(void *), 3);
pContext->ppContext = ppContext;
httpDebug("context:%p, fd:%d, is created, item:%p", pContext, fd, ppContext);
httpDebug("context:%p, fd:%d, is created, data:%p", pContext, fd, ppContext);
// set the ref to 0
taosCacheRelease(tsHttpServer.contextCache, (void**)&ppContext, false);
......@@ -122,16 +119,13 @@ HttpContext *httpCreateContext(int32_t fd) {
}
HttpContext *httpGetContext(void *ptr) {
char contextStr[16] = {0};
snprintf(contextStr, sizeof(contextStr), "%p", ptr);
HttpContext **ppContext = taosCacheAcquireByName(tsHttpServer.contextCache, contextStr);
HttpContext **ppContext = taosCacheAcquireByKey(tsHttpServer.contextCache, &ptr, sizeof(HttpContext *));
if (ppContext) {
HttpContext *pContext = *ppContext;
if (pContext) {
int32_t refCount = atomic_add_fetch_32(&pContext->refCount, 1);
httpDebug("context:%p, fd:%d, is accquired, refCount:%d", pContext, pContext->fd, refCount);
httpDebug("context:%p, fd:%d, is accquired, data:%p refCount:%d", pContext, pContext->fd, ppContext, refCount);
return pContext;
}
}
......@@ -141,9 +135,10 @@ HttpContext *httpGetContext(void *ptr) {
void httpReleaseContext(HttpContext *pContext) {
int32_t refCount = atomic_sub_fetch_32(&pContext->refCount, 1);
assert(refCount >= 0);
httpDebug("context:%p, is releasd, refCount:%d", pContext, refCount);
HttpContext **ppContext = pContext->ppContext;
httpDebug("context:%p, is releasd, data:%p refCount:%d", pContext, ppContext, refCount);
if (tsHttpServer.contextCache != NULL) {
taosCacheRelease(tsHttpServer.contextCache, (void **)(&ppContext), false);
} else {
......
......@@ -441,7 +441,7 @@ void httpJsonPairStatus(JsonBuf* buf, int code) {
} else {
httpJsonPair(buf, "status", 6, "error", 5);
httpJsonItemToken(buf);
httpJsonPairIntVal(buf, "code", 4, code);
httpJsonPairIntVal(buf, "code", 4, code & 0XFFFF);
httpJsonItemToken(buf);
if (code == TSDB_CODE_MND_DB_NOT_SELECTED) {
httpJsonPair(buf, "desc", 4, "failed to create database", 23);
......
......@@ -174,9 +174,9 @@ void httpSendErrorRespWithDesc(HttpContext *pContext, int errNo, char *desc) {
}
if (desc == NULL) {
httpSendErrorRespImp(pContext, httpCode, httpCodeStr, errNo + 1000, httpMsg[errNo]);
httpSendErrorRespImp(pContext, httpCode, httpCodeStr, errNo + 5000, httpMsg[errNo]);
} else {
httpSendErrorRespImp(pContext, httpCode, httpCodeStr, errNo + 1000, desc);
httpSendErrorRespImp(pContext, httpCode, httpCodeStr, errNo + 5000, desc);
}
}
......@@ -184,7 +184,8 @@ void httpSendErrorResp(HttpContext *pContext, int errNo) { httpSendErrorRespWith
void httpSendTaosdErrorResp(HttpContext *pContext, int errCode) {
int httpCode = 400;
httpSendErrorRespImp(pContext, httpCode, "Bad Request", 1000, (char*)tstrerror(errCode));
httpSendErrorRespImp(pContext, httpCode, "Bad Request", errCode & 0XFFFF, (char*)tstrerror(errCode));
}
void httpSendTaosdInvalidSqlErrorResp(HttpContext *pContext, char* errMsg) {
......@@ -200,7 +201,7 @@ void httpSendTaosdInvalidSqlErrorResp(HttpContext *pContext, char* errMsg) {
} else {}
}
httpSendErrorRespImp(pContext, httpCode, "Bad Request", 1000, temp);
httpSendErrorRespImp(pContext, httpCode, "Bad Request", TSDB_CODE_TSC_INVALID_SQL & 0XFFFF, temp);
}
void httpSendSuccResp(HttpContext *pContext, char *desc) {
......
......@@ -85,6 +85,7 @@ bool httpReadDataImp(HttpContext *pContext) {
} else {
httpError("context:%p, fd:%d, ip:%s, read from socket error:%d, close connect",
pContext, pContext->fd, pContext->ipstr, errno);
httpReleaseContext(pContext);
return false;
}
} else {
......@@ -153,6 +154,7 @@ static bool httpReadData(HttpContext *pContext) {
int ret = httpCheckReadCompleted(pContext);
if (ret == HTTP_CHECK_BODY_CONTINUE) {
//httpDebug("context:%p, fd:%d, ip:%s, not finished yet, wait another event", pContext, pContext->fd, pContext->ipstr);
httpReleaseContext(pContext);
return false;
} else if (ret == HTTP_CHECK_BODY_SUCCESS){
httpDebug("context:%p, fd:%d, ip:%s, thread:%s, read size:%d, dataLen:%d",
......@@ -161,11 +163,13 @@ static bool httpReadData(HttpContext *pContext) {
return true;
} else {
httpNotifyContextClose(pContext);
httpReleaseContext(pContext);
return false;
}
} else {
httpError("context:%p, fd:%d, ip:%s, failed to read http body, close connect", pContext, pContext->fd, pContext->ipstr);
httpNotifyContextClose(pContext);
httpReleaseContext(pContext);
return false;
}
}
......
......@@ -33,9 +33,9 @@ void httpCreateSession(HttpContext *pContext, void *taos) {
memset(&session, 0, sizeof(HttpSession));
session.taos = taos;
session.refCount = 1;
snprintf(session.id, HTTP_SESSION_ID_LEN, "%s.%s", pContext->user, pContext->pass);
int32_t len = snprintf(session.id, HTTP_SESSION_ID_LEN, "%s.%s", pContext->user, pContext->pass);
pContext->session = taosCachePut(server->sessionCache, session.id, &session, sizeof(HttpSession), tsHttpSessionExpire);
pContext->session = taosCachePut(server->sessionCache, session.id, len, &session, sizeof(HttpSession), tsHttpSessionExpire);
// void *temp = pContext->session;
// taosCacheRelease(server->sessionCache, (void **)&temp, false);
......@@ -57,9 +57,9 @@ static void httpFetchSessionImp(HttpContext *pContext) {
pthread_mutex_lock(&server->serverMutex);
char sessionId[HTTP_SESSION_ID_LEN];
snprintf(sessionId, HTTP_SESSION_ID_LEN, "%s.%s", pContext->user, pContext->pass);
int32_t len = snprintf(sessionId, HTTP_SESSION_ID_LEN, "%s.%s", pContext->user, pContext->pass);
pContext->session = taosCacheAcquireByName(server->sessionCache, sessionId);
pContext->session = taosCacheAcquireByKey(server->sessionCache, sessionId, len);
if (pContext->session != NULL) {
atomic_add_fetch_32(&pContext->session->refCount, 1);
httpDebug("context:%p, fd:%d, ip:%s, user:%s, find an exist session:%p:%p, sessionRef:%d", pContext, pContext->fd,
......@@ -115,7 +115,7 @@ void httpCleanUpSessions() {
}
bool httpInitSessions() {
tsHttpServer.sessionCache = taosCacheInitWithCb(5, httpDestroySession);
tsHttpServer.sessionCache = taosCacheInit(TSDB_DATA_TYPE_BINARY, 5, false, httpDestroySession, "rests");
if (tsHttpServer.sessionCache == NULL) {
httpError("failed to init session cache");
return false;
......
......@@ -111,7 +111,7 @@ void mqttStopSystem() {
}
void mqttCleanUpSystem() {
mqttInfo("starting to clean up mqtt");
mqttInfo("starting to cleanup mqtt");
free(recntStatus.user_name);
free(recntStatus.password);
free(recntStatus.hostname);
......
......@@ -27,6 +27,7 @@
#include "tref.h"
#include "tsdb.h"
#include "tsqlfunction.h"
#include "query.h"
struct SColumnFilterElem;
typedef bool (*__filter_func_t)(struct SColumnFilterElem* pFilter, char* val1, char* val2);
......@@ -94,16 +95,13 @@ typedef struct SSingleColumnFilterInfo {
} SSingleColumnFilterInfo;
typedef struct STableQueryInfo { // todo merge with the STableQueryInfo struct
int32_t tableIndex;
int32_t groupIndex; // group id in table list
TSKEY lastKey;
int32_t numOfRes;
int32_t groupIndex; // group id in table list
int16_t queryRangeSet; // denote if the query range is set, only available for interval query
int64_t tag;
STimeWindow win;
STSCursor cur;
void* pTable; // for retrieve the page id list
void* pTable; // for retrieve the page id list
SWindowResInfo windowResInfo;
} STableQueryInfo;
......@@ -126,11 +124,6 @@ typedef struct SQueryCostInfo {
uint64_t computTime;
} SQueryCostInfo;
//typedef struct SGroupItem {
// void *pTable;
// STableQueryInfo *info;
//} SGroupItem;
typedef struct SQuery {
int16_t numOfCols;
int16_t numOfTags;
......@@ -172,22 +165,22 @@ typedef struct SQueryRuntimeEnv {
STSBuf* pTSBuf;
STSCursor cur;
SQueryCostInfo summary;
bool stableQuery; // super table query or not
void* pQueryHandle;
void* pSecQueryHandle; // another thread for
SDiskbasedResultBuf* pResultBuf; // query result buffer based on blocked-wised disk file
bool stableQuery; // super table query or not
bool topBotQuery; // false
int32_t prevGroupId; // previous executed group id
SDiskbasedResultBuf* pResultBuf; // query result buffer based on blocked-wised disk file
} SQueryRuntimeEnv;
typedef struct SQInfo {
void* signature;
int32_t pointsInterpo;
int32_t code; // error code to returned to client
sem_t dataReady;
void* tsdb;
int32_t vgId;
void* signature;
int32_t pointsInterpo;
int32_t code; // error code to returned to client
sem_t dataReady;
void* tsdb;
void* param;
int32_t vgId;
STableGroupInfo tableGroupInfo; // table id list < only includes the STable list>
STableGroupInfo tableqinfoGroupInfo; // this is a group array list, including SArray<STableQueryInfo*> structure
SQueryRuntimeEnv runtimeEnv;
......@@ -202,8 +195,10 @@ typedef struct SQInfo {
* We later may refactor to remove this attribution by using another flag to denote
* whether a multimeter query is completed or not.
*/
int32_t tableIndex;
int32_t numOfGroupResultPages;
int32_t tableIndex;
int32_t numOfGroupResultPages;
_qinfo_free_fn_t freeFn;
jmp_buf env;
} SQInfo;
#endif // TDENGINE_QUERYEXECUTOR_H
......@@ -13,7 +13,10 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "os.h"
#include "tcache.h"
#include "tglobal.h"
#include "qfill.h"
#include "taosmsg.h"
#include "hash.h"
#include "qExecutor.h"
......@@ -22,11 +25,11 @@
#include "qresultBuf.h"
#include "query.h"
#include "queryLog.h"
#include "taosmsg.h"
#include "tlosertree.h"
#include "tscUtil.h" // todo move the function to common module
#include "exception.h"
#include "tscompression.h"
#include "ttime.h"
#include "tfile.h"
/**
* check if the primary column is load by default, otherwise, the program will
......@@ -87,6 +90,19 @@ typedef struct {
STSCursor cur;
} SQueryStatusInfo;
#if 0
static UNUSED_FUNC void *u_malloc (size_t __size) {
uint32_t v = rand();
if (v % 5 <= 1) {
return NULL;
} else {
return malloc(__size);
}
}
#define malloc u_malloc
#endif
#define CLEAR_QUERY_STATUS(q, st) ((q)->status &= (~(st)))
#define GET_NUM_OF_TABLEGROUP(q) taosArrayGetSize((q)->tableqinfoGroupInfo.pGroupList)
#define GET_TABLEGROUP(q, _index) ((SArray*) taosArrayGetP((q)->tableqinfoGroupInfo.pGroupList, (_index)))
......@@ -1509,7 +1525,6 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) {
}
static bool isQueryKilled(SQInfo *pQInfo) {
return false;
return (pQInfo->code == TSDB_CODE_TSC_QUERY_CANCELLED);
}
......@@ -2586,7 +2601,6 @@ void copyResToQueryResultBuf(SQInfo *pQInfo, SQuery *pQuery) {
}
int64_t getNumOfResultWindowRes(SQuery *pQuery, SWindowResult *pWindowRes) {
// int64_t maxOutput = 0;
for (int32_t j = 0; j < pQuery->numOfOutput; ++j) {
int32_t functionId = pQuery->pSelectExpr[j].base.functionId;
......@@ -2604,15 +2618,6 @@ int64_t getNumOfResultWindowRes(SQuery *pQuery, SWindowResult *pWindowRes) {
if (pResultInfo->numOfRes > 0) {
return pResultInfo->numOfRes;
}
// if (pResultInfo != NULL && maxOutput < pResultInfo->numOfRes) {
// maxOutput = pResultInfo->numOfRes;
//
// if (maxOutput > 0) {
// break;
// }
// }
//
// assert(pResultInfo != NULL);
}
return 0;
......@@ -2623,12 +2628,19 @@ int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray *pGroup) {
SQuery * pQuery = pRuntimeEnv->pQuery;
size_t size = taosArrayGetSize(pGroup);
tFilePage **buffer = pQuery->sdata;
int32_t * posList = calloc(size, sizeof(int32_t));
int32_t* posList = calloc(size, sizeof(int32_t));
STableQueryInfo **pTableList = malloc(POINTER_BYTES * size);
if (pTableList == NULL || posList == NULL) {
tfree(posList);
tfree(pTableList);
qError("QInfo:%p failed alloc memory", pQInfo);
longjmp(pQInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
}
// todo opt for the case of one table per group
int32_t numOfTables = 0;
for (int32_t i = 0; i < size; ++i) {
......@@ -4069,7 +4081,7 @@ static SFillColInfo* taosCreateFillColInfo(SQuery* pQuery) {
return pFillCol;
}
int32_t doInitQInfo(SQInfo *pQInfo, void *param, void *tsdb, int32_t vgId, bool isSTableQuery) {
int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, void *tsdb, int32_t vgId, bool isSTableQuery) {
int32_t code = TSDB_CODE_SUCCESS;
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
......@@ -4085,12 +4097,12 @@ int32_t doInitQInfo(SQInfo *pQInfo, void *param, void *tsdb, int32_t vgId, bool
pQInfo->vgId = vgId;
pRuntimeEnv->pQuery = pQuery;
pRuntimeEnv->pTSBuf = param;
pRuntimeEnv->pTSBuf = pTsBuf;
pRuntimeEnv->cur.vgroupIndex = -1;
pRuntimeEnv->stableQuery = isSTableQuery;
pRuntimeEnv->prevGroupId = INT32_MIN;
if (param != NULL) {
if (pTsBuf != NULL) {
int16_t order = (pQuery->order.order == pRuntimeEnv->pTSBuf->tsOrder) ? TSDB_ORDER_ASC : TSDB_ORDER_DESC;
tsBufSetTraverseOrder(pRuntimeEnv->pTSBuf, order);
}
......@@ -4331,7 +4343,9 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
taosArrayDestroy(s);
// here we simply set the first table as current table
pQuery->current = (STableQueryInfo*) GET_TABLEGROUP(pQInfo, 0);
SArray* first = GET_TABLEGROUP(pQInfo, pQInfo->groupIndex);
pQuery->current = taosArrayGetP(first, 0);
scanOneTableDataBlocks(pRuntimeEnv, pQuery->current->lastKey);
int64_t numOfRes = getNumOfResult(pRuntimeEnv);
......@@ -4930,14 +4944,6 @@ static void tableQueryImpl(SQInfo *pQInfo) {
// record the total elapsed time
pRuntimeEnv->summary.elapsedTime += (taosGetTimestampUs() - st);
assert(pQInfo->tableqinfoGroupInfo.numOfTables == 1);
/* check if query is killed or not */
if (isQueryKilled(pQInfo)) {
qDebug("QInfo:%p query is killed", pQInfo);
} else {
qDebug("QInfo:%p query paused, %" PRId64 " rows returned, numOfTotal:%" PRId64 " rows",
pQInfo, pQuery->rec.rows, pQuery->rec.total + pQuery->rec.rows);
}
}
static void stableQueryImpl(SQInfo *pQInfo) {
......@@ -4959,10 +4965,6 @@ static void stableQueryImpl(SQInfo *pQInfo) {
// record the total elapsed time
pQInfo->runtimeEnv.summary.elapsedTime += (taosGetTimestampUs() - st);
if (pQuery->rec.rows == 0) {
qDebug("QInfo:%p over, %zu tables queried, %"PRId64" rows are returned", pQInfo, pQInfo->tableqinfoGroupInfo.numOfTables, pQuery->rec.total);
}
}
static int32_t getColumnIndexInSource(SQueryTableMsg *pQueryMsg, SSqlFuncMsg *pExprMsg, SColumnInfo* pTagCols) {
......@@ -5074,6 +5076,8 @@ static char *createTableIdList(SQueryTableMsg *pQueryMsg, char *pMsg, SArray **p
*/
static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList, SSqlFuncMsg ***pExpr,
char **tagCond, char** tbnameCond, SColIndex **groupbyCols, SColumnInfo** tagCols) {
int32_t code = TSDB_CODE_SUCCESS;
pQueryMsg->numOfTables = htonl(pQueryMsg->numOfTables);
pQueryMsg->window.skey = htobe64(pQueryMsg->window.skey);
......@@ -5100,7 +5104,8 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList,
// query msg safety check
if (!validateQueryMsg(pQueryMsg)) {
return TSDB_CODE_QRY_INVALID_MSG;
code = TSDB_CODE_QRY_INVALID_MSG;
goto _cleanup;
}
char *pMsg = (char *)(pQueryMsg->colList) + sizeof(SColumnInfo) * pQueryMsg->numOfCols;
......@@ -5172,7 +5177,8 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList,
int16_t functionId = pExprMsg->functionId;
if (functionId == TSDB_FUNC_TAG || functionId == TSDB_FUNC_TAGPRJ || functionId == TSDB_FUNC_TAG_DUMMY) {
if (pExprMsg->colInfo.flag != TSDB_COL_TAG) { // ignore the column index check for arithmetic expression.
return TSDB_CODE_QRY_INVALID_MSG;
code = TSDB_CODE_QRY_INVALID_MSG;
goto _cleanup;
}
} else {
// if (!validateExprColumnInfo(pQueryMsg, pExprMsg)) {
......@@ -5184,6 +5190,7 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList,
}
if (!validateQuerySourceCols(pQueryMsg, *pExpr)) {
code = TSDB_CODE_QRY_INVALID_MSG;
goto _cleanup;
}
......@@ -5191,6 +5198,10 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList,
if (pQueryMsg->numOfGroupCols > 0) { // group by tag columns
*groupbyCols = malloc(pQueryMsg->numOfGroupCols * sizeof(SColIndex));
if (*groupbyCols == NULL) {
code = TSDB_CODE_QRY_OUT_OF_MEMORY;
goto _cleanup;
}
for (int32_t i = 0; i < pQueryMsg->numOfGroupCols; ++i) {
(*groupbyCols)[i].colId = *(int16_t *)pMsg;
......@@ -5246,7 +5257,13 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList,
if (*pMsg != 0) {
size_t len = strlen(pMsg) + 1;
*tbnameCond = malloc(len);
if (*tbnameCond == NULL) {
code = TSDB_CODE_QRY_OUT_OF_MEMORY;
goto _cleanup;
}
strcpy(*tbnameCond, pMsg);
pMsg += len;
}
......@@ -5256,7 +5273,8 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList,
pQueryMsg, pQueryMsg->numOfTables, pQueryMsg->queryType, pQueryMsg->window.skey, pQueryMsg->window.ekey, pQueryMsg->numOfGroupCols,
pQueryMsg->order, pQueryMsg->numOfOutput, pQueryMsg->numOfCols, pQueryMsg->intervalTime,
pQueryMsg->fillType, pQueryMsg->tsLen, pQueryMsg->tsNumOfBlocks, pQueryMsg->limit, pQueryMsg->offset);
return 0;
return TSDB_CODE_SUCCESS;
_cleanup:
tfree(*pExpr);
......@@ -5266,7 +5284,8 @@ _cleanup:
tfree(*groupbyCols);
tfree(*tagCols);
tfree(*tagCond);
return TSDB_CODE_QRY_INVALID_MSG;
return code;
}
static int32_t buildAirthmeticExprFromMsg(SExprInfo *pArithExprInfo, SQueryTableMsg *pQueryMsg) {
......@@ -5654,7 +5673,6 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SArray* pTableIdList,
STableQueryInfo* item = createTableQueryInfo(&pQInfo->runtimeEnv, pTable, window);
item->groupIndex = i;
item->tableIndex = tableIndex++;
taosArrayPush(p1, &item);
taosHashPut(pQInfo->tableqinfoGroupInfo.map, &id.tid, sizeof(id.tid), &item, POINTER_BYTES);
}
......@@ -5668,7 +5686,8 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SArray* pTableIdList,
pQuery->window = pQueryMsg->window;
if (sem_init(&pQInfo->dataReady, 0, 0) != 0) {
qError("QInfo:%p init dataReady sem failed, reason:%s", pQInfo, strerror(errno));
int32_t code = TAOS_SYSTEM_ERROR(errno);
qError("QInfo:%p init dataReady sem failed, reason:%s", pQInfo, tstrerror(code));
goto _cleanup;
}
......@@ -5679,7 +5698,6 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SArray* pTableIdList,
_cleanup:
freeQInfo(pQInfo);
return NULL;
}
......@@ -5697,8 +5715,7 @@ static bool isValidQInfo(void *param) {
return (sig == (uint64_t)pQInfo);
}
static int32_t initQInfo(SQueryTableMsg *pQueryMsg, void *tsdb, int32_t vgId, SQInfo *pQInfo, bool isSTable) {
static int32_t initQInfo(SQueryTableMsg *pQueryMsg, void *tsdb, int32_t vgId, SQInfo *pQInfo, bool isSTable, void* param, _qinfo_free_fn_t fn) {
int32_t code = TSDB_CODE_SUCCESS;
SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
......@@ -5722,6 +5739,9 @@ static int32_t initQInfo(SQueryTableMsg *pQueryMsg, void *tsdb, int32_t vgId, SQ
return TSDB_CODE_SUCCESS;
}
pQInfo->param = param;
pQInfo->freeFn = fn;
if (pQInfo->tableqinfoGroupInfo.numOfTables == 0) {
qDebug("QInfo:%p no table qualified for tag filter, abort query", pQInfo);
setQueryStatus(pQuery, QUERY_COMPLETED);
......@@ -5785,7 +5805,7 @@ static void freeQInfo(SQInfo *pQInfo) {
// todo refactor, extract method to destroytableDataInfo
int32_t numOfGroups = GET_NUM_OF_TABLEGROUP(pQInfo);
for (int32_t i = 0; i < numOfGroups; ++i) {
SArray *p = GET_TABLEGROUP(pQInfo, i);;
SArray *p = GET_TABLEGROUP(pQInfo, i);
size_t num = taosArrayGetSize(p);
for(int32_t j = 0; j < num; ++j) {
......@@ -5894,8 +5914,16 @@ static int32_t doDumpQueryResult(SQInfo *pQInfo, char *data) {
return TSDB_CODE_SUCCESS;
}
int32_t qCreateQueryInfo(void *tsdb, int32_t vgId, SQueryTableMsg *pQueryMsg, qinfo_t *pQInfo) {
assert(pQueryMsg != NULL);
typedef struct SQueryMgmt {
SCacheObj *qinfoPool; // query handle pool
int32_t vgId;
bool closed;
pthread_mutex_t lock;
} SQueryMgmt;
int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryMsg, void* param, _qinfo_free_fn_t fn,
qinfo_t* pQInfo) {
assert(pQueryMsg != NULL && tsdb != NULL);
int32_t code = TSDB_CODE_SUCCESS;
......@@ -5984,7 +6012,7 @@ int32_t qCreateQueryInfo(void *tsdb, int32_t vgId, SQueryTableMsg *pQueryMsg, qi
goto _over;
}
code = initQInfo(pQueryMsg, tsdb, vgId, *pQInfo, isSTableQuery);
code = initQInfo(pQueryMsg, tsdb, vgId, *pQInfo, isSTableQuery, param, fn);
_over:
free(tagCond);
......@@ -6020,7 +6048,7 @@ static void doDestoryQueryInfo(SQInfo* pQInfo) {
freeQInfo(pQInfo);
}
void qDestroyQueryInfo(qinfo_t qHandle, void (*fp)(void*), void* param) {
void qDestroyQueryInfo(qinfo_t qHandle) {
SQInfo* pQInfo = (SQInfo*) qHandle;
if (!isValidQInfo(pQInfo)) {
return;
......@@ -6030,15 +6058,19 @@ void qDestroyQueryInfo(qinfo_t qHandle, void (*fp)(void*), void* param) {
qDebug("QInfo:%p dec refCount, value:%d", pQInfo, ref);
if (ref == 0) {
doDestoryQueryInfo(pQInfo);
_qinfo_free_fn_t freeFp = pQInfo->freeFn;
void* param = pQInfo->param;
if (fp != NULL) {
fp(param);
doDestoryQueryInfo(pQInfo);
if (freeFp != NULL) {
assert(param != NULL);
freeFp(param);
}
}
}
void qTableQuery(qinfo_t qinfo, void (*fp)(void*), void* param) {
void qTableQuery(qinfo_t qinfo) {
SQInfo *pQInfo = (SQInfo *)qinfo;
if (pQInfo == NULL || pQInfo->signature != pQInfo) {
......@@ -6048,17 +6080,34 @@ void qTableQuery(qinfo_t qinfo, void (*fp)(void*), void* param) {
if (isQueryKilled(pQInfo)) {
qDebug("QInfo:%p it is already killed, abort", pQInfo);
qDestroyQueryInfo(pQInfo, fp, param);
sem_post(&pQInfo->dataReady);
qDestroyQueryInfo(pQInfo);
return;
}
if (pQInfo->tableqinfoGroupInfo.numOfTables == 0) {
qDebug("QInfo:%p no table exists for query, abort", pQInfo);
sem_post(&pQInfo->dataReady);
qDestroyQueryInfo(pQInfo);
return;
}
int32_t ret = setjmp(pQInfo->env);
// error occurs, record the error code and return to client
if (ret != TSDB_CODE_SUCCESS) {
pQInfo->code = ret;
qDebug("QInfo:%p query abort due to error occurs, code:%s", pQInfo, tstrerror(pQInfo->code));
sem_post(&pQInfo->dataReady);
qDestroyQueryInfo(pQInfo);
return;
}
qDebug("QInfo:%p query task is launched", pQInfo);
SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv;
if (onlyQueryTags(pQInfo->runtimeEnv.pQuery)) {
assert(pQInfo->runtimeEnv.pQueryHandle == NULL);
buildTagQueryResult(pQInfo); // todo support the limit/offset
......@@ -6068,8 +6117,18 @@ void qTableQuery(qinfo_t qinfo, void (*fp)(void*), void* param) {
tableQueryImpl(pQInfo);
}
SQuery* pQuery = pRuntimeEnv->pQuery;
if (isQueryKilled(pQInfo)) {
qDebug("QInfo:%p query is killed", pQInfo);
} else if (pQuery->rec.rows == 0) {
qDebug("QInfo:%p over, %zu tables queried, %"PRId64" rows are returned", pQInfo, pQInfo->tableqinfoGroupInfo.numOfTables, pQuery->rec.total);
} else {
qDebug("QInfo:%p query paused, %" PRId64 " rows returned, numOfTotal:%" PRId64 " rows",
pQInfo, pQuery->rec.rows, pQuery->rec.total + pQuery->rec.rows);
}
sem_post(&pQInfo->dataReady);
qDestroyQueryInfo(pQInfo, fp, param);
qDestroyQueryInfo(pQInfo);
}
int32_t qRetrieveQueryResultInfo(qinfo_t qinfo) {
......@@ -6162,7 +6221,7 @@ int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp **pRsp, int32_t *co
return code;
}
int32_t qKillQuery(qinfo_t qinfo, void (*fp)(void*), void* param) {
int32_t qKillQuery(qinfo_t qinfo) {
SQInfo *pQInfo = (SQInfo *)qinfo;
if (pQInfo == NULL || !isValidQInfo(pQInfo)) {
......@@ -6170,8 +6229,7 @@ int32_t qKillQuery(qinfo_t qinfo, void (*fp)(void*), void* param) {
}
setQueryKilled(pQInfo);
qDestroyQueryInfo(pQInfo, fp, param);
qDestroyQueryInfo(pQInfo);
return TSDB_CODE_SUCCESS;
}
......@@ -6309,3 +6367,112 @@ static void buildTagQueryResult(SQInfo* pQInfo) {
setQueryStatus(pQuery, QUERY_COMPLETED);
}
void freeqinfoFn(void *qhandle) {
void** handle = qhandle;
if (handle == NULL || *handle == NULL) {
return;
}
qKillQuery(*handle);
}
void* qOpenQueryMgmt(int32_t vgId) {
const int32_t REFRESH_HANDLE_INTERVAL = 2; // every 2 seconds, refresh handle pool
char cacheName[128] = {0};
sprintf(cacheName, "qhandle_%d", vgId);
SQueryMgmt* pQueryHandle = calloc(1, sizeof(SQueryMgmt));
pQueryHandle->qinfoPool = taosCacheInit(TSDB_DATA_TYPE_BIGINT, REFRESH_HANDLE_INTERVAL, true, freeqinfoFn, cacheName);
pQueryHandle->closed = false;
pthread_mutex_init(&pQueryHandle->lock, NULL);
qDebug("vgId:%d, open querymgmt success", vgId);
return pQueryHandle;
}
void qSetQueryMgmtClosed(void* pQMgmt) {
if (pQMgmt == NULL) {
return;
}
SQueryMgmt* pQueryMgmt = pQMgmt;
qDebug("vgId:%d, set querymgmt closed, wait for all queries cancelled", pQueryMgmt->vgId);
pthread_mutex_lock(&pQueryMgmt->lock);
pQueryMgmt->closed = true;
pthread_mutex_unlock(&pQueryMgmt->lock);
taosCacheEmpty(pQueryMgmt->qinfoPool, true);
}
void qCleanupQueryMgmt(void* pQMgmt) {
if (pQMgmt == NULL) {
return;
}
SQueryMgmt* pQueryMgmt = pQMgmt;
int32_t vgId = pQueryMgmt->vgId;
assert(pQueryMgmt->closed);
SCacheObj* pqinfoPool = pQueryMgmt->qinfoPool;
pQueryMgmt->qinfoPool = NULL;
taosCacheCleanup(pqinfoPool);
pthread_mutex_destroy(&pQueryMgmt->lock);
tfree(pQueryMgmt);
qDebug("vgId:%d querymgmt cleanup completed", vgId);
}
void** qRegisterQInfo(void* pMgmt, void* qInfo) {
if (pMgmt == NULL) {
return NULL;
}
SQueryMgmt *pQueryMgmt = pMgmt;
if (pQueryMgmt->qinfoPool == NULL) {
return NULL;
}
pthread_mutex_lock(&pQueryMgmt->lock);
if (pQueryMgmt->closed) {
pthread_mutex_unlock(&pQueryMgmt->lock);
return NULL;
} else {
void** handle = taosCachePut(pQueryMgmt->qinfoPool, qInfo, POINTER_BYTES, &qInfo, POINTER_BYTES, tsShellActivityTimer*2);
pthread_mutex_unlock(&pQueryMgmt->lock);
return handle;
}
}
void** qAcquireQInfo(void* pMgmt, void** key) {
SQueryMgmt *pQueryMgmt = pMgmt;
if (pQueryMgmt->qinfoPool == NULL || pQueryMgmt->closed) {
return NULL;
}
void** handle = taosCacheAcquireByKey(pQueryMgmt->qinfoPool, key, POINTER_BYTES);
if (handle == NULL || *handle == NULL) {
return NULL;
} else {
return handle;
}
}
void** qReleaseQInfo(void* pMgmt, void* pQInfo, bool needFree) {
SQueryMgmt *pQueryMgmt = pMgmt;
if (pQueryMgmt->qinfoPool == NULL) {
return NULL;
}
taosCacheRelease(pQueryMgmt->qinfoPool, pQInfo, needFree);
return 0;
}
......@@ -1173,9 +1173,7 @@ tExprNode* exprTreeFromTableName(const char* tbnameCond) {
size_t len = strlen(cond) + VARSTR_HEADER_SIZE;
char* p = exception_malloc(len);
varDataSetLen(p, len - VARSTR_HEADER_SIZE);
memcpy(varDataVal(p), cond, len);
STR_WITH_SIZE_TO_VARSTR(p, cond, len - VARSTR_HEADER_SIZE);
taosArrayPush(pVal->arr, &p);
}
......
......@@ -15,16 +15,16 @@
#include "os.h"
#include "qsqlparser.h"
#include "queryLog.h"
#include "taosdef.h"
#include "taosmsg.h"
#include "tcmdtype.h"
#include "tglobal.h"
#include "tstoken.h"
#include "tstrbuild.h"
#include "ttime.h"
#include "ttokendef.h"
#include "tutil.h"
#include "qsqltype.h"
#include "tstrbuild.h"
#include "queryLog.h"
SSqlInfo qSQLParse(const char *pStr) {
void *pParser = ParseAlloc(malloc);
......
此差异已折叠。
此差异已折叠。
......@@ -28,6 +28,7 @@
#include "tsdbMain.h"
#include "tutil.h"
#include "ttime.h"
#include "tfile.h"
const char *tsdbFileSuffix[] = {".head", ".data", ".last", "", ".h", ".l"};
......
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册