提交 915677cd 编写于 作者: H Hongze Cheng

Merge branch 'develop' into feature/update

......@@ -75,6 +75,7 @@ typedef struct SJoinSupporter {
SArray* exprList;
SFieldInfo fieldsInfo;
STagCond tagCond;
SSqlGroupbyExpr groupInfo; // group by info
struct STSBuf* pTSBuf; // the TSBuf struct that holds the compressed timestamp array
FILE* f; // temporary file in order to create TSBuf
char path[PATH_MAX]; // temporary file path, todo dynamic allocate memory
......@@ -225,7 +226,7 @@ void tscInitQueryInfo(SQueryInfo* pQueryInfo);
void tscClearSubqueryInfo(SSqlCmd* pCmd);
void tscFreeVgroupTableInfo(SArray* pVgroupTables);
SArray* tscCloneVgroupTableInfo(SArray* pVgroupTables);
SArray* tscVgroupTableInfoClone(SArray* pVgroupTables);
void tscRemoveVgroupTableGroup(SArray* pVgroupTable, int32_t index);
int tscGetSTableVgroupInfo(SSqlObj* pSql, int32_t clauseIndex);
......@@ -265,6 +266,7 @@ void addGroupInfoForSubquery(SSqlObj* pParentObj, SSqlObj* pSql, int32_t sub
void doAddGroupColumnForSubquery(SQueryInfo* pQueryInfo, int32_t tagIndex);
int16_t tscGetJoinTagColIdByUid(STagCond* pTagCond, uint64_t uid);
int16_t tscGetTagColIndexById(STableMeta* pTableMeta, int16_t colId);
void tscPrintSelectClause(SSqlObj* pSql, int32_t subClauseIndex);
......
......@@ -77,7 +77,7 @@ SSchema *tscGetTableColumnSchema(const STableMeta *pMeta, int32_t colIndex);
* @param colId
* @return
*/
SSchema* tscGetTableColumnSchemaById(STableMeta* pTableMeta, int16_t colId);
SSchema* tscGetColumnSchemaById(STableMeta* pTableMeta, int16_t colId);
/**
* check if the schema is valid or not, including following aspects:
......
......@@ -128,7 +128,7 @@ typedef struct STableMetaInfo {
typedef struct SSqlExpr {
char aliasName[TSDB_COL_NAME_LEN]; // as aliasName
SColIndex colInfo;
int64_t uid; // refactor use the pointer
uint64_t uid; // refactor use the pointer
int16_t functionId; // function id in aAgg array
int16_t resType; // return value type
int16_t resBytes; // length of return value
......
......@@ -405,7 +405,8 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
SSqlRes *pRes = &pSql->res;
pRes->code = code;
const char* msg = (pCmd->command == TSDB_SQL_STABLEVGROUP)? "vgroup-list":"table-meta";
SSqlObj *sub = (SSqlObj*) res;
const char* msg = (sub->cmd.command == TSDB_SQL_STABLEVGROUP)? "vgroup-list":"table-meta";
if (code != TSDB_CODE_SUCCESS) {
tscError("%p get %s failed, code:%s", pSql, msg, tstrerror(code));
goto _error;
......
......@@ -1253,10 +1253,11 @@ bool genFinalResults(SSqlObj *pSql, SLocalReducer *pLocalReducer, bool noMoreCur
return true;
}
void resetOutputBuf(SQueryInfo *pQueryInfo, SLocalReducer *pLocalReducer) { // reset output buffer to the beginning
for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) {
pLocalReducer->pCtx[i].aOutputBuf =
pLocalReducer->pResultBuf->data + tscFieldInfoGetOffset(pQueryInfo, i) * pLocalReducer->resColModel->capacity;
void resetOutputBuf(SQueryInfo *pQueryInfo, SLocalReducer *pLocalReducer) {// reset output buffer to the beginning
size_t t = tscSqlExprNumOfExprs(pQueryInfo);
for (int32_t i = 0; i < t; ++i) {
SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, i);
pLocalReducer->pCtx[i].aOutputBuf = pLocalReducer->pResultBuf->data + pExpr->offset * pLocalReducer->resColModel->capacity;
}
memset(pLocalReducer->pResultBuf, 0, pLocalReducer->nResultBufSize + sizeof(tFilePage));
......@@ -1501,8 +1502,7 @@ int32_t tscDoLocalMerge(SSqlObj *pSql) {
if (pLocalReducer->discard && sameGroup) {
pLocalReducer->hasUnprocessedRow = false;
tmpBuffer->num = 0;
} else {
// current row does not belongs to the previous group, so it is not be handled yet.
} else { // current row does not belongs to the previous group, so it is not be handled yet.
pLocalReducer->hasUnprocessedRow = true;
}
......
......@@ -2728,7 +2728,6 @@ static bool functionCompatibleCheck(SQueryInfo* pQueryInfo, bool joinQuery) {
int32_t parseGroupbyClause(SQueryInfo* pQueryInfo, tVariantList* pList, SSqlCmd* pCmd) {
const char* msg1 = "too many columns in group by clause";
const char* msg2 = "invalid column name in group by clause";
// const char* msg3 = "group by columns must belong to one table";
const char* msg7 = "not support group by expression";
const char* msg8 = "not allowed column type for group by";
const char* msg9 = "tags not allowed for table query";
......@@ -2803,7 +2802,7 @@ int32_t parseGroupbyClause(SQueryInfo* pQueryInfo, tVariantList* pList, SSqlCmd*
tscColumnListInsert(pTableMetaInfo->tagColList, &index);
} else {
// check if the column type is valid, here only support the bool/tinyint/smallint/bigint group by
if (pSchema->type > TSDB_DATA_TYPE_BINARY) {
if (pSchema->type == TSDB_DATA_TYPE_TIMESTAMP || pSchema->type == TSDB_DATA_TYPE_FLOAT || pSchema->type == TSDB_DATA_TYPE_DOUBLE) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg8);
}
......@@ -5282,20 +5281,26 @@ void addGroupInfoForSubquery(SSqlObj* pParentObj, SSqlObj* pSql, int32_t subClau
if (pParentQueryInfo->groupbyExpr.numOfGroupCols > 0) {
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, subClauseIndex);
SSqlExpr* pExpr = NULL;
size_t size = taosArrayGetSize(pQueryInfo->exprList);
SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, (int32_t)size - 1);
if (size > 0) {
pExpr = tscSqlExprGet(pQueryInfo, (int32_t)size - 1);
}
if (pExpr->functionId != TSDB_FUNC_TAG) {
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, tableIndex);
int16_t columnInfo = tscGetJoinTagColIdByUid(&pQueryInfo->tagCond, pTableMetaInfo->pTableMeta->id.uid);
SColumnIndex index = {.tableIndex = 0, .columnIndex = columnInfo};
SSchema* pSchema = tscGetTableTagSchema(pTableMetaInfo->pTableMeta);
if (pExpr == NULL || pExpr->functionId != TSDB_FUNC_TAG) {
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pParentQueryInfo, tableIndex);
int16_t colId = tscGetJoinTagColIdByUid(&pQueryInfo->tagCond, pTableMetaInfo->pTableMeta->id.uid);
SSchema* pTagSchema = tscGetColumnSchemaById(pTableMetaInfo->pTableMeta, colId);
int16_t colIndex = tscGetTagColIndexById(pTableMetaInfo->pTableMeta, colId);
SColumnIndex index = {.tableIndex = 0, .columnIndex = colIndex};
char* name = pTagSchema->name;
int16_t type = pTagSchema->type;
int16_t bytes = pTagSchema->bytes;
int16_t type = pSchema[index.columnIndex].type;
int16_t bytes = pSchema[index.columnIndex].bytes;
char* name = pSchema[index.columnIndex].name;
pExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_TAG, &index, type, bytes, bytes, true);
pExpr->colInfo.flag = TSDB_COL_TAG;
......
......@@ -118,7 +118,7 @@ SSchema* tscGetTableColumnSchema(const STableMeta* pTableMeta, int32_t colIndex)
}
// TODO for large number of columns, employ the binary search method
SSchema* tscGetTableColumnSchemaById(STableMeta* pTableMeta, int16_t colId) {
SSchema* tscGetColumnSchemaById(STableMeta* pTableMeta, int16_t colId) {
STableComInfo tinfo = tscGetTableInfo(pTableMeta);
for(int32_t i = 0; i < tinfo.numOfColumns + tinfo.numOfTags; ++i) {
......
......@@ -150,7 +150,7 @@ void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) {
if (pObj == NULL) return;
if (pObj != pObj->signature) {
tscError("heart beat msg, pObj:%p, signature:%p invalid", pObj, pObj->signature);
tscError("heartbeat msg, pObj:%p, signature:%p invalid", pObj, pObj->signature);
return;
}
......@@ -175,12 +175,12 @@ void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) {
if (pRsp->streamId) tscKillStream(pObj, htonl(pRsp->streamId));
}
} else {
tscDebug("heartbeat failed, code:%s", tstrerror(code));
tscDebug("%p heartbeat failed, code:%s", pObj->pHb, tstrerror(code));
}
if (pObj->pHb != NULL) {
int32_t waitingDuring = tsShellActivityTimer * 500;
tscDebug("%p start heartbeat in %dms", pSql, waitingDuring);
tscDebug("%p send heartbeat in %dms", pSql, waitingDuring);
taosTmrReset(tscProcessActivityTimer, waitingDuring, pObj, tscTmr, &pObj->pTimer);
} else {
......@@ -208,6 +208,7 @@ void tscProcessActivityTimer(void *handle, void *tmrId) {
assert(*pHB->self == pHB);
pHB->retry = 0;
int32_t code = tscProcessSql(pHB);
taosCacheRelease(tscObjCache, (void**) &p, false);
......@@ -552,11 +553,28 @@ static int32_t tscEstimateQueryMsgSize(SSqlCmd *pCmd, int32_t clauseIndex) {
SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, clauseIndex);
int32_t srcColListSize = (int32_t)(taosArrayGetSize(pQueryInfo->colList) * sizeof(SColumnInfo));
size_t numOfExprs = tscSqlExprNumOfExprs(pQueryInfo);
size_t numOfExprs = tscSqlExprNumOfExprs(pQueryInfo);
int32_t exprSize = (int32_t)(sizeof(SSqlFuncMsg) * numOfExprs);
return MIN_QUERY_MSG_PKT_SIZE + minMsgSize() + sizeof(SQueryTableMsg) + srcColListSize + exprSize + 4096;
int32_t tsBufSize = (pQueryInfo->tsBuf != NULL) ? pQueryInfo->tsBuf->fileSize : 0;
int32_t tableSerialize = 0;
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
if (pTableMetaInfo->pVgroupTables != NULL) {
size_t numOfGroups = taosArrayGetSize(pTableMetaInfo->pVgroupTables);
int32_t totalTables = 0;
for (int32_t i = 0; i < numOfGroups; ++i) {
SVgroupTableInfo *pTableInfo = taosArrayGet(pTableMetaInfo->pVgroupTables, i);
totalTables += (int32_t) taosArrayGetSize(pTableInfo->itemList);
}
tableSerialize = totalTables * sizeof(STableIdInfo);
}
return MIN_QUERY_MSG_PKT_SIZE + minMsgSize() + sizeof(SQueryTableMsg) + srcColListSize + exprSize + tsBufSize +
tableSerialize + 4096;
}
static char *doSerializeTableInfo(SQueryTableMsg* pQueryMsg, SSqlObj *pSql, char *pMsg) {
......@@ -1641,11 +1659,14 @@ int tscBuildHeartBeatMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
int size = numOfQueries * sizeof(SQueryDesc) + numOfStreams * sizeof(SStreamDesc) + sizeof(SCMHeartBeatMsg) + 100;
if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) {
pthread_mutex_unlock(&pObj->mutex);
tscError("%p failed to malloc for heartbeat msg", pSql);
tscError("%p failed to create heartbeat msg", pSql);
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
// TODO the expired hb and client can not be identified by server till now.
SCMHeartBeatMsg *pHeartbeat = (SCMHeartBeatMsg *)pCmd->payload;
tstrncpy(pHeartbeat->clientVer, version, tListLen(pHeartbeat->clientVer));
pHeartbeat->numOfQueries = numOfQueries;
pHeartbeat->numOfStreams = numOfStreams;
......@@ -1998,10 +2019,11 @@ static void createHBObj(STscObj* pObj) {
}
int tscProcessConnectRsp(SSqlObj *pSql) {
char temp[TSDB_TABLE_FNAME_LEN * 2];
STscObj *pObj = pSql->pTscObj;
SSqlRes *pRes = &pSql->res;
char temp[TSDB_TABLE_FNAME_LEN * 2] = {0};
SCMConnectRsp *pConnect = (SCMConnectRsp *)pRes->pRsp;
tstrncpy(pObj->acctId, pConnect->acctId, sizeof(pObj->acctId)); // copy acctId from response
int32_t len = sprintf(temp, "%s%s%s", pObj->acctId, TS_PATH_DELIMITER, pObj->db);
......@@ -2020,6 +2042,8 @@ int tscProcessConnectRsp(SSqlObj *pSql) {
pObj->connId = htonl(pConnect->connId);
createHBObj(pObj);
//launch a timer to send heartbeat to maintain the connection and send status to mnode
taosTmrReset(tscProcessActivityTimer, tsShellActivityTimer * 500, pObj, tscTmr, &pObj->pTimer);
return 0;
......
......@@ -320,11 +320,8 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) {
pQueryInfo->colList = pSupporter->colList;
pQueryInfo->exprList = pSupporter->exprList;
pQueryInfo->fieldsInfo = pSupporter->fieldsInfo;
pQueryInfo->groupbyExpr = pSupporter->groupInfo;
pSupporter->exprList = NULL;
pSupporter->colList = NULL;
memset(&pSupporter->fieldsInfo, 0, sizeof(SFieldInfo));
SQueryInfo *pNewQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0);
assert(pNew->subState.numOfSub == 0 && pNew->cmd.numOfClause == 1 && pNewQueryInfo->numOfTables == 1);
......@@ -332,7 +329,12 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) {
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pNewQueryInfo, 0);
pTableMetaInfo->pVgroupTables = pSupporter->pVgroupTables;
pSupporter->exprList = NULL;
pSupporter->colList = NULL;
pSupporter->pVgroupTables = NULL;
memset(&pSupporter->fieldsInfo, 0, sizeof(SFieldInfo));
memset(&pSupporter->groupInfo, 0, sizeof(SSqlGroupbyExpr));
/*
* When handling the projection query, the offset value will be modified for table-table join, which is changed
......@@ -460,18 +462,36 @@ static void updateQueryTimeRange(SQueryInfo* pQueryInfo, STimeWindow* win) {
}
int32_t tscCompareTidTags(const void* p1, const void* p2) {
const STidTags* t1 = (const STidTags*) varDataVal(p1);
const STidTags* t2 = (const STidTags*) varDataVal(p2);
int32_t tidTagsCompar(const void* p1, const void* p2) {
const STidTags* t1 = (const STidTags*) (p1);
const STidTags* t2 = (const STidTags*) (p2);
if (t1->vgId != t2->vgId) {
return (t1->vgId > t2->vgId) ? 1 : -1;
}
if (t1->tid != t2->tid) {
return (t1->tid > t2->tid) ? 1 : -1;
tstr* tag1 = (tstr*) t1->tag;
tstr* tag2 = (tstr*) t2->tag;
if (tag1->len != tag2->len) {
return (tag1->len > tag2->len)? 1: -1;
}
return 0;
return strncmp(tag1->data, tag2->data, tag1->len);
}
int32_t tagValCompar(const void* p1, const void* p2) {
const STidTags* t1 = (const STidTags*) varDataVal(p1);
const STidTags* t2 = (const STidTags*) varDataVal(p2);
tstr* tag1 = (tstr*) t1->tag;
tstr* tag2 = (tstr*) t2->tag;
if (tag1->len != tag2->len) {
return (tag1->len > tag2->len)? 1: -1;
}
return strncmp(tag1->data, tag2->data, tag1->len);
}
void tscBuildVgroupTableInfo(SSqlObj* pSql, STableMetaInfo* pTableMetaInfo, SArray* tables) {
......@@ -587,17 +607,19 @@ static int32_t getIntersectionOfTableTuple(SQueryInfo* pQueryInfo, SSqlObj* pPar
SJoinSupporter* p1 = pParentSql->pSubs[0]->param;
SJoinSupporter* p2 = pParentSql->pSubs[1]->param;
qsort(p1->pIdTagList, p1->num, p1->tagSize, tscCompareTidTags);
qsort(p2->pIdTagList, p2->num, p2->tagSize, tscCompareTidTags);
// sort according to the tag value
qsort(p1->pIdTagList, p1->num, p1->tagSize, tagValCompar);
qsort(p2->pIdTagList, p2->num, p2->tagSize, tagValCompar);
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
int16_t tagColId = tscGetJoinTagColIdByUid(&pQueryInfo->tagCond, pTableMetaInfo->pTableMeta->id.uid);
SSchema* pColSchema = tscGetTableColumnSchemaById(pTableMetaInfo->pTableMeta, tagColId);
SSchema* pColSchema = tscGetColumnSchemaById(pTableMetaInfo->pTableMeta, tagColId);
// int16_t for padding
*s1 = taosArrayInit(p1->num, p1->tagSize - sizeof(int16_t));
*s2 = taosArrayInit(p2->num, p2->tagSize - sizeof(int16_t));
int32_t size = p1->tagSize - sizeof(int16_t);
*s1 = taosArrayInit(p1->num, size);
*s2 = taosArrayInit(p2->num, size);
if (!(checkForDuplicateTagVal(pColSchema, p1, pParentSql) && checkForDuplicateTagVal(pColSchema, p2, pParentSql))) {
return TSDB_CODE_QRY_DUP_JOIN_KEY;
......@@ -625,6 +647,14 @@ static int32_t getIntersectionOfTableTuple(SQueryInfo* pQueryInfo, SSqlObj* pPar
}
}
// reorganize the tid-tag value according to both the vgroup id and tag values
// sort according to the tag value
size_t t1 = taosArrayGetSize(*s1);
size_t t2 = taosArrayGetSize(*s2);
qsort((*s1)->pData, t1, size, tidTagsCompar);
qsort((*s2)->pData, t2, size, tidTagsCompar);
return TSDB_CODE_SUCCESS;
}
......@@ -744,10 +774,10 @@ static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
tscBuildVgroupTableInfo(pParentSql, pTableMetaInfo2, s2);
SSqlObj* psub1 = pParentSql->pSubs[0];
((SJoinSupporter*)psub1->param)->pVgroupTables = tscCloneVgroupTableInfo(pTableMetaInfo1->pVgroupTables);
((SJoinSupporter*)psub1->param)->pVgroupTables = tscVgroupTableInfoClone(pTableMetaInfo1->pVgroupTables);
SSqlObj* psub2 = pParentSql->pSubs[1];
((SJoinSupporter*)psub2->param)->pVgroupTables = tscCloneVgroupTableInfo(pTableMetaInfo2->pVgroupTables);
((SJoinSupporter*)psub2->param)->pVgroupTables = tscVgroupTableInfoClone(pTableMetaInfo2->pVgroupTables);
pParentSql->subState.numOfSub = 2;
pParentSql->subState.numOfRemain = pParentSql->subState.numOfSub;
......@@ -1313,6 +1343,9 @@ int32_t tscCreateJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSupporter
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
pSupporter->groupInfo = pNewQueryInfo->groupbyExpr;
memset(&pNewQueryInfo->groupbyExpr, 0, sizeof(SSqlGroupbyExpr));
pNew->cmd.numOfCols = 0;
pNewQueryInfo->interval.interval = 0;
pSupporter->limit = pNewQueryInfo->limit;
......@@ -1333,17 +1366,9 @@ int32_t tscCreateJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSupporter
assert(pTagCond->joinInfo.hasJoin);
int32_t tagColId = tscGetJoinTagColIdByUid(pTagCond, pTableMetaInfo->pTableMeta->id.uid);
SSchema* s = tscGetTableColumnSchemaById(pTableMetaInfo->pTableMeta, tagColId);
// get the tag colId column index
int32_t numOfTags = tscGetNumOfTags(pTableMetaInfo->pTableMeta);
SSchema* pSchema = tscGetTableTagSchema(pTableMetaInfo->pTableMeta);
for(int32_t i = 0; i < numOfTags; ++i) {
if (pSchema[i].colId == tagColId) {
colIndex.columnIndex = i;
break;
}
}
SSchema* s = tscGetColumnSchemaById(pTableMetaInfo->pTableMeta, tagColId);
colIndex.columnIndex = tscGetTagColIndexById(pTableMetaInfo->pTableMeta, tagColId);
int16_t bytes = 0;
int16_t type = 0;
......@@ -2165,7 +2190,8 @@ static void doBuildResFromSubqueries(SSqlObj* pSql) {
numOfRes = (int32_t)(MIN(numOfRes, remain));
}
if (numOfRes == 0) {
if (numOfRes == 0) { // no result any more, free all subquery objects
freeJoinSubqueryObj(pSql);
return;
}
......
......@@ -80,7 +80,7 @@ int32_t tscInitRpc(const char *user, const char *secretEncrypt, void **pDnodeCon
void taos_init_imp(void) {
char temp[128];
char temp[128] = {0};
errno = TSDB_CODE_SUCCESS;
srand(taosGetTimestampSec());
......@@ -149,30 +149,42 @@ void taos_init_imp(void) {
tscRefId = taosOpenRef(200, tscCloseTscObj);
// in other language APIs, taos_cleanup is not available yet.
// So, to make sure taos_cleanup will be invoked to clean up the allocated
// resource to suppress the valgrind warning.
atexit(taos_cleanup);
tscDebug("client is initialized successfully");
}
void taos_init() { pthread_once(&tscinit, taos_init_imp); }
void taos_cleanup() {
if (tscMetaCache != NULL) {
taosCacheCleanup(tscMetaCache);
tscMetaCache = NULL;
// this function may be called by user or system, or by both simultaneously.
void taos_cleanup(void) {
tscDebug("start to cleanup client environment");
taosCacheCleanup(tscObjCache);
tscObjCache = NULL;
void* m = tscMetaCache;
if (m != NULL && atomic_val_compare_exchange_ptr(&tscMetaCache, m, 0) == m) {
taosCacheCleanup(m);
}
if (tscQhandle != NULL) {
taosCleanUpScheduler(tscQhandle);
tscQhandle = NULL;
m = tscObjCache;
if (m != NULL && atomic_val_compare_exchange_ptr(&tscObjCache, m, 0) == m) {
taosCacheCleanup(m);
}
m = tscQhandle;
if (m != NULL && atomic_val_compare_exchange_ptr(&tscQhandle, m, 0) == m) {
taosCleanUpScheduler(m);
}
taosCloseRef(tscRefId);
taosCleanupKeywordsTable();
taosCloseLog();
taosTmrCleanUp(tscTmr);
m = tscTmr;
if (m != NULL && atomic_val_compare_exchange_ptr(&tscTmr, m, 0) == m) {
taosTmrCleanUp(m);
}
}
static int taos_options_imp(TSDB_OPTION option, const char *pStr) {
......
......@@ -1665,6 +1665,7 @@ static void freeQueryInfoImpl(SQueryInfo* pQueryInfo) {
if (pQueryInfo->groupbyExpr.columnInfo != NULL) {
taosArrayDestroy(pQueryInfo->groupbyExpr.columnInfo);
pQueryInfo->groupbyExpr.columnInfo = NULL;
pQueryInfo->groupbyExpr.numOfGroupCols = 0;
}
pQueryInfo->tsBuf = tsBufDestroy(pQueryInfo->tsBuf);
......@@ -1713,7 +1714,7 @@ void tscRemoveVgroupTableGroup(SArray* pVgroupTable, int32_t index) {
taosArrayRemove(pVgroupTable, index);
}
SArray* tscCloneVgroupTableInfo(SArray* pVgroupTables) {
SArray* tscVgroupTableInfoClone(SArray* pVgroupTables) {
if (pVgroupTables == NULL) {
return NULL;
}
......@@ -1739,7 +1740,7 @@ SArray* tscCloneVgroupTableInfo(SArray* pVgroupTables) {
}
void clearAllTableMetaInfo(SQueryInfo* pQueryInfo, const char* address, bool removeFromCache) {
tscDebug("%p deref the table meta in cache, numOfTables:%d", address, pQueryInfo->numOfTables);
tscDebug("%p unref %d tables in the tableMeta cache", address, pQueryInfo->numOfTables);
for(int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, i);
......@@ -1779,6 +1780,7 @@ STableMetaInfo* tscAddTableMetaInfo(SQueryInfo* pQueryInfo, const char* name, ST
pTableMetaInfo->vgroupList = tscVgroupInfoClone(vgroupList);
}
// TODO handle malloc failure
pTableMetaInfo->tagColList = taosArrayInit(4, POINTER_BYTES);
if (pTableMetaInfo->tagColList == NULL) {
return NULL;
......@@ -1788,7 +1790,7 @@ STableMetaInfo* tscAddTableMetaInfo(SQueryInfo* pQueryInfo, const char* name, ST
tscColumnListCopy(pTableMetaInfo->tagColList, pTagCols, -1);
}
pTableMetaInfo->pVgroupTables = tscCloneVgroupTableInfo(pVgroupTables);
pTableMetaInfo->pVgroupTables = tscVgroupTableInfoClone(pVgroupTables);
pQueryInfo->numOfTables += 1;
return pTableMetaInfo;
......@@ -2155,6 +2157,19 @@ int16_t tscGetJoinTagColIdByUid(STagCond* pTagCond, uint64_t uid) {
}
}
int16_t tscGetTagColIndexById(STableMeta* pTableMeta, int16_t colId) {
int32_t numOfTags = tscGetNumOfColumns(pTableMeta);
SSchema* pSchema = tscGetTableTagSchema(pTableMeta);
for(int32_t i = 0; i < numOfTags; ++i) {
if (pSchema[i].colId == colId) {
return i;
}
}
return -1;
}
bool tscIsUpdateQuery(SSqlObj* pSql) {
if (pSql == NULL || pSql->signature != pSql) {
terrno = TSDB_CODE_TSC_DISCONNECTED;
......@@ -2469,6 +2484,7 @@ void tscSCMVgroupInfoCopy(SCMVgroupInfo* dst, const SCMVgroupInfo* src) {
dst->vgId = src->vgId;
dst->numOfEps = src->numOfEps;
for(int32_t i = 0; i < dst->numOfEps; ++i) {
taosTFree(dst->epAddr[i].fqdn);
dst->epAddr[i].port = src->epAddr[i].port;
dst->epAddr[i].fqdn = strdup(src->epAddr[i].fqdn);
}
......
Subproject commit 8c58c512b6acda8bcdfa48fdc7140227b5221766
Subproject commit 8d7bf743852897110cbdcc7c4322cd7a74d4167b
......@@ -64,7 +64,7 @@ typedef struct taosField {
#endif
DLL_EXPORT void taos_init();
DLL_EXPORT void taos_cleanup();
DLL_EXPORT void taos_cleanup(void);
DLL_EXPORT int taos_options(TSDB_OPTION option, const void *arg, ...);
DLL_EXPORT TAOS *taos_connect(const char *ip, const char *user, const char *pass, const char *db, uint16_t port);
DLL_EXPORT void taos_close(TAOS *taos);
......
......@@ -788,6 +788,7 @@ typedef struct {
} SStreamDesc;
typedef struct {
char clientVer[TSDB_VERSION_LEN];
uint32_t connId;
int32_t pid;
int32_t numOfQueries;
......
......@@ -232,12 +232,16 @@ static int32_t mnodeProcessRetrieveMsg(SMnodeMsg *pMsg) {
}
static int32_t mnodeProcessHeartBeatMsg(SMnodeMsg *pMsg) {
SCMHeartBeatRsp *pHBRsp = (SCMHeartBeatRsp *) rpcMallocCont(sizeof(SCMHeartBeatRsp));
if (pHBRsp == NULL) {
SCMHeartBeatRsp *pRsp = (SCMHeartBeatRsp *) rpcMallocCont(sizeof(SCMHeartBeatRsp));
if (pRsp == NULL) {
return TSDB_CODE_MND_OUT_OF_MEMORY;
}
SCMHeartBeatMsg *pHBMsg = pMsg->rpcMsg.pCont;
if (taosCheckVersion(pHBMsg->clientVer, version, 3) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_VERSION; // todo change the error code
}
SRpcConnInfo connInfo = {0};
rpcGetConnInfo(pMsg->rpcMsg.handle, &connInfo);
......@@ -251,33 +255,33 @@ static int32_t mnodeProcessHeartBeatMsg(SMnodeMsg *pMsg) {
if (pConn == NULL) {
// do not close existing links, otherwise
// mError("failed to create connId, close connect");
// pHBRsp->killConnection = 1;
// pRsp->killConnection = 1;
} else {
pHBRsp->connId = htonl(pConn->connId);
pRsp->connId = htonl(pConn->connId);
mnodeSaveQueryStreamList(pConn, pHBMsg);
if (pConn->killed != 0) {
pHBRsp->killConnection = 1;
pRsp->killConnection = 1;
}
if (pConn->streamId != 0) {
pHBRsp->streamId = htonl(pConn->streamId);
pRsp->streamId = htonl(pConn->streamId);
pConn->streamId = 0;
}
if (pConn->queryId != 0) {
pHBRsp->queryId = htonl(pConn->queryId);
pRsp->queryId = htonl(pConn->queryId);
pConn->queryId = 0;
}
}
pHBRsp->onlineDnodes = htonl(mnodeGetOnlineDnodesNum());
pHBRsp->totalDnodes = htonl(mnodeGetDnodesNum());
mnodeGetMnodeEpSetForShell(&pHBRsp->epSet);
pRsp->onlineDnodes = htonl(mnodeGetOnlineDnodesNum());
pRsp->totalDnodes = htonl(mnodeGetDnodesNum());
mnodeGetMnodeEpSetForShell(&pRsp->epSet);
pMsg->rpcRsp.rsp = pHBRsp;
pMsg->rpcRsp.rsp = pRsp;
pMsg->rpcRsp.len = sizeof(SCMHeartBeatRsp);
mnodeReleaseConn(pConn);
return TSDB_CODE_SUCCESS;
}
......
......@@ -940,7 +940,6 @@ static char *getDataBlock(SQueryRuntimeEnv *pRuntimeEnv, SArithmeticSupport *sas
sas->data = calloc(pQuery->numOfCols, POINTER_BYTES);
if (sas->data == NULL) {
finalizeQueryResult(pRuntimeEnv); // clean up allocated resource during query
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
}
......@@ -1003,7 +1002,6 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *
SArithmeticSupport *sasArray = calloc((size_t)pQuery->numOfOutput, sizeof(SArithmeticSupport));
if (sasArray == NULL) {
finalizeQueryResult(pRuntimeEnv); // clean up allocated resource during query
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
}
......@@ -1277,7 +1275,6 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS
SArithmeticSupport *sasArray = calloc((size_t)pQuery->numOfOutput, sizeof(SArithmeticSupport));
if (sasArray == NULL) {
finalizeQueryResult(pRuntimeEnv); // clean up allocated resource during query
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
}
......@@ -1857,8 +1854,11 @@ static bool needReverseScan(SQuery *pQuery) {
}
if (functionId == TSDB_FUNC_LAST || functionId == TSDB_FUNC_LAST_DST) {
// the scan order to acquire the last result of the specified column
int32_t order = (int32_t)pQuery->pSelectExpr[i].base.arg->argValue.i64;
return order != pQuery->order.order;
if (order != pQuery->order.order) {
return true;
}
}
}
......@@ -3667,7 +3667,6 @@ void scanOneTableDataBlocks(SQueryRuntimeEnv *pRuntimeEnv, TSKEY start) {
// check if query is killed or not
if (IS_QUERY_KILLED(pQInfo)) {
finalizeQueryResult(pRuntimeEnv); // clean up allocated resource during query
longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED);
}
}
......@@ -4310,7 +4309,6 @@ void skipBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
SDataBlockInfo blockInfo = SDATA_BLOCK_INITIALIZER;
while (tsdbNextDataBlock(pQueryHandle)) {
if (IS_QUERY_KILLED(GET_QINFO_ADDR(pRuntimeEnv))) {
finalizeQueryResult(pRuntimeEnv); // clean up allocated resource during query
longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED);
}
......@@ -5174,7 +5172,7 @@ static void doSaveContext(SQInfo *pQInfo) {
SWITCH_ORDER(pQuery->order.order);
if (pRuntimeEnv->pTSBuf != NULL) {
pRuntimeEnv->pTSBuf->cur.order = pQuery->order.order;
SWITCH_ORDER(pRuntimeEnv->pTSBuf->cur.order);
}
STsdbQueryCond cond = {
......@@ -5267,7 +5265,6 @@ static void multiTableQueryProcess(SQInfo *pQInfo) {
// query error occurred or query is killed, abort current execution
if (pQInfo->code != TSDB_CODE_SUCCESS || IS_QUERY_KILLED(pQInfo)) {
qDebug("QInfo:%p query killed or error occurred, code:%s, abort", pQInfo, tstrerror(pQInfo->code));
finalizeQueryResult(pRuntimeEnv); // clean up allocated resource during query
longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED);
}
......@@ -5289,7 +5286,8 @@ static void multiTableQueryProcess(SQInfo *pQInfo) {
if (pQInfo->code != TSDB_CODE_SUCCESS || IS_QUERY_KILLED(pQInfo)) {
qDebug("QInfo:%p query killed or error occurred, code:%s, abort", pQInfo, tstrerror(pQInfo->code));
finalizeQueryResult(pRuntimeEnv); // clean up allocated resource during query
//TODO finalizeQueryResult may cause SEGSEV, since the memory may not allocated yet, add a cleanup function instead
// finalizeQueryResult(pRuntimeEnv); // clean up allocated resource during query
longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED);
}
......@@ -5329,7 +5327,6 @@ static void tableFixedOutputProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo)
finalizeQueryResult(pRuntimeEnv);
if (IS_QUERY_KILLED(pQInfo)) {
finalizeQueryResult(pRuntimeEnv); // clean up allocated resource during query
longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED);
}
......
......@@ -252,16 +252,16 @@ static const char isIdChar[] = {
1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0, /* 7x */
};
static void* KeywordHashTable = NULL;
static void* keywordHashTable = NULL;
static void doInitKeywordsTable(void) {
int numOfEntries = tListLen(keywordTable);
KeywordHashTable = taosHashInit(numOfEntries, MurmurHash3_32, true, false);
keywordHashTable = taosHashInit(numOfEntries, MurmurHash3_32, true, false);
for (int32_t i = 0; i < numOfEntries; i++) {
keywordTable[i].len = (uint8_t)strlen(keywordTable[i].name);
void* ptr = &keywordTable[i];
taosHashPut(KeywordHashTable, keywordTable[i].name, keywordTable[i].len, (void*)&ptr, POINTER_BYTES);
taosHashPut(keywordHashTable, keywordTable[i].name, keywordTable[i].len, (void*)&ptr, POINTER_BYTES);
}
}
......@@ -283,7 +283,7 @@ int tSQLKeywordCode(const char* z, int n) {
}
}
SKeyword** pKey = (SKeyword**)taosHashGet(KeywordHashTable, key, n);
SKeyword** pKey = (SKeyword**)taosHashGet(keywordHashTable, key, n);
return (pKey != NULL)? (*pKey)->type:TK_ID;
}
......@@ -661,5 +661,8 @@ SStrToken tStrGetToken(char* str, int32_t* i, bool isPrevOptr, uint32_t numOfIgn
bool isKeyWord(const char* z, int32_t len) { return (tSQLKeywordCode((char*)z, len) != TK_ID); }
void taosCleanupKeywordsTable() {
taosHashCleanup(KeywordHashTable);
void* m = keywordHashTable;
if (m != NULL && atomic_val_compare_exchange_ptr(&keywordHashTable, m, 0) == m) {
taosHashCleanup(m);
}
}
......@@ -20,6 +20,7 @@
#include "tlog.h"
#include "tutil.h"
#include "ttimer.h"
#include "tref.h"
#include "tsocket.h"
#include "tglobal.h"
#include "taoserror.h"
......@@ -43,6 +44,7 @@ char tsNodeFqdn[TSDB_FQDN_LEN];
static ttpool_h tsTcpPool;
static void * syncTmrCtrl = NULL;
static void * vgIdHash;
static int tsSyncRefId = -1;
// local functions
static void syncProcessSyncRequest(char *pMsg, SSyncPeer *pPeer);
......@@ -54,13 +56,13 @@ static int syncProcessPeerMsg(void *param, void *buffer);
static void syncProcessIncommingConnection(int connFd, uint32_t sourceIp);
static void syncRemovePeer(SSyncPeer *pPeer);
static void syncAddArbitrator(SSyncNode *pNode);
static void syncAddNodeRef(SSyncNode *pNode);
static void syncDecNodeRef(SSyncNode *pNode);
static void syncFreeNode(void *);
static void syncRemoveConfirmedFwdInfo(SSyncNode *pNode);
static void syncMonitorFwdInfos(void *param, void *tmrId);
static void syncProcessFwdAck(SSyncNode *pNode, SFwdInfo *pFwdInfo, int32_t code);
static void syncSaveFwdInfo(SSyncNode *pNode, uint64_t version, void *mhandle);
static void syncRestartPeer(SSyncPeer *pPeer);
static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle, int qtyp);
static SSyncPeer *syncAddPeer(SSyncNode *pNode, const SNodeInfo *pInfo);
char* syncRole[] = {
......@@ -106,6 +108,12 @@ int32_t syncInit() {
return -1;
}
tsSyncRefId = taosOpenRef(200, syncFreeNode);
if (tsSyncRefId < 0) {
syncCleanUp();
return -1;
}
tstrncpy(tsNodeFqdn, tsLocalFqdn, sizeof(tsNodeFqdn));
sInfo("sync module initialized successfully");
......@@ -128,6 +136,9 @@ void syncCleanUp() {
vgIdHash = NULL;
}
taosCloseRef(tsSyncRefId);
tsSyncRefId = -1;
sInfo("sync module is cleaned up");
}
......@@ -159,6 +170,12 @@ void *syncStart(const SSyncInfo *pInfo) {
pNode->quorum = pCfg->quorum;
if (pNode->quorum > pNode->replica) pNode->quorum = pNode->replica;
int ret = taosAddRef(tsSyncRefId, pNode);
if (ret < 0) {
syncFreeNode(pNode);
return NULL;
}
for (int i = 0; i < pCfg->replica; ++i) {
const SNodeInfo *pNodeInfo = pCfg->nodeInfo + i;
pNode->peerInfo[i] = syncAddPeer(pNode, pNodeInfo);
......@@ -167,8 +184,6 @@ void *syncStart(const SSyncInfo *pInfo) {
}
}
syncAddNodeRef(pNode);
if (pNode->selfIndex < 0) {
sInfo("vgId:%d, this node is not configured", pNode->vgId);
terrno = TSDB_CODE_SYN_INVALID_CONFIG;
......@@ -210,7 +225,9 @@ void syncStop(void *param) {
SSyncNode *pNode = param;
SSyncPeer *pPeer;
if (pNode == NULL) return;
int ret = taosAcquireRef(tsSyncRefId, pNode);
if (ret < 0) return;
sInfo("vgId:%d, cleanup sync", pNode->vgId);
pthread_mutex_lock(&(pNode->mutex));
......@@ -228,14 +245,17 @@ void syncStop(void *param) {
pthread_mutex_unlock(&(pNode->mutex));
syncDecNodeRef(pNode);
taosReleaseRef(tsSyncRefId, pNode);
taosRemoveRef(tsSyncRefId, pNode);
}
int32_t syncReconfig(void *param, const SSyncCfg *pNewCfg) {
SSyncNode *pNode = param;
int i, j;
if (pNode == NULL) return TSDB_CODE_SYN_INVALID_CONFIG;
int ret = taosAcquireRef(tsSyncRefId, pNode);
if (ret < 0) return TSDB_CODE_SYN_INVALID_CONFIG;
sInfo("vgId:%d, reconfig, role:%s replica:%d old:%d", pNode->vgId, syncRole[nodeRole], pNewCfg->replica,
pNode->replica);
......@@ -298,105 +318,63 @@ int32_t syncReconfig(void *param, const SSyncCfg *pNewCfg) {
syncRole[nodeRole]);
syncBroadcastStatus(pNode);
taosReleaseRef(tsSyncRefId, pNode);
return 0;
}
int32_t syncForwardToPeer(void *param, void *data, void *mhandle, int qtype) {
SSyncNode *pNode = param;
SSyncPeer *pPeer;
SSyncHead *pSyncHead;
SWalHead * pWalHead = data;
int fwdLen;
int code = 0;
if (pNode == NULL) return 0;
if (nodeRole == TAOS_SYNC_ROLE_SLAVE && pWalHead->version != nodeVersion + 1) {
sError("vgId:%d, received ver:%" PRIu64 ", inconsistent with last ver:%" PRIu64 ", restart connection", pNode->vgId,
pWalHead->version, nodeVersion);
for (int i = 0; i < pNode->replica; ++i) {
pPeer = pNode->peerInfo[i];
syncRestartConnection(pPeer);
}
return TSDB_CODE_SYN_INVALID_VERSION;
}
// always update version
nodeVersion = pWalHead->version;
sDebug("vgId:%d, replica:%d nodeRole:%s qtype:%d ver:%" PRIu64, pNode->vgId, pNode->replica, syncRole[nodeRole],
qtype, pWalHead->version);
if (pNode->replica == 1 || nodeRole != TAOS_SYNC_ROLE_MASTER) return 0;
// only pkt from RPC or CQ can be forwarded
if (qtype != TAOS_QTYPE_RPC && qtype != TAOS_QTYPE_CQ) return 0;
// a hacker way to improve the performance
pSyncHead = (SSyncHead *)(((char *)pWalHead) - sizeof(SSyncHead));
pSyncHead->type = TAOS_SMSG_FORWARD;
pSyncHead->pversion = 0;
pSyncHead->len = sizeof(SWalHead) + pWalHead->len;
fwdLen = pSyncHead->len + sizeof(SSyncHead); // include the WAL and SYNC head
int ret = taosAcquireRef(tsSyncRefId, pNode);
if (ret < 0) return 0;
pthread_mutex_lock(&(pNode->mutex));
int32_t code = syncForwardToPeerImpl(pNode, data, mhandle, qtype);
for (int i = 0; i < pNode->replica; ++i) {
pPeer = pNode->peerInfo[i];
if (pPeer == NULL || pPeer->peerFd < 0) continue;
if (pPeer->role != TAOS_SYNC_ROLE_SLAVE && pPeer->sstatus != TAOS_SYNC_STATUS_CACHE) continue;
if (pNode->quorum > 1 && code == 0) {
syncSaveFwdInfo(pNode, pWalHead->version, mhandle);
code = 1;
}
int retLen = write(pPeer->peerFd, pSyncHead, fwdLen);
if (retLen == fwdLen) {
sDebug("%s, forward is sent, ver:%" PRIu64 " contLen:%d", pPeer->id, pWalHead->version, pWalHead->len);
} else {
sError("%s, failed to forward, ver:%" PRIu64 " retLen:%d", pPeer->id, pWalHead->version, retLen);
syncRestartConnection(pPeer);
}
}
pthread_mutex_unlock(&(pNode->mutex));
taosReleaseRef(tsSyncRefId, pNode);
return code;
}
void syncConfirmForward(void *param, uint64_t version, int32_t code) {
SSyncNode *pNode = param;
if (pNode == NULL) return;
if (pNode->quorum <= 1) return;
SSyncPeer *pPeer = pNode->pMaster;
if (pPeer == NULL) return;
int ret = taosAcquireRef(tsSyncRefId, pNode);
if (ret < 0) return;
char msg[sizeof(SSyncHead) + sizeof(SFwdRsp)] = {0};
SSyncPeer *pPeer = pNode->pMaster;
if (pPeer && pNode->quorum > 1) {
char msg[sizeof(SSyncHead) + sizeof(SFwdRsp)] = {0};
SSyncHead *pHead = (SSyncHead *)msg;
pHead->type = TAOS_SMSG_FORWARD_RSP;
pHead->len = sizeof(SFwdRsp);
SSyncHead *pHead = (SSyncHead *)msg;
pHead->type = TAOS_SMSG_FORWARD_RSP;
pHead->len = sizeof(SFwdRsp);
SFwdRsp *pFwdRsp = (SFwdRsp *)(msg + sizeof(SSyncHead));
pFwdRsp->version = version;
pFwdRsp->code = code;
SFwdRsp *pFwdRsp = (SFwdRsp *)(msg + sizeof(SSyncHead));
pFwdRsp->version = version;
pFwdRsp->code = code;
int msgLen = sizeof(SSyncHead) + sizeof(SFwdRsp);
int retLen = write(pPeer->peerFd, msg, msgLen);
int msgLen = sizeof(SSyncHead) + sizeof(SFwdRsp);
int retLen = write(pPeer->peerFd, msg, msgLen);
if (retLen == msgLen) {
sDebug("%s, forward-rsp is sent, ver:%" PRIu64, pPeer->id, version);
} else {
sDebug("%s, failed to send forward ack, restart", pPeer->id);
syncRestartConnection(pPeer);
if (retLen == msgLen) {
sDebug("%s, forward-rsp is sent, ver:%" PRIu64, pPeer->id, version);
} else {
sDebug("%s, failed to send forward ack, restart", pPeer->id);
syncRestartConnection(pPeer);
}
}
taosReleaseRef(tsSyncRefId, pNode);
}
void syncRecover(void *param) {
SSyncNode *pNode = param;
SSyncPeer *pPeer;
int ret = taosAcquireRef(tsSyncRefId, pNode);
if (ret < 0) return;
// to do: add a few lines to check if recover is OK
// if take this node to unsync state, the whole system may not work
......@@ -414,17 +392,24 @@ void syncRecover(void *param) {
}
pthread_mutex_unlock(&(pNode->mutex));
taosReleaseRef(tsSyncRefId, pNode);
}
int syncGetNodesRole(void *param, SNodesRole *pNodesRole) {
SSyncNode *pNode = param;
int ret = taosAcquireRef(tsSyncRefId, pNode);
if (ret < 0) return -1;
pNodesRole->selfIndex = pNode->selfIndex;
for (int i = 0; i < pNode->replica; ++i) {
pNodesRole->nodeId[i] = pNode->peerInfo[i]->nodeId;
pNodesRole->role[i] = pNode->peerInfo[i]->role;
}
taosReleaseRef(tsSyncRefId, pNode);
return 0;
}
......@@ -457,22 +442,20 @@ static void syncAddArbitrator(SSyncNode *pNode) {
pNode->peerInfo[TAOS_SYNC_MAX_REPLICA] = syncAddPeer(pNode, &nodeInfo);
}
static void syncAddNodeRef(SSyncNode *pNode) { atomic_add_fetch_8(&pNode->refCount, 1); }
static void syncFreeNode(void *param) {
SSyncNode *pNode = param;
static void syncDecNodeRef(SSyncNode *pNode) {
if (atomic_sub_fetch_8(&pNode->refCount, 1) == 0) {
pthread_mutex_destroy(&pNode->mutex);
taosTFree(pNode->pRecv);
taosTFree(pNode->pSyncFwds);
taosTFree(pNode);
}
pthread_mutex_destroy(&pNode->mutex);
taosTFree(pNode->pRecv);
taosTFree(pNode->pSyncFwds);
taosTFree(pNode);
}
void syncAddPeerRef(SSyncPeer *pPeer) { atomic_add_fetch_8(&pPeer->refCount, 1); }
int syncDecPeerRef(SSyncPeer *pPeer) {
if (atomic_sub_fetch_8(&pPeer->refCount, 1) == 0) {
syncDecNodeRef(pPeer->pSyncNode);
taosReleaseRef(tsSyncRefId, pPeer->pSyncNode);
sDebug("%s, resource is freed", pPeer->id);
taosTFree(pPeer->watchFd);
......@@ -529,7 +512,7 @@ static SSyncPeer *syncAddPeer(SSyncNode *pNode, const SNodeInfo *pInfo) {
taosTmrReset(syncCheckPeerConnection, checkMs, pPeer, syncTmrCtrl, &pPeer->timer);
}
syncAddNodeRef(pNode);
taosAcquireRef(tsSyncRefId, pNode);
return pPeer;
}
......@@ -1122,7 +1105,7 @@ static void syncProcessBrokenLink(void *param) {
SSyncPeer *pPeer = param;
SSyncNode *pNode = pPeer->pSyncNode;
syncAddNodeRef(pNode);
if (taosAcquireRef(tsSyncRefId, pNode) < 0) return;
pthread_mutex_lock(&(pNode->mutex));
sDebug("%s, TCP link is broken(%s)", pPeer->id, strerror(errno));
......@@ -1133,7 +1116,7 @@ static void syncProcessBrokenLink(void *param) {
}
pthread_mutex_unlock(&(pNode->mutex));
syncDecNodeRef(pNode);
taosReleaseRef(tsSyncRefId, pNode);
}
static void syncSaveFwdInfo(SSyncNode *pNode, uint64_t version, void *mhandle) {
......@@ -1202,22 +1185,90 @@ static void syncProcessFwdAck(SSyncNode *pNode, SFwdInfo *pFwdInfo, int32_t code
static void syncMonitorFwdInfos(void *param, void *tmrId) {
SSyncNode *pNode = param;
int ret = taosAcquireRef(tsSyncRefId, pNode);
if ( ret < 0) return;
SSyncFwds *pSyncFwds = pNode->pSyncFwds;
if (pSyncFwds == NULL) return;
uint64_t time = taosGetTimestampMs();
if (pSyncFwds) {;
uint64_t time = taosGetTimestampMs();
if (pSyncFwds->fwds > 0) {
pthread_mutex_lock(&(pNode->mutex));
for (int i = 0; i < pSyncFwds->fwds; ++i) {
SFwdInfo *pFwdInfo = pSyncFwds->fwdInfo + (pSyncFwds->first + i) % tsMaxFwdInfo;
if (time - pFwdInfo->time < 2000) break;
syncProcessFwdAck(pNode, pFwdInfo, TSDB_CODE_RPC_NETWORK_UNAVAIL);
if (pSyncFwds->fwds > 0) {
pthread_mutex_lock(&(pNode->mutex));
for (int i = 0; i < pSyncFwds->fwds; ++i) {
SFwdInfo *pFwdInfo = pSyncFwds->fwdInfo + (pSyncFwds->first + i) % tsMaxFwdInfo;
if (time - pFwdInfo->time < 2000) break;
syncProcessFwdAck(pNode, pFwdInfo, TSDB_CODE_RPC_NETWORK_UNAVAIL);
}
syncRemoveConfirmedFwdInfo(pNode);
pthread_mutex_unlock(&(pNode->mutex));
}
syncRemoveConfirmedFwdInfo(pNode);
pthread_mutex_unlock(&(pNode->mutex));
pNode->pFwdTimer = taosTmrStart(syncMonitorFwdInfos, 300, pNode, syncTmrCtrl);
}
pNode->pFwdTimer = taosTmrStart(syncMonitorFwdInfos, 300, pNode, syncTmrCtrl);
taosReleaseRef(tsSyncRefId, pNode);
}
static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle, int qtype) {
SSyncPeer *pPeer;
SSyncHead *pSyncHead;
SWalHead * pWalHead = data;
int fwdLen;
int32_t code = 0;
if (nodeRole == TAOS_SYNC_ROLE_SLAVE && pWalHead->version != nodeVersion + 1) {
sError("vgId:%d, received ver:%" PRIu64 ", inconsistent with last ver:%" PRIu64 ", restart connection", pNode->vgId,
pWalHead->version, nodeVersion);
for (int i = 0; i < pNode->replica; ++i) {
pPeer = pNode->peerInfo[i];
syncRestartConnection(pPeer);
}
return TSDB_CODE_SYN_INVALID_VERSION;
}
// always update version
nodeVersion = pWalHead->version;
sDebug("vgId:%d, replica:%d nodeRole:%s qtype:%d ver:%" PRIu64, pNode->vgId, pNode->replica, syncRole[nodeRole],
qtype, pWalHead->version);
if (pNode->replica == 1 || nodeRole != TAOS_SYNC_ROLE_MASTER) return 0;
// only pkt from RPC or CQ can be forwarded
if (qtype != TAOS_QTYPE_RPC && qtype != TAOS_QTYPE_CQ) return 0;
// a hacker way to improve the performance
pSyncHead = (SSyncHead *)(((char *)pWalHead) - sizeof(SSyncHead));
pSyncHead->type = TAOS_SMSG_FORWARD;
pSyncHead->pversion = 0;
pSyncHead->len = sizeof(SWalHead) + pWalHead->len;
fwdLen = pSyncHead->len + sizeof(SSyncHead); // include the WAL and SYNC head
pthread_mutex_lock(&(pNode->mutex));
for (int i = 0; i < pNode->replica; ++i) {
pPeer = pNode->peerInfo[i];
if (pPeer == NULL || pPeer->peerFd < 0) continue;
if (pPeer->role != TAOS_SYNC_ROLE_SLAVE && pPeer->sstatus != TAOS_SYNC_STATUS_CACHE) continue;
if (pNode->quorum > 1 && code == 0) {
syncSaveFwdInfo(pNode, pWalHead->version, mhandle);
code = 1;
}
int retLen = write(pPeer->peerFd, pSyncHead, fwdLen);
if (retLen == fwdLen) {
sDebug("%s, forward is sent, ver:%" PRIu64 " contLen:%d", pPeer->id, pWalHead->version, pWalHead->len);
} else {
sError("%s, failed to forward, ver:%" PRIu64 " retLen:%d", pPeer->id, pWalHead->version, retLen);
syncRestartConnection(pPeer);
}
}
pthread_mutex_unlock(&(pNode->mutex));
return code;
}
......@@ -563,12 +563,12 @@ int tsdbUnlockRepoMeta(STsdbRepo *pRepo) {
void tsdbRefTable(STable *pTable) {
int32_t ref = T_REF_INC(pTable);
UNUSED(ref);
// tsdbDebug("ref table %"PRIu64", tid:%d, refCount:%d", TABLE_UID(pTable), TABLE_TID(pTable), ref);
tsdbDebug("ref table %s uid %" PRIu64 " tid:%d, refCount:%d", TABLE_CHAR_NAME(pTable), TABLE_UID(pTable), TABLE_TID(pTable), ref);
}
void tsdbUnRefTable(STable *pTable) {
int32_t ref = T_REF_DEC(pTable);
tsdbDebug("unref table uid:%"PRIu64", tid:%d, refCount:%d", TABLE_UID(pTable), TABLE_TID(pTable), ref);
tsdbDebug("unref table %s uid:%"PRIu64" tid:%d, refCount:%d", TABLE_CHAR_NAME(pTable), TABLE_UID(pTable), TABLE_TID(pTable), ref);
if (ref == 0) {
// tsdbDebug("destory table name:%s uid:%"PRIu64", tid:%d", TABLE_CHAR_NAME(pTable), TABLE_UID(pTable), TABLE_TID(pTable));
......@@ -890,7 +890,7 @@ static void tsdbRemoveTableFromMeta(STsdbRepo *pRepo, STable *pTable, bool rmFro
}
if (lock) tsdbUnlockRepoMeta(pRepo);
tsdbDebug("vgId:%d table %s is removed from meta", REPO_ID(pRepo), TABLE_CHAR_NAME(pTable));
tsdbDebug("vgId:%d table %s uid %" PRIu64 " is removed from meta", REPO_ID(pRepo), TABLE_CHAR_NAME(pTable), TABLE_UID(pTable));
tsdbUnRefTable(pTable);
}
......
......@@ -2150,7 +2150,16 @@ STimeWindow changeTableGroupByLastrow(STableGroupInfo *groupList) {
}
}
// clear current group
// clear current group, unref unused table
for (int32_t i = 0; i < numOfTables; ++i) {
STableKeyInfo* pKeyInfo = (STableKeyInfo*)taosArrayGet(pGroup, i);
// keyInfo.pTable may be NULL here.
if (pKeyInfo->pTable != keyInfo.pTable) {
tsdbUnRefTable(pKeyInfo->pTable);
}
}
taosArrayClear(pGroup);
// more than one table in each group, only one table left for each group
......
......@@ -21,15 +21,42 @@
extern "C" {
#endif
int taosOpenRef(int max, void (*fp)(void *)); // return refId which will be used by other APIs
// open an instance, return refId which will be used by other APIs
int taosOpenRef(int max, void (*fp)(void *));
// close the Ref instance
void taosCloseRef(int refId);
int taosListRef(); // return the number of references in system
// add ref, p is the pointer to resource or pointer ID
int taosAddRef(int refId, void *p);
#define taosRemoveRef taosReleaseRef
// acquire ref, p is the pointer to resource or pointer ID
int taosAcquireRef(int refId, void *p);
// release ref, p is the pointer to resource or pinter ID
void taosReleaseRef(int refId, void *p);
#define taosRemoveRef taosReleaseRef
// return the first if p is null, otherwise return the next after p
void *taosIterateRef(int refId, void *p);
// return the number of references in system
int taosListRef();
/* sample code to iterate the refs
void demoIterateRefs(int refId) {
void *p = taosIterateRef(refId, NULL);
while (p) {
// process P
p = taosIterateRef(refId, p);
}
}
*/
#ifdef __cplusplus
}
......
......@@ -143,8 +143,6 @@ int taosAddRef(int refId, void *p)
return TSDB_CODE_REF_INVALID_ID;
}
uTrace("refId:%d p:%p try to add", refId, p);
pSet = tsRefSetList + refId;
taosIncRefCount(pSet);
if (pSet->state != TSDB_REF_STATE_ACTIVE) {
......@@ -203,8 +201,6 @@ int taosAcquireRef(int refId, void *p)
return TSDB_CODE_REF_INVALID_ID;
}
uTrace("refId:%d p:%p try to acquire", refId, p);
pSet = tsRefSetList + refId;
taosIncRefCount(pSet);
if (pSet->state != TSDB_REF_STATE_ACTIVE) {
......@@ -254,8 +250,6 @@ void taosReleaseRef(int refId, void *p)
return;
}
uTrace("refId:%d p:%p try to release", refId, p);
pSet = tsRefSetList + refId;
if (pSet->state == TSDB_REF_STATE_EMPTY) {
uTrace("refId:%d p:%p failed to release, cleaned", refId, p);
......@@ -305,6 +299,75 @@ void taosReleaseRef(int refId, void *p)
if (released) taosDecRefCount(pSet);
}
// if p is NULL, return the first p in hash list, otherwise, return the next after p
void *taosIterateRef(int refId, void *p) {
SRefNode *pNode = NULL;
SRefSet *pSet;
if (refId < 0 || refId >= TSDB_REF_OBJECTS) {
uTrace("refId:%d p:%p failed to iterate, refId not valid", refId, p);
return NULL;
}
pSet = tsRefSetList + refId;
taosIncRefCount(pSet);
if (pSet->state != TSDB_REF_STATE_ACTIVE) {
uTrace("refId:%d p:%p failed to iterate, not active", refId, p);
taosDecRefCount(pSet);
return NULL;
}
int hash = 0;
if (p) {
hash = taosHashRef(pSet, p);
taosLockList(pSet->lockedBy+hash);
pNode = pSet->nodeList[hash];
while (pNode) {
if (pNode->p == p) break;
pNode = pNode->next;
}
if (pNode == NULL) {
uError("refId:%d p:%p not there, quit", refId, p);
taosUnlockList(pSet->lockedBy+hash);
return NULL;
}
// p is there
pNode = pNode->next;
if (pNode == NULL) {
taosUnlockList(pSet->lockedBy+hash);
hash++;
}
}
if (pNode == NULL) {
for (; hash < pSet->max; ++hash) {
taosLockList(pSet->lockedBy+hash);
pNode = pSet->nodeList[hash];
if (pNode) break;
taosUnlockList(pSet->lockedBy+hash);
}
}
void *newP = NULL;
if (pNode) {
pNode->count++; // acquire it
newP = pNode->p;
taosUnlockList(pSet->lockedBy+hash);
uTrace("refId:%d p:%p is returned", refId, p);
} else {
uTrace("refId:%d p:%p the list is over", refId, p);
}
if (p) taosReleaseRef(refId, p); // release the current one
taosDecRefCount(pSet);
return newP;
}
int taosListRef() {
SRefSet *pSet;
SRefNode *pNode;
......
......@@ -326,6 +326,7 @@ int32_t taosHexStrToByteArray(char hexstr[], char bytes[]) {
return 0;
}
// TODO move to comm module
bool taosGetVersionNumber(char *versionStr, int *versionNubmer) {
if (versionStr == NULL || versionNubmer == NULL) {
return false;
......
......@@ -17,6 +17,19 @@ typedef struct {
void **p;
} SRefSpace;
void iterateRefs(int refId) {
int count = 0;
void *p = taosIterateRef(refId, NULL);
while (p) {
// process P
count++;
p = taosIterateRef(refId, p);
}
printf(" %d ", count);
}
void *takeRefActions(void *param) {
SRefSpace *pSpace = (SRefSpace *)param;
int code, id;
......@@ -44,6 +57,9 @@ void *takeRefActions(void *param) {
usleep(id % 5 + 1);
taosReleaseRef(pSpace->refId, pSpace->p[id]);
}
id = random() % pSpace->refNum;
iterateRefs(id);
}
for (int i=0; i < pSpace->refNum; ++i) {
......@@ -63,7 +79,7 @@ void *openRefSpace(void *param) {
SRefSpace *pSpace = (SRefSpace *)param;
printf("c");
pSpace->refId = taosOpenRef(10000, myfree);
pSpace->refId = taosOpenRef(50, myfree);
if (pSpace->refId < 0) {
printf("failed to open ref, reson:%s\n", tstrerror(pSpace->refId));
......
......@@ -154,6 +154,7 @@ python3 ./test.py -f query/queryConnection.py
python3 ./test.py -f query/queryCountCSVData.py
python3 ./test.py -f query/natualInterval.py
python3 ./test.py -f query/bug1471.py
python3 ./test.py -f query/dataLossTest.py
#stream
python3 ./test.py -f stream/metric_1.py
......
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import sys
import taos
import os
from util.log import *
from util.cases import *
from util.sql import *
from util.dnodes import *
import inspect
class TDTestCase:
def init(self, conn, logSql):
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor())
self.numberOfTables = 240
self.numberOfRecords = 10000
def run(self):
tdSql.prepare()
os.system("yes | taosdemo -t %d -n %d" % (self.numberOfTables, self.numberOfRecords))
print("==============step1")
tdSql.execute("use test")
sql = "select count(*) from meters"
tdSql.query(sql)
rows = tdSql.getData(0, 0)
print ("number of records: %d" % rows)
newRows = rows
for i in range(10000):
print("kill taosd")
time.sleep(10)
os.system("sudo kill -9 $(pgrep taosd)")
tdDnodes.startWithoutSleep(1)
while True:
try:
tdSql.query(sql)
newRows = tdSql.getData(0, 0)
print("numer of records after kill taosd %d" % newRows)
time.sleep(10)
break
except Exception as e:
pass
continue
if newRows < rows:
caller = inspect.getframeinfo(inspect.stack()[1][0])
args = (caller.filename, caller.lineno, sql, newRows, rows)
tdLog.exit("%s(%d) failed: sql:%s, queryRows:%d != expect:%d" % args)
break
tdSql.query(sql)
tdSql.checkData(0, 0, rows)
def stop(self):
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase())
......@@ -15,6 +15,7 @@ import sys
import os
import os.path
import subprocess
from time import sleep
from util.log import *
......@@ -210,6 +211,7 @@ class TDDnode:
(self.index, self.cfgPath))
def getBuildPath(self):
buildPath = ""
selfPath = os.path.dirname(os.path.realpath(__file__))
if ("community" in selfPath):
......@@ -256,6 +258,35 @@ class TDDnode:
tdLog.debug("wait 5 seconds for the dnode:%d to start." % (self.index))
time.sleep(5)
def startWithoutSleep(self):
buildPath = self.getBuildPath()
if (buildPath == ""):
tdLog.exit("taosd not found!")
else:
tdLog.info("taosd found in %s" % buildPath)
binPath = buildPath + "/build/bin/taosd"
if self.deployed == 0:
tdLog.exit("dnode:%d is not deployed" % (self.index))
if self.valgrind == 0:
cmd = "nohup %s -c %s > /dev/null 2>&1 & " % (
binPath, self.cfgDir)
else:
valgrindCmdline = "valgrind --tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --show-leak-kinds=all -v --workaround-gcc296-bugs=yes"
cmd = "nohup %s %s -c %s 2>&1 & " % (
valgrindCmdline, binPath, self.cfgDir)
print(cmd)
if os.system(cmd) != 0:
tdLog.exit(cmd)
self.running = 1
tdLog.debug("dnode:%d is running with %s " % (self.index, cmd))
def stop(self):
if self.valgrind == 0:
......@@ -425,6 +456,10 @@ class TDDnodes:
def start(self, index):
self.check(index)
self.dnodes[index - 1].start()
def startWithoutSleep(self, index):
self.check(index)
self.dnodes[index - 1].startWithoutSleep()
def stop(self, index):
self.check(index)
......
......@@ -25,7 +25,7 @@ class TDSql:
self.queryCols = 0
self.affectedRows = 0
def init(self, cursor, log=True):
def init(self, cursor, log=False):
self.cursor = cursor
if (log):
......
......@@ -132,4 +132,134 @@ sql select join_mt0.ts, join_mt1.t1, join_mt0.t1, join_mt1.tbname, join_mt0.tbna
sql select join_mt0.ts, join_mt1.t1, join_mt0.t1, join_mt1.tbname, join_mt0.tbname from join_mt0, join_mt1 where join_mt0.ts=join_mt1.ts and join_mt0.t1=join_mt1.t1 limit 1
#1970-01-01 08:01:40.800 | 10 | 45.000000000 | 0 | true | false | 0 |
#1970-01-01 08:01:40.790 | 10 | 945.000000000 | 90 | true | true | 0 |
sql select count(join_mt0.c1), sum(join_mt1.c2), first(join_mt0.c5), last(join_mt1.c7), first(join_mt1.c7) from join_mt0, join_mt1 where join_mt0.t1=join_mt1.t1 and join_mt0.ts=join_mt1.ts interval(10a) group by join_mt0.t1 order by join_mt0.ts desc limit 20 offset 19;
if $rows != 100 then
return -1
endi
# c5 is null ! error
sql select count(join_mt0.c1), sum(join_mt0.c2)/count(*), avg(c2), first(join_mt0.c5), last(c7) from join_mt0 interval(10a) group by join_mt0.t1 order by join_mt0.ts desc;
if $rows != 100 then
return -1
endi
if $data00 != @70-01-01 08:01:40.990@ then
print expect 0, actual: $data00
return -1
endi
if $data01 != 30 then
return -1
endi
if $data02 != 94.500000000 then
print expect 94.500000000, actual $data02
return -1
endi
if $data03 != 94.500000000 then
return -1
endi
if $data04 != 90 then
return -1
endi
if $data05 != 1 then
return -1
endi
if $data06 != 2 then
return -1
endi
if $data10 != @70-01-01 08:01:40.980@ then
print expect 70-01-01 08:01:40.980, actual: $data10
return -1
endi
if $data11 != 30 then
return -1
endi
if $data12 != 84.500000000 then
print expect 84.500000000, actual $data12
return -1
endi
if $data13 != 84.500000000 then
return -1
endi
if $data14 != 80 then
return -1
endi
if $data15 != 1 then
return -1
endi
if $data16 != 2 then
return -1
endi
# this function will cause shell crash
sql select count(join_mt0.c1), first(join_mt0.c1) from join_mt0, join_mt1 where join_mt0.t1=join_mt1.t1 and join_mt0.ts=join_mt1.ts interval(10a) group by join_mt0.t1 order by join_mt0.ts desc;
if $rows != 100 then
return -1
endi
if $data00 != @70-01-01 08:01:40.990@ then
return -1
endi
if $data01 != 10 then
return -1
endi
if $data02 != 90 then
return -1
endi
if $data03 != 0 then
return -1
endi
if $data11 != 10 then
return -1
endi
if $data12 != 80 then
return -1
endi
if $data13 != 0 then
return -1
endi
sql select last(join_mt1.c7), first(join_mt1.c7) from join_mt0, join_mt1 where join_mt0.t1=join_mt1.t1 and join_mt0.ts=join_mt1.ts interval(10m) group by join_mt0.t1 order by join_mt0.ts asc;
if $rows != 1 then
return -1
endi
if $data00 != @70-01-01 08:00:00.000@ then
return -1
endi
if $data01 != 1 then
return -1
endi
if $data02 != 0 then
return -1
endi
if $data03 != 0 then
return -1
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT
\ No newline at end of file
......@@ -324,8 +324,22 @@ sql create table tm0 using m1 tags(1);
sql create table tm1 using m1 tags(2);
sql insert into tm0 values(10000, 1) (20000, 2)(30000, 3) (40000, NULL) (50000, 2) tm1 values(10001, 2)(20000,4)(90000,9);
sql select count(*),first(k),last(k) from m1 where tbname in ('tm0') interval(1s) order by ts desc;
#=============================tbase-1205
sql select count(*) from tm1 where ts<now and ts>= now -1d interval(1h) fill(NULL);
print ===================>TD-1834
sql select * from tm0 where ts>11000 and ts< 20000 order by ts asc
if $rows != 0 then
return -1
endi
sql select * from tm0 where ts>11000 and ts< 20000 order by ts desc
if $rows != 0 then
return -1
endi
sql select count(*),first(k),last(k) from m1 where tbname in ('tm0') interval(1s) order by ts desc;
if $row != 5 then
return -1
endi
......@@ -386,7 +400,25 @@ sql_error select k+1,sum(k) from tm0;
sql_error select k, sum(k) from tm0;
sql_error select k, sum(k)+1 from tm0;
print ================== restart server to commit data into disk
system sh/exec.sh -n dnode1 -s stop -x SIGINT
sleep 5000
system sh/exec.sh -n dnode1 -s start
print ================== server restart completed
#=============================tbase-1205
sql select count(*) from tm1 where ts<now and ts>= now -1d interval(1h) fill(NULL);
print ===================>TD-1834
sql select * from tm0 where ts>11000 and ts< 20000 order by ts asc
if $rows != 0 then
return -1
endi
sql select * from tm0 where ts>11000 and ts< 20000 order by ts desc
if $rows != 0 then
return -1
endi
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册