提交 03070108 编写于 作者: S slzhou

Merge branch '3.0' of github.com:taosdata/TDengine into 3.0_udfd

......@@ -103,6 +103,7 @@ int32_t create_topic() {
/*const char* sql = "select * from tu1";*/
/*pRes = tmq_create_topic(pConn, "test_stb_topic_1", sql, strlen(sql));*/
/*pRes = taos_query(pConn, "create topic topic_ctb_column as abc1");*/
pRes = taos_query(pConn, "create topic topic_ctb_column as select ts, c1, c2, c3 from ct1");
if (taos_errno(pRes) != 0) {
printf("failed to create topic topic_ctb_column, reason:%s\n", taos_errstr(pRes));
......
......@@ -213,7 +213,6 @@ int32_t catalogGetTableHashVgroup(SCatalog* pCatalog, void * pTransporter, const
*/
int32_t catalogGetAllMeta(SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SCatalogReq* pReq, SMetaData* pRsp);
int32_t catalogGetQnodeList(SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, SArray* pQnodeList);
int32_t catalogGetExpiredSTables(SCatalog* pCatalog, SSTableMetaVersion **stables, uint32_t *num);
......
......@@ -41,10 +41,10 @@ static const SPerfsTableSchema queriesSchema[] = {
static const SPerfsTableSchema topicSchema[] = {
{.name = "topic_name", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY},
{.name = "db_name", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY},
/*{.name = "db_name", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY},*/
{.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP},
{.name = "sql", .bytes = TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY},
{.name = "row_len", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
/*{.name = "row_len", .bytes = 4, .type = TSDB_DATA_TYPE_INT},*/
};
static const SPerfsTableSchema consumerSchema[] = {
......
......@@ -59,7 +59,7 @@ int32_t mndInitStream(SMnode *pMnode) {
/*mndSetMsgHandle(pMnode, TDMT_MND_DROP_STREAM_RSP, mndProcessDropStreamInRsp);*/
// mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_TOPICS, mndRetrieveStream);
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_TOPICS, mndCancelGetNextStream);
/*mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_TOPICS, mndCancelGetNextStream);*/
return sdbSetTable(pMnode->pSdb, table);
}
......@@ -247,7 +247,8 @@ static int32_t mndStreamGetPlanString(const char *ast, int8_t triggerType, int64
return code;
}
int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast, int8_t triggerType, int64_t watermark, STrans *pTrans) {
int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast, int8_t triggerType, int64_t watermark,
STrans *pTrans) {
SNode *pAst = NULL;
if (nodesStringToNode(ast, &pAst) < 0) {
......
......@@ -35,7 +35,7 @@ static int32_t mndTopicActionUpdate(SSdb *pSdb, SMqTopicObj *pTopic, SMqTopicObj
static int32_t mndProcessCreateTopicReq(SNodeMsg *pReq);
static int32_t mndProcessDropTopicReq(SNodeMsg *pReq);
static int32_t mndProcessDropTopicInRsp(SNodeMsg *pRsp);
static int32_t mndRetrieveTopic(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows);
static int32_t mndRetrieveTopic(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
static void mndCancelGetNextTopic(SMnode *pMnode, void *pIter);
int32_t mndInitTopic(SMnode *pMnode) {
......@@ -51,7 +51,7 @@ int32_t mndInitTopic(SMnode *pMnode) {
mndSetMsgHandle(pMnode, TDMT_MND_DROP_TOPIC, mndProcessDropTopicReq);
mndSetMsgHandle(pMnode, TDMT_VND_DROP_TOPIC_RSP, mndProcessDropTopicInRsp);
// mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_TOPICS, mndRetrieveTopic);
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_TOPICS, mndRetrieveTopic);
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_TOPICS, mndCancelGetNextTopic);
return sdbSetTable(pMnode->pSdb, table);
......@@ -511,56 +511,40 @@ static int32_t mndGetNumOfTopics(SMnode *pMnode, char *dbName, int32_t *pNumOfTo
return 0;
}
static int32_t mndRetrieveTopic(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows) {
static int32_t mndRetrieveTopic(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rowsCapacity) {
SMnode *pMnode = pReq->pNode;
SSdb *pSdb = pMnode->pSdb;
int32_t numOfRows = 0;
SMqTopicObj *pTopic = NULL;
int32_t cols = 0;
char *pWrite;
char prefix[TSDB_DB_FNAME_LEN] = {0};
SDbObj *pDb = mndAcquireDb(pMnode, pShow->db);
if (pDb == NULL) return 0;
tstrncpy(prefix, pShow->db, TSDB_DB_FNAME_LEN);
strcat(prefix, TS_PATH_DELIMITER);
int32_t prefixLen = (int32_t)strlen(prefix);
while (numOfRows < rows) {
while (numOfRows < rowsCapacity) {
pShow->pIter = sdbFetch(pSdb, SDB_TOPIC, pShow->pIter, (void **)&pTopic);
if (pShow->pIter == NULL) break;
if (pTopic->dbUid != pDb->uid) {
if (strncmp(pTopic->name, prefix, prefixLen) != 0) {
mError("Inconsistent topic data, name:%s, db:%s, dbUid:%" PRIu64, pTopic->name, pDb->name, pDb->uid);
}
int32_t cols = 0;
sdbRelease(pSdb, pTopic);
continue;
}
char topicName[TSDB_TOPIC_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
tstrncpy(&topicName[VARSTR_HEADER_SIZE], pTopic->name, TSDB_TOPIC_NAME_LEN);
varDataSetLen(topicName, strlen(&topicName[VARSTR_HEADER_SIZE]));
cols = 0;
SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)topicName, false);
char topicName[TSDB_TOPIC_NAME_LEN] = {0};
tstrncpy(topicName, pTopic->name + prefixLen, TSDB_TOPIC_NAME_LEN);
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
STR_TO_VARSTR(pWrite, topicName);
cols++;
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)&pTopic->createTime, false);
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int64_t *)pWrite = pTopic->createTime;
cols++;
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
char *sql = taosMemoryCalloc(1, strlen(pTopic->sql) + 1 + VARSTR_HEADER_SIZE);
strcpy(&sql[VARSTR_HEADER_SIZE], pTopic->sql);
varDataSetLen(sql, strlen(&sql[VARSTR_HEADER_SIZE]));
colDataAppend(pColInfo, numOfRows, (const char *)sql, false);
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pTopic->sql, pShow->bytes[cols]);
cols++;
taosMemoryFree(sql);
numOfRows++;
sdbRelease(pSdb, pTopic);
}
mndReleaseDb(pMnode, pDb);
pShow->numOfRows += numOfRows;
return numOfRows;
}
......
......@@ -1326,7 +1326,6 @@ int32_t qBindStmtColsValue(void *pBlock, TAOS_BIND_v2 *bind, char *msgBuf, int32
STSRow* row = (STSRow*)(pDataBlock->pData + pDataBlock->size); // skip the SSubmitBlk header
tdSRowResetBuf(pBuilder, row);
// 1. set the parsed value from sql string
for (int c = 0; c < spd->numOfBound; ++c) {
SSchema* pColSchema = &pSchema[spd->boundColumns[c] - 1];
......
......@@ -43,7 +43,8 @@ void qwFreeFetchRsp(void *msg);
int32_t qwMallocFetchRsp(int32_t length, SRetrieveTableRsp **rsp);
int32_t qwGetSchTasksStatus(SQWorkerMgmt *mgmt, uint64_t sId, SSchedulerStatusRsp **rsp);
int32_t qwBuildAndSendHbRsp(SQWConnInfo *pConn, SSchedulerHbRsp *rsp, int32_t code);
int32_t qwRegisterBrokenLinkArg(QW_FPARAMS_DEF, SQWConnInfo *pConn);
int32_t qwRegisterQueryBrokenLinkArg(QW_FPARAMS_DEF, SQWConnInfo *pConn);
int32_t qwRegisterHbBrokenLinkArg(SQWorkerMgmt *mgmt, uint64_t sId, SQWConnInfo *pConn);
#ifdef __cplusplus
}
......
......@@ -948,7 +948,7 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType, int8_t ex
DataSinkHandle sinkHandle = NULL;
SQWTaskCtx *ctx = NULL;
QW_ERR_JRET(qwRegisterBrokenLinkArg(QW_FPARAMS(), &qwMsg->connInfo));
QW_ERR_JRET(qwRegisterQueryBrokenLinkArg(QW_FPARAMS(), &qwMsg->connInfo));
QW_ERR_JRET(qwHandlePrePhaseEvents(QW_FPARAMS(), QW_PHASE_PRE_QUERY, &input, NULL));
......@@ -1285,23 +1285,51 @@ _return:
QW_RET(TSDB_CODE_SUCCESS);
}
int32_t qwProcessHbLinkBroken(SQWorkerMgmt *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req) {
int32_t code = 0;
SSchedulerHbRsp rsp = {0};
SQWSchStatus *sch = NULL;
QW_ERR_RET(qwAcquireAddScheduler(mgmt, req->sId, QW_READ, &sch));
QW_LOCK(QW_WRITE, &sch->hbConnLock);
if (qwMsg->connInfo.handle == sch->hbConnInfo.handle) {
tmsgReleaseHandle(sch->hbConnInfo.handle, TAOS_CONN_SERVER);
sch->hbConnInfo.handle = NULL;
sch->hbConnInfo.ahandle = NULL;
QW_DLOG("release hb handle due to connection broken, handle:%p", qwMsg->connInfo.handle);
} else {
QW_DLOG("ignore hb connection broken, handle:%p, currentHandle:%p", qwMsg->connInfo.handle, sch->hbConnInfo.handle);
}
QW_UNLOCK(QW_WRITE, &sch->hbConnLock);
qwReleaseScheduler(QW_READ, mgmt);
QW_RET(TSDB_CODE_SUCCESS);
}
int32_t qwProcessHb(SQWorkerMgmt *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req) {
int32_t code = 0;
SSchedulerHbRsp rsp = {0};
SQWSchStatus *sch = NULL;
uint64_t seqId = 0;
void *origHandle = NULL;
memcpy(&rsp.epId, &req->epId, sizeof(req->epId));
if (qwMsg->code) {
QW_RET(qwProcessHbLinkBroken(mgmt, qwMsg, req));
}
QW_ERR_JRET(qwAcquireAddScheduler(mgmt, req->sId, QW_READ, &sch));
QW_ERR_JRET(qwRegisterHbBrokenLinkArg(mgmt, req->sId, &qwMsg->connInfo));
QW_LOCK(QW_WRITE, &sch->hbConnLock);
if (sch->hbConnInfo.handle) {
tmsgReleaseHandle(sch->hbConnInfo.handle, TAOS_CONN_SERVER);
}
memcpy(&sch->hbConnInfo, &qwMsg->connInfo, sizeof(qwMsg->connInfo));
memcpy(&sch->hbEpId, &req->epId, sizeof(req->epId));
......@@ -1314,7 +1342,14 @@ int32_t qwProcessHb(SQWorkerMgmt *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req) {
_return:
memcpy(&rsp.epId, &req->epId, sizeof(req->epId));
qwBuildAndSendHbRsp(&qwMsg->connInfo, &rsp, code);
if (code) {
tmsgReleaseHandle(qwMsg->connInfo.handle, TAOS_CONN_SERVER);
}
QW_DLOG("hb rsp send, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code));
QW_RET(TSDB_CODE_SUCCESS);
......
......@@ -286,7 +286,7 @@ int32_t qwBuildAndSendCQueryMsg(QW_FPARAMS_DEF, SQWConnInfo *pConn) {
}
int32_t qwRegisterBrokenLinkArg(QW_FPARAMS_DEF, SQWConnInfo *pConn) {
int32_t qwRegisterQueryBrokenLinkArg(QW_FPARAMS_DEF, SQWConnInfo *pConn) {
STaskDropReq * req = (STaskDropReq *)rpcMallocCont(sizeof(STaskDropReq));
if (NULL == req) {
QW_SCH_TASK_ELOG("rpcMallocCont %d failed", (int32_t)sizeof(STaskDropReq));
......@@ -313,6 +313,42 @@ int32_t qwRegisterBrokenLinkArg(QW_FPARAMS_DEF, SQWConnInfo *pConn) {
return TSDB_CODE_SUCCESS;
}
int32_t qwRegisterHbBrokenLinkArg(SQWorkerMgmt *mgmt, uint64_t sId, SQWConnInfo *pConn) {
SSchedulerHbReq req = {0};
req.header.vgId = mgmt->nodeId;
req.sId = sId;
int32_t msgSize = tSerializeSSchedulerHbReq(NULL, 0, &req);
if (msgSize < 0) {
QW_SCH_ELOG("tSerializeSSchedulerHbReq hbReq failed, size:%d", msgSize);
QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
void *msg = rpcMallocCont(msgSize);
if (NULL == msg) {
QW_SCH_ELOG("calloc %d failed", msgSize);
QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
if (tSerializeSSchedulerHbReq(msg, msgSize, &req) < 0) {
QW_SCH_ELOG("tSerializeSSchedulerHbReq hbReq failed, size:%d", msgSize);
taosMemoryFree(msg);
QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
SRpcMsg pMsg = {
.handle = pConn->handle,
.ahandle = pConn->ahandle,
.msgType = TDMT_VND_QUERY_HEARTBEAT,
.pCont = msg,
.contLen = sizeof(SSchedulerHbReq),
.code = TSDB_CODE_RPC_NETWORK_UNAVAIL,
};
tmsgRegisterBrokenLinkArg(&mgmt->msgCb, &pMsg);
return TSDB_CODE_SUCCESS;
}
int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) {
......@@ -587,10 +623,14 @@ int32_t qWorkerProcessHbMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
}
uint64_t sId = req.sId;
SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0};
SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0, .code = pMsg->code};
qwMsg.connInfo.handle = pMsg->handle;
qwMsg.connInfo.ahandle = pMsg->ahandle;
if (TSDB_CODE_RPC_NETWORK_UNAVAIL == pMsg->code) {
QW_SCH_DLOG("receive Hb msg due to network broken, error:%s", tstrerror(pMsg->code));
}
QW_SCH_DLOG("processHb start, node:%p, handle:%p", node, pMsg->handle);
QW_ERR_RET(qwProcessHb(mgmt, &qwMsg, &req));
......
......@@ -1123,20 +1123,20 @@ int32_t timeDiffFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *p
GET_TYPED_DATA(timeUnit, int64_t, GET_PARAM_TYPE(&pInput[2]), pInput[2].columnData->pData);
}
char *input[2];
for (int32_t k = 0; k < 2; ++k) {
int32_t type = GET_PARAM_TYPE(&pInput[k]);
if (type != TSDB_DATA_TYPE_BIGINT && type != TSDB_DATA_TYPE_TIMESTAMP &&
type != TSDB_DATA_TYPE_BINARY && type != TSDB_DATA_TYPE_NCHAR) {
return TSDB_CODE_FAILED;
int32_t numOfRows = 0;
for (int32_t i = 0; i < inputNum; ++i) {
if (pInput[i].numOfRows > numOfRows) {
numOfRows = pInput[i].numOfRows;
}
}
for (int32_t i = 0; i < pInput[0].numOfRows; ++i) {
char *input[2];
for (int32_t i = 0; i < numOfRows; ++i) {
bool hasNull = false;
for (int32_t k = 0; k < 2; ++k) {
if (colDataIsNull_s(pInput[0].columnData, i)) {
colDataAppendNULL(pOutput->columnData, i);
continue;
if (colDataIsNull_s(pInput[k].columnData, i)) {
hasNull = true;
break;
}
int32_t rowIdx = (pInput[k].numOfRows == 1) ? 0 : i;
......@@ -1178,6 +1178,11 @@ int32_t timeDiffFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *p
}
}
if (hasNull) {
colDataAppendNULL(pOutput->columnData, i);
continue;
}
int64_t result = (timeVal[0] >= timeVal[1]) ? (timeVal[0] - timeVal[1]) :
(timeVal[1] - timeVal[0]);
......@@ -1238,7 +1243,7 @@ int32_t timeDiffFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *p
colDataAppend(pOutput->columnData, i, (char *)&result, false);
}
pOutput->numOfRows = pInput->numOfRows;
pOutput->numOfRows = numOfRows;
return TSDB_CODE_SUCCESS;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册