diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index bdb35cb07274a880c98aa2c93005c47fe05c7de2..f2a9b9db43a854ed11e421daedcf66f907622f19 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -285,8 +285,8 @@ typedef struct { char ** buffer; // Buffer used to put multibytes encoded using unicode (wchar_t) SColumnIndex* pColumnIndex; - SArithmeticSupport* pArithSup; // support the arithmetic expression calculation on agg functions - struct SLocalReducer* pLocalReducer; + SArithmeticSupport *pArithSup; // support the arithmetic expression calculation on agg functions + struct SLocalReducer *pLocalReducer; } SSqlRes; typedef struct STscObj { diff --git a/src/client/src/tscLocal.c b/src/client/src/tscLocal.c index 4c28adc261f6b83b3c1bbb68d3dd3d56e6007fc9..ab52ef396b57e63a20060e2346d2052e9b851c64 100644 --- a/src/client/src/tscLocal.c +++ b/src/client/src/tscLocal.c @@ -46,7 +46,8 @@ typedef struct SCreateBuilder { SSqlObj *pInterSql; int32_t (*fp)(void *para, char* result); Stage callStage; -} SCreateBuilder; +} SCreateBuilder; + static void tscSetLocalQueryResult(SSqlObj *pSql, const char *val, const char *columnName, int16_t type, size_t valueLength); static int32_t tscSetValueToResObj(SSqlObj *pSql, int32_t rowLen) { @@ -207,10 +208,7 @@ static int32_t tscProcessDescribeTable(SSqlObj *pSql) { const int32_t TYPE_COLUMN_LENGTH = 16; const int32_t NOTE_COLUMN_MIN_LENGTH = 8; - int32_t noteFieldLen = NOTE_COLUMN_MIN_LENGTH;//tscMaxLengthOfTagsFields(pSql); -// if (noteFieldLen == 0) { -// noteFieldLen = NOTE_COLUMN_MIN_LENGTH; -// } + int32_t noteFieldLen = NOTE_COLUMN_MIN_LENGTH; int32_t rowLen = tscBuildTableSchemaResultFields(pSql, NUM_OF_DESC_TABLE_COLUMNS, TYPE_COLUMN_LENGTH, noteFieldLen); tscFieldInfoUpdateOffset(pQueryInfo); @@ -822,26 +820,36 @@ static int32_t tscProcessClientVer(SSqlObj *pSql) { } +// TODO add test cases. +static int32_t checkForOnlineNode(SSqlObj* pSql) { + int32_t* data = pSql->res.length; + + int32_t total = data[0]; + int32_t online = data[1]; + return (online < total)? TSDB_CODE_RPC_NETWORK_UNAVAIL:TSDB_CODE_SUCCESS; +} + static int32_t tscProcessServStatus(SSqlObj *pSql) { STscObj* pObj = pSql->pTscObj; SSqlObj* pHb = (SSqlObj*)taosAcquireRef(tscObjRef, pObj->hbrid); if (pHb != NULL) { - int32_t code = pHb->res.code; + pSql->res.code = pHb->res.code; taosReleaseRef(tscObjRef, pObj->hbrid); - if (code == TSDB_CODE_RPC_NETWORK_UNAVAIL) { - pSql->res.code = TSDB_CODE_RPC_NETWORK_UNAVAIL; - return pSql->res.code; - } - } else { - if (pSql->res.code == TSDB_CODE_RPC_NETWORK_UNAVAIL) { - return pSql->res.code; - } } - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0); + if (pSql->res.code == TSDB_CODE_RPC_NETWORK_UNAVAIL) { + return pSql->res.code; + } + + pSql->res.code = checkForOnlineNode(pHb); + if (pSql->res.code == TSDB_CODE_RPC_NETWORK_UNAVAIL) { + return pSql->res.code; + } + SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0); SSqlExpr* pExpr = taosArrayGetP(pQueryInfo->exprList, 0); + int32_t val = 1; tscSetLocalQueryResult(pSql, (char*) &val, pExpr->aliasName, TSDB_DATA_TYPE_INT, sizeof(int32_t)); return TSDB_CODE_SUCCESS; diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 994dace1e3f5c129970efefbceee806d0e803c82..f450f4aa4052f69e7c6dfb535c9afa15ded5b176 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -147,15 +147,15 @@ void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) { SSqlObj *pSql = tres; SSqlRes *pRes = &pSql->res; - if (code == 0) { + if (code == TSDB_CODE_SUCCESS) { SHeartBeatRsp *pRsp = (SHeartBeatRsp *)pRes->pRsp; - SRpcEpSet * epSet = &pRsp->epSet; + SRpcEpSet *epSet = &pRsp->epSet; if (epSet->numOfEps > 0) { tscEpSetHtons(epSet); if (!tscEpSetIsEqual(&pSql->pTscObj->tscCorMgmtEpSet->epSet, epSet)) { tscTrace("%p updating epset: numOfEps: %d, inUse: %d", pSql, epSet->numOfEps, epSet->inUse); for (int8_t i = 0; i < epSet->numOfEps; i++) { - tscTrace("endpoint %d: fqdn = %s, port=%d", i, epSet->fqdn[i], epSet->port[i]); + tscTrace("endpoint %d: fqdn=%s, port=%d", i, epSet->fqdn[i], epSet->port[i]); } tscUpdateMgmtEpSet(pSql, epSet); } @@ -167,11 +167,40 @@ void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) { tscKillConnection(pObj); return; } else { - if (pRsp->queryId) tscKillQuery(pObj, htonl(pRsp->queryId)); - if (pRsp->streamId) tscKillStream(pObj, htonl(pRsp->streamId)); + if (pRsp->queryId) { + tscKillQuery(pObj, htonl(pRsp->queryId)); + } + + if (pRsp->streamId) { + tscKillStream(pObj, htonl(pRsp->streamId)); + } + } + + int32_t total = htonl(pRsp->totalDnodes); + int32_t online = htonl(pRsp->onlineDnodes); + assert(online <= total); + + if (online < total) { + tscError("HB:%p, total dnode:%d, online dnode:%d", pSql, total, online); + pSql->res.code = TSDB_CODE_RPC_NETWORK_UNAVAIL; } + + if (pRes->buffer == NULL) { + pRes->length = calloc(2, sizeof(int32_t)); + } + + pRes->length[0] = total; + pRes->length[1] = online; } else { tscDebug("%" PRId64 " heartbeat failed, code:%s", pObj->hbrid, tstrerror(code)); + if (pRes->buffer == NULL) { + pRes->length = calloc(2, sizeof(int32_t)); + } + + pRes->length[1] = 0; + if (pRes->length[0] == 0) { + pRes->length[0] = 1; // make sure that the value of the total node is greater than the online node + } } if (pObj->hbrid != 0) {