未验证 提交 78f05459 编写于 作者: D dapan1121 提交者: GitHub

Merge pull request #11769 from taosdata/feature/qnode

feat: fix qnode list type issue
......@@ -639,13 +639,19 @@ int32_t tSerializeSQnodeListReq(void* buf, int32_t bufLen, SQnodeListReq* pReq);
int32_t tDeserializeSQnodeListReq(void* buf, int32_t bufLen, SQnodeListReq* pReq);
typedef struct {
SArray* epSetList; // SArray<SEpSet>
SArray* addrsList; // SArray<SQueryNodeAddr>
} SQnodeListRsp;
int32_t tSerializeSQnodeListRsp(void* buf, int32_t bufLen, SQnodeListRsp* pRsp);
int32_t tDeserializeSQnodeListRsp(void* buf, int32_t bufLen, SQnodeListRsp* pRsp);
void tFreeSQnodeListRsp(SQnodeListRsp* pRsp);
typedef struct SQueryNodeAddr {
int32_t nodeId; // vgId or qnodeId
SEpSet epSet;
} SQueryNodeAddr;
typedef struct {
SArray* pArray; // Array of SUseDbRsp
} SUseDbBatchRsp;
......
......@@ -133,12 +133,6 @@ typedef struct SMsgSendInfo {
SDataBuf msgInfo;
} SMsgSendInfo;
typedef struct SQueryNodeAddr {
int32_t nodeId; // vgId or qnodeId
SEpSet epSet;
} SQueryNodeAddr;
typedef struct SQueryNodeStat {
int32_t tableNum; // vg table number, unit is TSDB_TABLE_NUM_UNIT
} SQueryNodeStat;
......
......@@ -151,13 +151,14 @@ int32_t processUseDbRsp(void* param, const SDataBuf* pMsg, int32_t code) {
taosMemoryFreeClear(output.dbVgroup);
tscError("failed to build use db output since %s", terrstr());
} else {
} else if (output.dbVgroup) {
struct SCatalog* pCatalog = NULL;
int32_t code1 = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog);
if (code1 != TSDB_CODE_SUCCESS) {
tscWarn("catalogGetHandle failed, clusterId:%" PRIx64 ", error:%s", pRequest->pTscObj->pAppInfo->clusterId,
tstrerror(code1));
taosMemoryFreeClear(output.dbVgroup);
} else {
catalogUpdateDBVgInfo(pCatalog, output.db, output.dbId, output.dbVgroup);
}
......
......@@ -114,6 +114,19 @@ int32_t tDecodeSEpSet(SCoder *pDecoder, SEpSet *pEp) {
return 0;
}
int32_t tEncodeSQueryNodeAddr(SCoder *pEncoder, SQueryNodeAddr *pAddr) {
if (tEncodeI32(pEncoder, pAddr->nodeId) < 0) return -1;
if (tEncodeSEpSet(pEncoder, &pAddr->epSet) < 0) return -1;
return 0;
}
int32_t tDecodeSQueryNodeAddr(SCoder *pDecoder, SQueryNodeAddr *pAddr) {
if (tDecodeI32(pDecoder, &pAddr->nodeId) < 0) return -1;
if (tDecodeSEpSet(pDecoder, &pAddr->epSet) < 0) return -1;
return 0;
}
int32_t taosEncodeSEpSet(void **buf, const SEpSet *pEp) {
int32_t tlen = 0;
tlen += taosEncodeFixedI8(buf, pEp->inUse);
......@@ -2058,11 +2071,11 @@ int32_t tSerializeSQnodeListRsp(void *buf, int32_t bufLen, SQnodeListRsp *pRsp)
tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER);
if (tStartEncode(&encoder) < 0) return -1;
int32_t num = taosArrayGetSize(pRsp->epSetList);
int32_t num = taosArrayGetSize(pRsp->addrsList);
if (tEncodeI32(&encoder, num) < 0) return -1;
for (int32_t i = 0; i < num; ++i) {
SEpSet *epSet = taosArrayGet(pRsp->epSetList, i);
if (tEncodeSEpSet(&encoder, epSet) < 0) return -1;
SQueryNodeAddr *addr = taosArrayGet(pRsp->addrsList, i);
if (tEncodeSQueryNodeAddr(&encoder, addr) < 0) return -1;
}
tEndEncode(&encoder);
......@@ -2078,10 +2091,10 @@ int32_t tDeserializeSQnodeListRsp(void *buf, int32_t bufLen, SQnodeListRsp *pRsp
if (tStartDecode(&decoder) < 0) return -1;
int32_t num = 0;
if (tDecodeI32(&decoder, &num) < 0) return -1;
pRsp->epSetList = taosArrayInit(num, sizeof(SEpSet));
if (NULL == pRsp->epSetList) return -1;
pRsp->addrsList = taosArrayInit(num, sizeof(SQueryNodeAddr));
if (NULL == pRsp->addrsList) return -1;
for (int32_t i = 0; i < num; ++i) {
if (tDecodeSEpSet(&decoder, TARRAY_GET_ELEM(pRsp->epSetList, i)) < 0) return -1;
if (tDecodeSQueryNodeAddr(&decoder, TARRAY_GET_ELEM(pRsp->addrsList, i)) < 0) return -1;
}
tEndDecode(&decoder);
......@@ -2089,7 +2102,7 @@ int32_t tDeserializeSQnodeListRsp(void *buf, int32_t bufLen, SQnodeListRsp *pRsp
return 0;
}
void tFreeSQnodeListRsp(SQnodeListRsp *pRsp) { taosArrayDestroy(pRsp->epSetList); }
void tFreeSQnodeListRsp(SQnodeListRsp *pRsp) { taosArrayDestroy(pRsp->addrsList); }
int32_t tSerializeSCompactDbReq(void *buf, int32_t bufLen, SCompactDbReq *pReq) {
SCoder encoder = {0};
......
......@@ -370,9 +370,9 @@ static int32_t dmInitMgmt(SMgmtWrapper *pWrapper) {
}
dmReportStartup(pDnode, "dnode-transport", "initialized");
if (dmStartUdfd(pDnode) != 0) {
dError("failed to start udfd");
}
// if (dmStartUdfd(pDnode) != 0) {
// dError("failed to start udfd");
// }
dInfo("dnode-mgmt is initialized");
return 0;
......
......@@ -30,6 +30,7 @@ SSdbRaw *mndVgroupActionEncode(SVgObj *pVgroup);
int32_t mndAllocVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj **ppVgroups);
SEpSet mndGetVgroupEpset(SMnode *pMnode, const SVgObj *pVgroup);
int32_t mndGetVnodesNum(SMnode *pMnode, int32_t dnodeId);
int32_t mndGetGlobalVgroupVersion(int32_t *vgId);
void *mndBuildCreateVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen);
void *mndBuildDropVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen);
......
......@@ -1191,6 +1191,7 @@ static int32_t mndProcessUseDbReq(SNodeMsg *pReq) {
char *p = strchr(usedbReq.db, '.');
if (p && 0 == strcmp(p + 1, TSDB_INFORMATION_SCHEMA_DB)) {
memcpy(usedbRsp.db, usedbReq.db, TSDB_DB_FNAME_LEN);
//mndGetGlobalVgroupVersion(); TODO
static int32_t vgVersion = 1;
if (usedbReq.vgVersion < vgVersion) {
usedbRsp.pVgroupInfos = taosArrayInit(10, sizeof(SVgroupInfo));
......@@ -1202,16 +1203,11 @@ static int32_t mndProcessUseDbReq(SNodeMsg *pReq) {
mndBuildDBVgroupInfo(NULL, pMnode, usedbRsp.pVgroupInfos);
usedbRsp.vgVersion = vgVersion++;
if (taosArrayGetSize(usedbRsp.pVgroupInfos) <= 0) {
terrno = TSDB_CODE_MND_DB_NOT_EXIST;
} else {
code = 0;
}
} else {
usedbRsp.vgVersion = usedbReq.vgVersion;
code = 0;
}
usedbRsp.vgNum = taosArrayGetSize(usedbRsp.pVgroupInfos);
code = 0;
// no jump, need to construct rsp
} else {
......
......@@ -444,8 +444,8 @@ static int32_t mndProcessQnodeListReq(SNodeMsg *pReq) {
goto _OVER;
}
qlistRsp.epSetList = taosArrayInit(5, sizeof(SEpSet));
if (NULL == qlistRsp.epSetList) {
qlistRsp.addrsList = taosArrayInit(5, sizeof(SQueryNodeAddr));
if (NULL == qlistRsp.addrsList) {
mError("failed to alloc epSet while process qnode list req");
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _OVER;
......@@ -455,11 +455,13 @@ static int32_t mndProcessQnodeListReq(SNodeMsg *pReq) {
void *pIter = sdbFetch(pSdb, SDB_QNODE, NULL, (void **)&pObj);
if (pIter == NULL) break;
SEpSet epSet = {.numOfEps = 1};
tstrncpy(epSet.eps[0].fqdn, pObj->pDnode->fqdn, TSDB_FQDN_LEN);
epSet.eps[0].port = pObj->pDnode->port;
SQueryNodeAddr nodeAddr = {0};
nodeAddr.nodeId = QNODE_HANDLE;
nodeAddr.epSet.numOfEps = 1;
tstrncpy(nodeAddr.epSet.eps[0].fqdn, pObj->pDnode->fqdn, TSDB_FQDN_LEN);
nodeAddr.epSet.eps[0].port = pObj->pDnode->port;
(void)taosArrayPush(qlistRsp.epSetList, &epSet);
(void)taosArrayPush(qlistRsp.addrsList, &nodeAddr);
numOfRows++;
sdbRelease(pSdb, pObj);
......
......@@ -359,10 +359,10 @@ PROCESS_QLIST_OVER:
if (code != 0) {
tFreeSQnodeListRsp(&out);
out.epSetList = NULL;
out.addrsList = NULL;
}
*(SArray **)output = out.epSetList;
*(SArray **)output = out.addrsList;
return code;
}
......
......@@ -1388,6 +1388,14 @@ void qwProcessHbTimerEvent(void *param, void *tmrId) {
void *pIter = taosHashIterate(mgmt->schHash, NULL);
while (pIter) {
SQWSchStatus *sch = (SQWSchStatus *)pIter;
if (NULL == sch->hbConnInfo.handle) {
uint64_t *sId = taosHashGetKey(pIter, NULL);
QW_DLOG("cancel send hb to sch %" PRIx64 " cause of no connection handle", *sId);
pIter = taosHashIterate(mgmt->schHash, pIter);
continue;
}
code = qwGenerateSchHbRsp(mgmt, (SQWSchStatus *)pIter, &rspList[i]);
if (code) {
taosHashCancelIterate(mgmt->schHash, pIter);
......
......@@ -186,6 +186,7 @@ typedef struct SFilterColCtx {
typedef struct SFilterCompare {
uint8_t type;
int8_t precision;
uint8_t optr;
uint8_t optr2;
} SFilterCompare;
......@@ -218,6 +219,7 @@ typedef struct SFltTreeStat {
int32_t code;
int8_t precision;
bool scalarMode;
SArray* nodeList;
SFilterInfo* info;
} SFltTreeStat;
......@@ -303,6 +305,7 @@ struct SFilterInfo {
#define FILTER_GET_FIELD(i, id) (&((i)->fields[(id).type].fields[(id).idx]))
#define FILTER_GET_COL_FIELD(i, idx) (&((i)->fields[FLD_TYPE_COLUMN].fields[idx]))
#define FILTER_GET_COL_FIELD_TYPE(fi) (((SColumnNode *)((fi)->desc))->node.resType.type)
#define FILTER_GET_COL_FIELD_PRECISION(fi) (((SColumnNode *)((fi)->desc))->node.resType.precision)
#define FILTER_GET_COL_FIELD_SIZE(fi) (((SColumnNode *)((fi)->desc))->node.resType.bytes)
#define FILTER_GET_COL_FIELD_ID(fi) (((SColumnNode *)((fi)->desc))->colId)
#define FILTER_GET_COL_FIELD_SLOT_ID(fi) (((SColumnNode *)((fi)->desc))->slotId)
......@@ -317,6 +320,7 @@ struct SFilterInfo {
#define FILTER_UNIT_RIGHT_FIELD(i, u) FILTER_GET_FIELD(i, (u)->right)
#define FILTER_UNIT_RIGHT2_FIELD(i, u) FILTER_GET_FIELD(i, (u)->right2)
#define FILTER_UNIT_DATA_TYPE(u) ((u)->compare.type)
#define FILTER_UNIT_DATA_PRECISION(u) ((u)->compare.precision)
#define FILTER_UNIT_COL_DESC(i, u) FILTER_GET_COL_FIELD_DESC(FILTER_UNIT_LEFT_FIELD(i, u))
#define FILTER_UNIT_COL_DATA(i, u, ri) FILTER_GET_COL_FIELD_DATA(FILTER_UNIT_LEFT_FIELD(i, u), ri)
#define FILTER_UNIT_COL_SIZE(i, u) FILTER_GET_COL_FIELD_SIZE(FILTER_UNIT_LEFT_FIELD(i, u))
......
......@@ -21,6 +21,7 @@
#include "sclInt.h"
#include "tcompare.h"
#include "tdatablock.h"
#include "ttime.h"
OptrStr gOptrStr[] = {
{0, "invalid"},
......@@ -986,6 +987,7 @@ int32_t filterAddUnit(SFilterInfo *info, uint8_t optr, SFilterFieldId *left, SFi
assert(FILTER_GET_FLAG(col->flag, FLD_TYPE_COLUMN));
info->units[info->unitNum].compare.type = FILTER_GET_COL_FIELD_TYPE(col);
info->units[info->unitNum].compare.precision = FILTER_GET_COL_FIELD_PRECISION(col);
*uidx = info->unitNum;
......@@ -1748,6 +1750,7 @@ int32_t fltInitValFieldData(SFilterInfo *info) {
assert(FILTER_GET_FLAG(right->flag, FLD_TYPE_VALUE));
uint32_t type = FILTER_UNIT_DATA_TYPE(unit);
int8_t precision = FILTER_UNIT_DATA_PRECISION(unit);
SFilterField* fi = right;
SValueNode* var = (SValueNode *)fi->desc;
......@@ -1801,6 +1804,7 @@ int32_t fltInitValFieldData(SFilterInfo *info) {
} else {
SScalarParam out = {.columnData = taosMemoryCalloc(1, sizeof(SColumnInfoData))};
out.columnData->info.type = type;
out.columnData->info.precision = precision;
if (IS_VAR_DATA_TYPE(type)) {
out.columnData->info.bytes = bytes;
} else {
......@@ -3475,6 +3479,33 @@ int32_t filterFreeNcharColumns(SFilterInfo* info) {
return TSDB_CODE_SUCCESS;
}
int32_t fltAddValueNodeToConverList(SFltTreeStat *stat, SValueNode* pNode) {
if (NULL == stat->nodeList) {
stat->nodeList = taosArrayInit(10, POINTER_BYTES);
if (NULL == stat->nodeList) {
FLT_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
}
if (NULL == taosArrayPush(stat->nodeList, &pNode)) {
FLT_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
return TSDB_CODE_SUCCESS;
}
void fltConvertToTsValueNode(SFltTreeStat *stat, SValueNode* valueNode) {
char *timeStr = valueNode->datum.p;
if (convertStringToTimestamp(valueNode->node.resType.type, valueNode->datum.p, stat->precision, &valueNode->datum.i) !=
TSDB_CODE_SUCCESS) {
valueNode->datum.i = 0;
}
taosMemoryFree(timeStr);
valueNode->node.resType.type = TSDB_DATA_TYPE_TIMESTAMP;
valueNode->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_TIMESTAMP].bytes;
}
EDealRes fltReviseRewriter(SNode** pNode, void* pContext) {
SFltTreeStat *stat = (SFltTreeStat *)pContext;
......@@ -3504,25 +3535,23 @@ EDealRes fltReviseRewriter(SNode** pNode, void* pContext) {
}
SValueNode *valueNode = (SValueNode *)*pNode;
if (TSDB_DATA_TYPE_BINARY != valueNode->node.resType.type) {
if (TSDB_DATA_TYPE_BINARY != valueNode->node.resType.type && TSDB_DATA_TYPE_NCHAR != valueNode->node.resType.type) {
return DEAL_RES_CONTINUE;
}
#if 0
if (stat->precision < 0) {
//TODO
int32_t code = fltAddValueNodeToConverList(stat, valueNode);
if (code) {
stat->code = code;
return DEAL_RES_ERROR;
}
return DEAL_RES_CONTINUE;
}
char *timeStr = valueNode->datum.p;
if (taosParseTime(valueNode->datum.p, &valueNode->datum.i, valueNode->node.resType.bytes, stat->precision, tsDaylight) !=
TSDB_CODE_SUCCESS) {
return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_WRONG_VALUE_TYPE, pVal->literal);
}
TODO
#else
fltConvertToTsValueNode(stat, valueNode);
return DEAL_RES_CONTINUE;
#endif
}
if (QUERY_NODE_COLUMN == nodeType(*pNode)) {
......@@ -3619,9 +3648,22 @@ EDealRes fltReviseRewriter(SNode** pNode, void* pContext) {
}
int32_t fltReviseNodes(SFilterInfo *pInfo, SNode** pNode, SFltTreeStat *pStat) {
int32_t code = 0;
nodesRewriteExprPostOrder(pNode, fltReviseRewriter, (void *)pStat);
FLT_RET(pStat->code);
FLT_ERR_JRET(pStat->code);
int32_t nodeNum = taosArrayGetSize(pStat->nodeList);
for (int32_t i = 0; i < nodeNum; ++i) {
SValueNode *valueNode = *(SValueNode **)taosArrayGet(pStat->nodeList, i);
fltConvertToTsValueNode(pStat, valueNode);
}
_return:
taosArrayDestroy(pStat->nodeList);
FLT_RET(code);
}
int32_t fltOptimizeNodes(SFilterInfo *pInfo, SNode** pNode, SFltTreeStat *pStat) {
......
......@@ -24,6 +24,7 @@
#include "tcompare.h"
#include "tdatablock.h"
#include "ttypes.h"
#include "ttime.h"
#define LEFT_COL ((pLeftCol->info.type == TSDB_DATA_TYPE_JSON ? (void*)pLeftCol : pLeftCol->pData))
#define RIGHT_COL ((pRightCol->info.type == TSDB_DATA_TYPE_JSON ? (void*)pRightCol : pRightCol->pData))
......@@ -252,6 +253,15 @@ _getValueAddr_fn_t getVectorValueAddrFn(int32_t srcType) {
return p;
}
static FORCE_INLINE void varToTimestamp(char *buf, SScalarParam* pOut, int32_t rowIndex) {
int64_t value = 0;
if (taosParseTime(buf, &value, strlen(buf), pOut->columnData->info.precision, tsDaylight) != TSDB_CODE_SUCCESS) {
value = 0;
}
colDataAppendInt64(pOut->columnData, rowIndex, &value);
}
static FORCE_INLINE void varToSigned(char *buf, SScalarParam* pOut, int32_t rowIndex) {
int64_t value = strtoll(buf, NULL, 10);
colDataAppendInt64(pOut->columnData, rowIndex, &value);
......@@ -295,7 +305,7 @@ int32_t vectorConvertFromVarData(const SScalarParam* pIn, SScalarParam* pOut, in
_bufConverteFunc func = NULL;
if (TSDB_DATA_TYPE_BOOL == outType) {
func = varToBool;
} else if (IS_SIGNED_NUMERIC_TYPE(outType) || TSDB_DATA_TYPE_TIMESTAMP == outType) {
} else if (IS_SIGNED_NUMERIC_TYPE(outType)) {
func = varToSigned;
} else if (IS_UNSIGNED_NUMERIC_TYPE(outType)) {
func = varToUnsigned;
......@@ -305,6 +315,8 @@ int32_t vectorConvertFromVarData(const SScalarParam* pIn, SScalarParam* pOut, in
ASSERT(inType == TSDB_DATA_TYPE_VARCHAR);
func = varToNchar;
vton = true;
} else if (TSDB_DATA_TYPE_TIMESTAMP == outType) {
func = varToTimestamp;
} else {
sclError("invalid convert outType:%d", outType);
return TSDB_CODE_QRY_APP_ERROR;
......@@ -594,8 +606,8 @@ int8_t gConvertTypes[TSDB_DATA_TYPE_BLOB+1][TSDB_DATA_TYPE_BLOB+1] = {
/*BIGI*/ 0, 0, 0, 0, 0, 0, 6, 7, 7, 0, 7, 5, 5, 5, 7, 0, 7, 0, 0,
/*FLOA*/ 0, 0, 0, 0, 0, 0, 0, 7, 7, 6, 7, 6, 6, 6, 6, 0, 7, 0, 0,
/*DOUB*/ 0, 0, 0, 0, 0, 0, 0, 0, 7, 7, 7, 7, 7, 7, 7, 0, 7, 0, 0,
/*VARC*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 7, 0, 7, 7, 7, 7, 0, 0, 0, 0,
/*TIME*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 7, 9, 9, 9, 7, 0, 7, 0, 0,
/*VARC*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 9, 0, 7, 7, 7, 7, 0, 0, 0, 0,
/*TIME*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 9, 9, 9, 9, 7, 0, 7, 0, 0,
/*NCHA*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 7, 7, 7, 7, 0, 0, 0, 0,
/*UTIN*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 12, 13, 14, 0, 7, 0, 0,
/*USMA*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 13, 14, 0, 7, 0, 0,
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册