提交 f87b68d9 编写于 作者: S slguan

Merge branch 'develop' into feature/balance

...@@ -41,12 +41,29 @@ addons: ...@@ -41,12 +41,29 @@ addons:
branch_pattern: coverity_scan branch_pattern: coverity_scan
before_script: before_script:
- mkdir build - mkdir debug
- cd build - cd debug
script: script:
- cmake .. - cmake ..
- cmake --build . - cmake --build . || exit $?
- |-
case $TRAVIS_OS_NAME in
linux)
cd ../tests/script
sudo ./test.sh 2>&1 | tee out.txt
cat out.txt
grep success out.txt
total_success=`grep success out.txt | wc -l`
echo "Total $total_success success"
grep failed out.txt
total_failed=`grep failed out.txt | wc -l`
echo "Total $total_failed failed"
if [ "$total_failed" -ne "0" ]; then
exit $total_failed
fi
;;
esac
# #
# Build Matrix # Build Matrix
...@@ -58,6 +75,7 @@ matrix: ...@@ -58,6 +75,7 @@ matrix:
packages: packages:
- build-essential - build-essential
- cmake - cmake
- net-tools
# - os: osx # - os: osx
# addons: # addons:
......
...@@ -351,7 +351,7 @@ typedef struct SSqlObj { ...@@ -351,7 +351,7 @@ typedef struct SSqlObj {
char * sqlstr; char * sqlstr;
char retry; char retry;
char maxRetry; char maxRetry;
SRpcIpSet *ipList; SRpcIpSet ipList;
char freed : 4; char freed : 4;
char listed : 4; char listed : 4;
tsem_t rspSem; tsem_t rspSem;
......
...@@ -342,8 +342,8 @@ void tscProcessAsyncRes(SSchedMsg *pMsg) { ...@@ -342,8 +342,8 @@ void tscProcessAsyncRes(SSchedMsg *pMsg) {
(*pSql->fp)(pSql->param, taosres, code); (*pSql->fp)(pSql->param, taosres, code);
if (shouldFree) { if (shouldFree) {
tscFreeSqlObj(pSql);
tscTrace("%p Async sql is automatically freed in async res", pSql); tscTrace("%p Async sql is automatically freed in async res", pSql);
tscFreeSqlObj(pSql);
} }
} }
......
...@@ -292,7 +292,6 @@ void tscKillConnection(STscObj *pObj) { ...@@ -292,7 +292,6 @@ void tscKillConnection(STscObj *pObj) {
pthread_mutex_unlock(&pObj->mutex); pthread_mutex_unlock(&pObj->mutex);
taos_close(pObj);
tscTrace("connection:%p is killed", pObj); tscTrace("connection:%p is killed", pObj);
taos_close(pObj);
} }
...@@ -209,7 +209,6 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) { ...@@ -209,7 +209,6 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) {
} }
int32_t code = tscGetQueryInfoDetailSafely(pCmd, pCmd->clauseIndex, &pQueryInfo); int32_t code = tscGetQueryInfoDetailSafely(pCmd, pCmd->clauseIndex, &pQueryInfo);
// assert(pQueryInfo->numOfTables == 0);
STableMetaInfo* pTableMetaInfo = NULL; STableMetaInfo* pTableMetaInfo = NULL;
if (pQueryInfo->numOfTables == 0) { if (pQueryInfo->numOfTables == 0) {
......
...@@ -169,17 +169,27 @@ void tscProcessActivityTimer(void *handle, void *tmrId) { ...@@ -169,17 +169,27 @@ void tscProcessActivityTimer(void *handle, void *tmrId) {
} }
int tscSendMsgToServer(SSqlObj *pSql) { int tscSendMsgToServer(SSqlObj *pSql) {
char *pMsg = rpcMallocCont(pSql->cmd.payloadLen); SSqlCmd* pCmd = &pSql->cmd;
char *pMsg = rpcMallocCont(pCmd->payloadLen);
if (NULL == pMsg) { if (NULL == pMsg) {
tscError("%p msg:%s malloc fail", pSql, taosMsg[pSql->cmd.msgType]); tscError("%p msg:%s malloc fail", pSql, taosMsg[pSql->cmd.msgType]);
return TSDB_CODE_CLI_OUT_OF_MEMORY; return TSDB_CODE_CLI_OUT_OF_MEMORY;
} }
pSql->ipList->ip[0] = inet_addr(tsPrivateIp);
if (pSql->cmd.command < TSDB_SQL_MGMT) { if (pSql->cmd.command < TSDB_SQL_MGMT) {
pSql->ipList->port = tsDnodeShellPort; STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
tscPrint("%p msg:%s is sent to server %d", pSql, taosMsg[pSql->cmd.msgType], pSql->ipList->port); STableMeta* pTableMeta = pTableMetaInfo->pTableMeta;
pSql->ipList.numOfIps = pTableMeta->numOfVpeers;
pSql->ipList.port = tsDnodeShellPort;
pSql->ipList.inUse = 0;
for(int32_t i = 0; i < pTableMeta->numOfVpeers; ++i) {
pSql->ipList.ip[i] = pTableMeta->vpeerDesc[i].ip;
}
tscPrint("%p msg:%s is sent to server %d", pSql, taosMsg[pSql->cmd.msgType], pSql->ipList.port);
memcpy(pMsg, pSql->cmd.payload + tsRpcHeadSize, pSql->cmd.payloadLen); memcpy(pMsg, pSql->cmd.payload + tsRpcHeadSize, pSql->cmd.payloadLen);
SRpcMsg rpcMsg = { SRpcMsg rpcMsg = {
...@@ -189,10 +199,12 @@ int tscSendMsgToServer(SSqlObj *pSql) { ...@@ -189,10 +199,12 @@ int tscSendMsgToServer(SSqlObj *pSql) {
.handle = pSql, .handle = pSql,
.code = 0 .code = 0
}; };
rpcSendRequest(pVnodeConn, pSql->ipList, &rpcMsg); rpcSendRequest(pVnodeConn, &pSql->ipList, &rpcMsg);
} else { } else {
pSql->ipList->port = tsMnodeShellPort; pSql->ipList = tscMgmtIpList;
tscTrace("%p msg:%s is sent to server %d", pSql, taosMsg[pSql->cmd.msgType], pSql->ipList->port); pSql->ipList.port = tsMnodeShellPort;
tscTrace("%p msg:%s is sent to server %d", pSql, taosMsg[pSql->cmd.msgType], pSql->ipList.port);
memcpy(pMsg, pSql->cmd.payload, pSql->cmd.payloadLen); memcpy(pMsg, pSql->cmd.payload, pSql->cmd.payloadLen);
SRpcMsg rpcMsg = { SRpcMsg rpcMsg = {
.msgType = pSql->cmd.msgType, .msgType = pSql->cmd.msgType,
...@@ -201,7 +213,7 @@ int tscSendMsgToServer(SSqlObj *pSql) { ...@@ -201,7 +213,7 @@ int tscSendMsgToServer(SSqlObj *pSql) {
.handle = pSql, .handle = pSql,
.code = 0 .code = 0
}; };
rpcSendRequest(pTscMgmtConn, pSql->ipList, &rpcMsg); rpcSendRequest(pTscMgmtConn, &pSql->ipList, &rpcMsg);
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -254,11 +266,15 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg) { ...@@ -254,11 +266,15 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg) {
rpcFreeCont(rpcMsg->pCont); rpcFreeCont(rpcMsg->pCont);
return; return;
} else { } else {
tscTrace("%p it shall renew meter meta, code:%d", pSql, tstrerror(rpcMsg->code)); tscTrace("%p it shall renew table meta, code:%d", pSql, tstrerror(rpcMsg->code));
pSql->maxRetry = TSDB_VNODES_SUPPORT * 2; pSql->maxRetry = TSDB_VNODES_SUPPORT * 2;
pSql->res.code = rpcMsg->code; // keep the previous error code pSql->res.code = rpcMsg->code; // keep the previous error code
if (++pSql->retry > pSql->maxRetry) {
tscError("%p max retry %d reached, ", pSql, pSql->retry);
return;
}
rpcMsg->code = tscRenewMeterMeta(pSql, pTableMetaInfo->name); rpcMsg->code = tscRenewMeterMeta(pSql, pTableMetaInfo->name);
if (pTableMetaInfo->pTableMeta) { if (pTableMetaInfo->pTableMeta) {
...@@ -343,8 +359,8 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg) { ...@@ -343,8 +359,8 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg) {
(*pSql->fp)(pSql->param, taosres, rpcMsg->code); (*pSql->fp)(pSql->param, taosres, rpcMsg->code);
if (shouldFree) { if (shouldFree) {
tscFreeSqlObj(pSql);
tscTrace("%p Async sql is automatically freed", pSql); tscTrace("%p Async sql is automatically freed", pSql);
tscFreeSqlObj(pSql);
} }
} }
...@@ -405,7 +421,7 @@ int tscProcessSql(SSqlObj *pSql) { ...@@ -405,7 +421,7 @@ int tscProcessSql(SSqlObj *pSql) {
} }
// temp // temp
pSql->ipList = &tscMgmtIpList; // pSql->ipList = tscMgmtIpList;
// if (UTIL_TABLE_IS_NOMRAL_TABLE(pTableMetaInfo)) { // if (UTIL_TABLE_IS_NOMRAL_TABLE(pTableMetaInfo)) {
// pSql->index = pTableMetaInfo->pTableMeta->index; // pSql->index = pTableMetaInfo->pTableMeta->index;
// } else { // it must be the parent SSqlObj for super table query // } else { // it must be the parent SSqlObj for super table query
...@@ -417,7 +433,7 @@ int tscProcessSql(SSqlObj *pSql) { ...@@ -417,7 +433,7 @@ int tscProcessSql(SSqlObj *pSql) {
// } // }
// } // }
} else if (pSql->cmd.command < TSDB_SQL_LOCAL) { } else if (pSql->cmd.command < TSDB_SQL_LOCAL) {
pSql->ipList = &tscMgmtIpList; pSql->ipList = tscMgmtIpList;
} else { // local handler } else { // local handler
return (*tscProcessMsgRsp[pCmd->command])(pSql); return (*tscProcessMsgRsp[pCmd->command])(pSql);
} }
...@@ -532,9 +548,9 @@ int tscBuildSubmitMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -532,9 +548,9 @@ int tscBuildSubmitMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pShellMsg->numOfBlocks = htonl(pSql->cmd.numOfTablesInSubmit); // number of meters to be inserted pShellMsg->numOfBlocks = htonl(pSql->cmd.numOfTablesInSubmit); // number of meters to be inserted
// pSql->cmd.payloadLen is set during copying data into paylaod // pSql->cmd.payloadLen is set during copying data into payload
pSql->cmd.msgType = TSDB_MSG_TYPE_SUBMIT; pSql->cmd.msgType = TSDB_MSG_TYPE_SUBMIT;
tscTrace("%p build submit msg, vgId:%d numOfVnodes:%d", pSql, pTableMeta->vgId, htons(pMsgDesc->numOfVnodes)); tscTrace("%p build submit msg, vgId:%d numOfVnodes:%d", pSql, pTableMeta->vgId, htonl(pMsgDesc->numOfVnodes));
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -1787,6 +1803,7 @@ int tscBuildHeartBeatMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -1787,6 +1803,7 @@ int tscBuildHeartBeatMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
size = tscEstimateHeartBeatMsgLength(pSql); size = tscEstimateHeartBeatMsgLength(pSql);
if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) { if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) {
pthread_mutex_unlock(&pObj->mutex);
tscError("%p failed to malloc for heartbeat msg", pSql); tscError("%p failed to malloc for heartbeat msg", pSql);
return -1; return -1;
} }
...@@ -1836,6 +1853,8 @@ int tscProcessTableMetaRsp(SSqlObj *pSql) { ...@@ -1836,6 +1853,8 @@ int tscProcessTableMetaRsp(SSqlObj *pSql) {
for (int i = 0; i < TSDB_VNODES_SUPPORT; ++i) { for (int i = 0; i < TSDB_VNODES_SUPPORT; ++i) {
pMetaMsg->vpeerDesc[i].vgId = htonl(pMetaMsg->vpeerDesc[i].vgId); pMetaMsg->vpeerDesc[i].vgId = htonl(pMetaMsg->vpeerDesc[i].vgId);
pMetaMsg->vpeerDesc[i].ip = htonl(pMetaMsg->vpeerDesc[i].ip);
pMetaMsg->vpeerDesc[i].dnodeId = htonl(pMetaMsg->vpeerDesc[i].dnodeId);
} }
SSchema* pSchema = pMetaMsg->schema; SSchema* pSchema = pMetaMsg->schema;
...@@ -2435,7 +2454,7 @@ static void tscWaitingForCreateTable(SSqlCmd *pCmd) { ...@@ -2435,7 +2454,7 @@ static void tscWaitingForCreateTable(SSqlCmd *pCmd) {
int tscRenewMeterMeta(SSqlObj *pSql, char *tableId) { int tscRenewMeterMeta(SSqlObj *pSql, char *tableId) {
int code = 0; int code = 0;
// handle metric meta renew process // handle table meta renew process
SSqlCmd *pCmd = &pSql->cmd; SSqlCmd *pCmd = &pSql->cmd;
SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
......
...@@ -757,8 +757,8 @@ void taos_free_result_imp(TAOS_RES *res, int keepCmd) { ...@@ -757,8 +757,8 @@ void taos_free_result_imp(TAOS_RES *res, int keepCmd) {
tscTrace("%p qhandle is null, abort free, fp:%p", pSql, pSql->fp); tscTrace("%p qhandle is null, abort free, fp:%p", pSql, pSql->fp);
if (tscShouldFreeAsyncSqlObj(pSql)) { if (tscShouldFreeAsyncSqlObj(pSql)) {
tscFreeSqlObj(pSql);
tscTrace("%p Async SqlObj is freed by app", pSql); tscTrace("%p Async SqlObj is freed by app", pSql);
tscFreeSqlObj(pSql);
} else { } else {
if (keepCmd) { if (keepCmd) {
tscFreeSqlResult(pSql); tscFreeSqlResult(pSql);
......
...@@ -582,10 +582,12 @@ void taos_close_stream(TAOS_STREAM *handle) { ...@@ -582,10 +582,12 @@ void taos_close_stream(TAOS_STREAM *handle) {
tscRemoveFromStreamList(pStream, pSql); tscRemoveFromStreamList(pStream, pSql);
taosTmrStopA(&(pStream->pTimer)); taosTmrStopA(&(pStream->pTimer));
tscTrace("%p stream:%p is closed", pSql, pStream);
tscFreeSqlObj(pSql); tscFreeSqlObj(pSql);
pStream->pSql = NULL; pStream->pSql = NULL;
tscTrace("%p stream:%p is closed", pSql, pStream);
tfree(pStream); tfree(pStream);
} }
} }
...@@ -1381,6 +1381,7 @@ static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfR ...@@ -1381,6 +1381,7 @@ static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfR
} else { // all data has been retrieved to client } else { // all data has been retrieved to client
tscAllDataRetrievedFromDnode(trsupport, pSql); tscAllDataRetrievedFromDnode(trsupport, pSql);
} }
pthread_mutex_unlock(&trsupport->queryMutex);
} }
static SSqlObj *tscCreateSqlObjForSubquery(SSqlObj *pSql, SRetrieveSupport *trsupport, SSqlObj *prevSqlObj) { static SSqlObj *tscCreateSqlObjForSubquery(SSqlObj *pSql, SRetrieveSupport *trsupport, SSqlObj *prevSqlObj) {
......
...@@ -860,12 +860,10 @@ int tscAllocPayload(SSqlCmd* pCmd, int size) { ...@@ -860,12 +860,10 @@ int tscAllocPayload(SSqlCmd* pCmd, int size) {
pCmd->allocSize = size; pCmd->allocSize = size;
} }
memset(pCmd->payload, 0, pCmd->payloadLen); memset(pCmd->payload, 0, pCmd->allocSize);
} }
//memset(pCmd->payload, 0, pCmd->allocSize);
assert(pCmd->allocSize >= size); assert(pCmd->allocSize >= size);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
......
...@@ -141,7 +141,7 @@ STSchema *tdDupSchema(STSchema *pSchema) { ...@@ -141,7 +141,7 @@ STSchema *tdDupSchema(STSchema *pSchema) {
* Free the SSchema object created by tdNewSchema or tdDupSchema * Free the SSchema object created by tdNewSchema or tdDupSchema
*/ */
void tdFreeSchema(STSchema *pSchema) { void tdFreeSchema(STSchema *pSchema) {
if (pSchema == NULL) free(pSchema); if (pSchema != NULL) free(pSchema);
} }
/** /**
......
...@@ -700,6 +700,7 @@ int32_t sdbUpdateRow(SSdbOperDesc *pOper) { ...@@ -700,6 +700,7 @@ int32_t sdbUpdateRow(SSdbOperDesc *pOper) {
int32_t total_size = sizeof(SRowHead) + pTable->maxRowSize + sizeof(TSCKSUM); int32_t total_size = sizeof(SRowHead) + pTable->maxRowSize + sizeof(TSCKSUM);
SRowHead *rowHead = (SRowHead *)calloc(1, total_size); SRowHead *rowHead = (SRowHead *)calloc(1, total_size);
if (rowHead == NULL) { if (rowHead == NULL) {
pthread_mutex_unlock(&pTable->mutex);
sdbError("table:%s, failed to allocate row head memory", pTable->tableName); sdbError("table:%s, failed to allocate row head memory", pTable->tableName);
return -1; return -1;
} }
......
...@@ -1506,9 +1506,9 @@ static int32_t mgmtDoGetChildTableMeta(SQueuedMsg *pMsg, STableMetaMsg *pMeta) { ...@@ -1506,9 +1506,9 @@ static int32_t mgmtDoGetChildTableMeta(SQueuedMsg *pMsg, STableMetaMsg *pMeta) {
for (int32_t i = 0; i < TSDB_VNODES_SUPPORT; ++i) { for (int32_t i = 0; i < TSDB_VNODES_SUPPORT; ++i) {
if (usePublicIp) { if (usePublicIp) {
pMeta->vpeerDesc[i].ip = pVgroup->vnodeGid[i].publicIp; pMeta->vpeerDesc[i].ip = htonl(pVgroup->vnodeGid[i].publicIp);
} else { } else {
pMeta->vpeerDesc[i].ip = pVgroup->vnodeGid[i].privateIp; pMeta->vpeerDesc[i].ip = htonl(pVgroup->vnodeGid[i].privateIp);
} }
pMeta->vpeerDesc[i].vgId = htonl(pVgroup->vgId); pMeta->vpeerDesc[i].vgId = htonl(pVgroup->vgId);
pMeta->vpeerDesc[i].dnodeId = htonl(pVgroup->vnodeGid[i].dnodeId); pMeta->vpeerDesc[i].dnodeId = htonl(pVgroup->vnodeGid[i].dnodeId);
......
...@@ -2479,12 +2479,11 @@ SArray *loadDataBlockOnDemand(SQueryRuntimeEnv *pRuntimeEnv, SDataBlockInfo *pBl ...@@ -2479,12 +2479,11 @@ SArray *loadDataBlockOnDemand(SQueryRuntimeEnv *pRuntimeEnv, SDataBlockInfo *pBl
} }
if (r == BLK_DATA_NO_NEEDED) { if (r == BLK_DATA_NO_NEEDED) {
// qTrace("QInfo:%p vid:%d sid:%d id:%s, slot:%d, data block ignored, brange:%" PRId64 "-%" PRId64 ", qTrace("QInfo:%p slot:%d, data block ignored, brange:%" PRId64 "-%" PRId64 ", rows:%d",
// rows:%d", GET_QINFO_ADDR(pQuery), pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->slot, GET_QINFO_ADDR(pRuntimeEnv), pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->size);
// pBlock->keyFirst, pBlock->keyLast, pBlock->numOfPoints);
} else if (r == BLK_DATA_FILEDS_NEEDED) { } else if (r == BLK_DATA_FILEDS_NEEDED) {
if (tsdbRetrieveDataBlockStatisInfo(pRuntimeEnv->pQueryHandle, pStatis) != TSDB_CODE_SUCCESS) { if (tsdbRetrieveDataBlockStatisInfo(pRuntimeEnv->pQueryHandle, pStatis) != TSDB_CODE_SUCCESS) {
// return DISK_DATA_LOAD_FAILED; // return DISK_DATA_LOAD_FAILED;
} }
if (*pStatis == NULL) { if (*pStatis == NULL) {
...@@ -5908,14 +5907,6 @@ static void freeQInfo(SQInfo *pQInfo) { ...@@ -5908,14 +5907,6 @@ static void freeQInfo(SQInfo *pQInfo) {
tfree(pQuery->sdata[col]); tfree(pQuery->sdata[col]);
} }
// for (int col = 0; col < pQuery->numOfCols; ++col) {
// vnodeFreeColumnInfo(&pQuery->colList[col].data);
// }
//
// if (pQuery->colList[0].colIdx != PRIMARYKEY_TIMESTAMP_COL_INDEX) {
// tfree(pQuery->tsData);
// }
sem_destroy(&(pQInfo->dataReady)); sem_destroy(&(pQInfo->dataReady));
teardownQueryRuntimeEnv(&pQInfo->runtimeEnv); teardownQueryRuntimeEnv(&pQInfo->runtimeEnv);
...@@ -6125,9 +6116,6 @@ void qTableQuery(SQInfo *pQInfo) { ...@@ -6125,9 +6116,6 @@ void qTableQuery(SQInfo *pQInfo) {
dTrace("QInfo:%p query task is launched", pQInfo); dTrace("QInfo:%p query task is launched", pQInfo);
// sem_post(&pQInfo->dataReady);
// pQInfo->runtimeEnv.pQuery->status = QUERY_OVER;
int32_t numOfTables = taosArrayGetSize(pQInfo->pTableIdList); int32_t numOfTables = taosArrayGetSize(pQInfo->pTableIdList);
if (numOfTables == 1) { if (numOfTables == 1) {
singleTableQueryImpl(pQInfo); singleTableQueryImpl(pQInfo);
......
...@@ -331,6 +331,7 @@ void *rpcReallocCont(void *ptr, int contLen) { ...@@ -331,6 +331,7 @@ void *rpcReallocCont(void *ptr, int contLen) {
char *start = ((char *)ptr) - sizeof(SRpcReqContext) - sizeof(SRpcHead); char *start = ((char *)ptr) - sizeof(SRpcReqContext) - sizeof(SRpcHead);
if (contLen == 0 ) { if (contLen == 0 ) {
free(start); free(start);
return NULL;
} }
int size = contLen + RPC_MSG_OVERHEAD; int size = contLen + RPC_MSG_OVERHEAD;
...@@ -1096,6 +1097,10 @@ static void rpcProcessConnError(void *param, void *id) { ...@@ -1096,6 +1097,10 @@ static void rpcProcessConnError(void *param, void *id) {
SRpcInfo *pRpc = pContext->pRpc; SRpcInfo *pRpc = pContext->pRpc;
SRpcMsg rpcMsg; SRpcMsg rpcMsg;
if (pRpc == NULL) {
return;
}
tTrace("%s connection error happens", pRpc->label); tTrace("%s connection error happens", pRpc->label);
if ( pContext->numOfTry >= pContext->ipSet.numOfIps ) { if ( pContext->numOfTry >= pContext->ipSet.numOfIps ) {
......
...@@ -35,7 +35,7 @@ static int vnodeWALCallback(void *arg); ...@@ -35,7 +35,7 @@ static int vnodeWALCallback(void *arg);
static int32_t vnodeSaveCfg(SMDCreateVnodeMsg *pVnodeCfg); static int32_t vnodeSaveCfg(SMDCreateVnodeMsg *pVnodeCfg);
static int32_t vnodeReadCfg(SVnodeObj *pVnode); static int32_t vnodeReadCfg(SVnodeObj *pVnode);
static int tsOpennedVnodes; static int32_t tsOpennedVnodes;
static pthread_once_t vnodeModuleInit = PTHREAD_ONCE_INIT; static pthread_once_t vnodeModuleInit = PTHREAD_ONCE_INIT;
static void vnodeInit() { static void vnodeInit() {
...@@ -103,8 +103,7 @@ int32_t vnodeCreate(SMDCreateVnodeMsg *pVnodeCfg) { ...@@ -103,8 +103,7 @@ int32_t vnodeCreate(SMDCreateVnodeMsg *pVnodeCfg) {
} }
int32_t vnodeDrop(int32_t vgId) { int32_t vnodeDrop(int32_t vgId) {
SVnodeObj *pVnode = *(SVnodeObj **)taosGetIntHashData(tsDnodeVnodesHash, vgId);
SVnodeObj *pVnode = (SVnodeObj *) taosGetIntHashData(tsDnodeVnodesHash, vgId);
if (pVnode == NULL) { if (pVnode == NULL) {
dTrace("vgId:%d, failed to drop, vgId not exist", vgId); dTrace("vgId:%d, failed to drop, vgId not exist", vgId);
return TSDB_CODE_INVALID_VGROUP_ID; return TSDB_CODE_INVALID_VGROUP_ID;
...@@ -310,7 +309,7 @@ static int32_t vnodeSaveCfg(SMDCreateVnodeMsg *pVnodeCfg) { ...@@ -310,7 +309,7 @@ static int32_t vnodeSaveCfg(SMDCreateVnodeMsg *pVnodeCfg) {
// TODO: this is a simple implement // TODO: this is a simple implement
static int32_t vnodeReadCfg(SVnodeObj *pVnode) { static int32_t vnodeReadCfg(SVnodeObj *pVnode) {
char option[3][16] = {0}; char option[5][16] = {0};
char cfgFile[TSDB_FILENAME_LEN * 2] = {0}; char cfgFile[TSDB_FILENAME_LEN * 2] = {0};
sprintf(cfgFile, "%s/vnode%d/config", tsVnodeDir, pVnode->vgId); sprintf(cfgFile, "%s/vnode%d/config", tsVnodeDir, pVnode->vgId);
......
...@@ -25,6 +25,7 @@ ...@@ -25,6 +25,7 @@
#include "dataformat.h" #include "dataformat.h"
#include "vnode.h" #include "vnode.h"
#include "vnodeInt.h" #include "vnodeInt.h"
#include "tutil.h"
static int32_t (*vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MAX])(SVnodeObj *, void *, SRspRet *); static int32_t (*vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MAX])(SVnodeObj *, void *, SRspRet *);
static int32_t vnodeProcessSubmitMsg(SVnodeObj *pVnode, void *pMsg, SRspRet *); static int32_t vnodeProcessSubmitMsg(SVnodeObj *pVnode, void *pMsg, SRspRet *);
...@@ -158,6 +159,8 @@ static int32_t vnodeProcessCreateTableMsg(SVnodeObj *pVnode, void *pCont, SRspRe ...@@ -158,6 +159,8 @@ static int32_t vnodeProcessCreateTableMsg(SVnodeObj *pVnode, void *pCont, SRspRe
void *pTsdb = vnodeGetTsdb(pVnode); void *pTsdb = vnodeGetTsdb(pVnode);
code = tsdbCreateTable(pTsdb, &tCfg); code = tsdbCreateTable(pTsdb, &tCfg);
tfree(pDestSchema);
dTrace("pVnode:%p vgId:%d, table:%s is created, result:%x", pVnode, pVnode->vgId, pTable->tableId, code); dTrace("pVnode:%p vgId:%d, table:%s is created, result:%x", pVnode, pVnode->vgId, pTable->tableId, code);
return code; return code;
} }
......
...@@ -62,6 +62,7 @@ STsdbFileH *tsdbInitFileH(char *dataDir, int maxFiles) { ...@@ -62,6 +62,7 @@ STsdbFileH *tsdbInitFileH(char *dataDir, int maxFiles) {
// TODO // TODO
} }
} }
closedir(dir);
return pFileH; return pFileH;
} }
......
...@@ -268,6 +268,9 @@ int32_t tsdbCloseRepo(tsdb_repo_t *repo) { ...@@ -268,6 +268,9 @@ int32_t tsdbCloseRepo(tsdb_repo_t *repo) {
tsdbFreeCache(pRepo->tsdbCache); tsdbFreeCache(pRepo->tsdbCache);
tfree(pRepo->rootDir);
tfree(pRepo);
return 0; return 0;
} }
...@@ -847,6 +850,7 @@ static void *tsdbCommitData(void *arg) { ...@@ -847,6 +850,7 @@ static void *tsdbCommitData(void *arg) {
tsdbLockRepo(arg); tsdbLockRepo(arg);
tdListMove(pCache->imem->list, pCache->pool.memPool); tdListMove(pCache->imem->list, pCache->pool.memPool);
tdListFree(pCache->imem->list);
free(pCache->imem); free(pCache->imem);
pCache->imem = NULL; pCache->imem = NULL;
pRepo->commit = 0; pRepo->commit = 0;
...@@ -1125,11 +1129,11 @@ static int tsdbWriteBlockToFileImpl(SFile *pFile, SDataCols *pCols, int pointsTo ...@@ -1125,11 +1129,11 @@ static int tsdbWriteBlockToFileImpl(SFile *pFile, SDataCols *pCols, int pointsTo
*len += pCompCol->len; *len += pCompCol->len;
} }
if (pCompData == NULL) free((void *)pCompData); tfree(pCompData);
return 0; return 0;
_err: _err:
if (pCompData == NULL) free((void *)pCompData); tfree(pCompData);
return -1; return -1;
} }
......
...@@ -312,6 +312,14 @@ int32_t tsdbDropTableImpl(STsdbMeta *pMeta, STableId tableId) { ...@@ -312,6 +312,14 @@ int32_t tsdbDropTableImpl(STsdbMeta *pMeta, STableId tableId) {
// return 0; // return 0;
// } // }
static void tsdbFreeMemTable(SMemTable *pMemTable) {
if (pMemTable) {
tSkipListDestroy(pMemTable->pData);
}
free(pMemTable);
}
static int tsdbFreeTable(STable *pTable) { static int tsdbFreeTable(STable *pTable) {
// TODO: finish this function // TODO: finish this function
if (pTable->type == TSDB_CHILD_TABLE) { if (pTable->type == TSDB_CHILD_TABLE) {
...@@ -323,7 +331,10 @@ static int tsdbFreeTable(STable *pTable) { ...@@ -323,7 +331,10 @@ static int tsdbFreeTable(STable *pTable) {
// Free content // Free content
if (TSDB_TABLE_IS_SUPER_TABLE(pTable)) { if (TSDB_TABLE_IS_SUPER_TABLE(pTable)) {
tSkipListDestroy(pTable->pIndex); tSkipListDestroy(pTable->pIndex);
} }
tsdbFreeMemTable(pTable->mem);
tsdbFreeMemTable(pTable->imem);
free(pTable); free(pTable);
return 0; return 0;
......
...@@ -177,6 +177,7 @@ void tsdbCloseMetaFile(SMetaFile *mfh) { ...@@ -177,6 +177,7 @@ void tsdbCloseMetaFile(SMetaFile *mfh) {
close(mfh->fd); close(mfh->fd);
taosHashCleanup(mfh->map); taosHashCleanup(mfh->map);
tfree(mfh);
} }
static int32_t tsdbGetMetaFileName(char *rootDir, char *fname) { static int32_t tsdbGetMetaFileName(char *rootDir, char *fname) {
......
...@@ -37,10 +37,6 @@ typedef struct SField { ...@@ -37,10 +37,6 @@ typedef struct SField {
// todo need the definition // todo need the definition
} SField; } SField;
typedef struct SHeaderFileInfo {
int32_t fileId;
} SHeaderFileInfo;
typedef struct SQueryFilePos { typedef struct SQueryFilePos {
int32_t fid; int32_t fid;
int32_t slot; int32_t slot;
...@@ -380,11 +376,12 @@ static bool loadQualifiedDataFromFileBlock(STsdbQueryHandle *pQueryHandle) { ...@@ -380,11 +376,12 @@ static bool loadQualifiedDataFromFileBlock(STsdbQueryHandle *pQueryHandle) {
SArray *sa = getDefaultLoadColumns(pQueryHandle, true); SArray *sa = getDefaultLoadColumns(pQueryHandle, true);
if (QUERY_IS_ASC_QUERY(pQueryHandle->order)) { if (QUERY_IS_ASC_QUERY(pQueryHandle->order)) {
// query ended in current block // query ended in current block
if (pQueryHandle->window.ekey < pBlock->keyLast) { if (pQueryHandle->window.ekey < pBlock->keyLast) {
doLoadDataFromFileBlock(pQueryHandle); doLoadDataFromFileBlock(pQueryHandle);
filterDataInDataBlock(pQueryHandle, pCheckInfo->pDataCols, sa); filterDataInDataBlock(pQueryHandle, pCheckInfo->pDataCols, sa);
} else { // the whole block is loaded in to buffer
pQueryHandle->realNumOfRows = pBlock->numOfPoints;
} }
} else {// todo desc query } else {// todo desc query
if (pQueryHandle->window.ekey > pBlock->keyFirst) { if (pQueryHandle->window.ekey > pBlock->keyFirst) {
...@@ -932,10 +929,6 @@ SArray *tsdbRetrieveDataBlock(tsdb_query_handle_t *pQueryHandle, SArray *pIdList ...@@ -932,10 +929,6 @@ SArray *tsdbRetrieveDataBlock(tsdb_query_handle_t *pQueryHandle, SArray *pIdList
STsdbQueryHandle* pHandle = (STsdbQueryHandle*) pQueryHandle; STsdbQueryHandle* pHandle = (STsdbQueryHandle*) pQueryHandle;
if (pHandle->cur.fid < 0) { if (pHandle->cur.fid < 0) {
return pHandle->pColumns; return pHandle->pColumns;
} else { } else {
STableCheckInfo* pCheckInfo = taosArrayGet(pHandle->pTableCheckInfo, pHandle->activeIndex); STableCheckInfo* pCheckInfo = taosArrayGet(pHandle->pTableCheckInfo, pHandle->activeIndex);
......
...@@ -80,7 +80,8 @@ void *walOpen(char *path, int max, int level) { ...@@ -80,7 +80,8 @@ void *walOpen(char *path, int max, int level) {
} }
void walClose(void *handle) { void walClose(void *handle) {
if (handle == NULL) return;
SWal *pWal = (SWal *)handle; SWal *pWal = (SWal *)handle;
close(pWal->fd); close(pWal->fd);
......
################################# #################################
#run general/table/basic1.sim run general/table/basic1.sim
run general/table/basic2.sim #run general/table/basic2.sim
#run general/table/basic3.sim #run general/table/basic3.sim
################################## ##################################
system sh/stop_dnodes.sh system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -m 192.168.0.1 -i 192.168.0.1 system sh/deploy.sh -n dnode1 -m 192.168.0.1 -i 192.168.0.1
system ifconfig
system sh/exec.sh -n dnode1 -s start system sh/exec.sh -n dnode1 -s start
sql connect sql connect
......
...@@ -6,8 +6,8 @@ if [ -n "$PID" ]; then ...@@ -6,8 +6,8 @@ if [ -n "$PID" ]; then
sudo systemctl stop taosd sudo systemctl stop taosd
fi fi
PID=`ps -ef|grep taosd | grep -v grep | awk '{print $2}'` PID=`ps -ef|grep -w taosd | grep -v grep | awk '{print $2}'`
if [ -n "$PID" ]; then if [ -n "$PID" ]; then
echo sudo kill -9 $PID echo sudo kill -9 $PID
sudo kill -9 $PID sudo pkill taosd
fi fi
...@@ -576,6 +576,7 @@ bool simCreateRestFulConnect(SScript *script, char *user, char *pass) { ...@@ -576,6 +576,7 @@ bool simCreateRestFulConnect(SScript *script, char *user, char *pass) {
bool simCreateNativeConnect(SScript *script, char *user, char *pass) { bool simCreateNativeConnect(SScript *script, char *user, char *pass) {
simCloseTaosdConnect(script); simCloseTaosdConnect(script);
void *taos = NULL; void *taos = NULL;
taosMsleep(2000);
for (int attempt = 0; attempt < 10; ++attempt) { for (int attempt = 0; attempt < 10; ++attempt) {
taos = taos_connect(NULL, user, pass, NULL, tsMnodeShellPort); taos = taos_connect(NULL, user, pass, NULL, tsMnodeShellPort);
if (taos == NULL) { if (taos == NULL) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册