提交 4c39629d 编写于 作者: wmmhello's avatar wmmhello

Merge branch 'main' of https://github.com/taosdata/TDengine into mark/tmq

......@@ -998,18 +998,14 @@ SAMPLE(expr, k)
**Description**: _k_ sampling values of a specific column. The applicable range of _k_ is [1,1000].
**Return value type**: Same as the column being operated plus the associated timestamp
**Return value type**: Same as the column being operated
**Applicable data types**: Any data type except for tags of STable
**Applicable data types**: Any data type
**Applicable nested query**: Inner query and Outer query
**Applicable table types**: standard tables and supertables
**More explanations**:
- This function cannot be used in expression calculation.
### TAIL
......@@ -1054,11 +1050,11 @@ TOP(expr, k)
UNIQUE(expr)
```
**Description**: The values that occur the first time in the specified column. The effect is similar to `distinct` keyword, but it can also be used to match tags or timestamp. The first occurrence of a timestamp or tag is used.
**Description**: The values that occur the first time in the specified column. The effect is similar to `distinct` keyword.
**Return value type**:Same as the data type of the column being operated upon
**Applicable column types**: Any data types except for timestamp
**Applicable column types**: Any data types
**Applicable table types**: table, STable
......
......@@ -990,18 +990,14 @@ SAMPLE(expr, k)
**功能说明**: 获取数据的 k 个采样值。参数 k 的合法输入范围是 1≤ k ≤ 1000。
**返回结果类型**: 同原始数据类型, 返回结果中带有该行记录的时间戳
**返回结果类型**: 同原始数据类型。
**适用数据类型**在超级表查询中使用时,不能应用在标签之上
**适用数据类型**全部类型字段
**嵌套子查询支持**: 适用于内层查询和外层查询。
**适用于**:表和超级表。
**使用说明**
- 不能参与表达式计算;该函数可以应用在普通表和超级表上;
### TAIL
......@@ -1046,11 +1042,11 @@ TOP(expr, k)
UNIQUE(expr)
```
**功能说明**:返回该列的数值首次出现的值。该函数功能与 distinct 相似,但是可以匹配标签和时间戳信息。可以针对除时间列以外的字段进行查询,可以匹配标签和时间戳,其中的标签和时间戳是第一次出现时刻的标签和时间戳
**功能说明**:返回该列数据首次出现的值。该函数功能与 distinct 相似
**返回数据类型**:同应用的字段。
**适用数据类型**适合于除时间类型以外的字段。
**适用数据类型**全部类型字段。
**适用于**: 表和超级表。
......
......@@ -111,6 +111,12 @@ int32_t udfStartUdfd(int32_t startDnodeId);
*/
int32_t udfStopUdfd();
/**
* get udfd pid
*
*/
int32_t udfGetUdfdPid(int32_t* pUdfdPid);
#ifdef __cplusplus
}
#endif
......
......@@ -749,6 +749,9 @@ static int32_t smlSendMetaMsg(SSmlHandle *info, SName *pName, SArray *pColumns,
pReq.suid = pTableMeta->uid;
pReq.source = TD_REQ_FROM_TAOX;
pSql = (action == SCHEMA_ACTION_ADD_COLUMN) ? "sml_add_column" : "sml_modify_column_size";
} else{
uError("SML:0x%" PRIx64 " invalid action:%d", info->id, action);
goto end;
}
code = buildRequest(info->taos->id, pSql, strlen(pSql), NULL, false, &pRequest, 0);
......
......@@ -939,8 +939,6 @@ int stmtClose(TAOS_STMT* stmt) {
stmtCleanSQLInfo(pStmt);
taosMemoryFree(stmt);
STMT_DLOG_E("stmt freed");
return TSDB_CODE_SUCCESS;
}
......
......@@ -651,7 +651,7 @@ static void asyncCommitOffset(tmq_t* tmq, const TAOS_RES* pRes, int32_t type, tm
int32_t j = 0;
int32_t numOfVgroups = taosArrayGetSize(pTopic->vgs);
for (j = 0; j < numOfVgroups; j++) {
SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
SMqClientVg* pVg = (SMqClientVg*)taosArrayGet(pTopic->vgs, j);
if (pVg->vgId == vgId) {
break;
}
......@@ -665,7 +665,7 @@ static void asyncCommitOffset(tmq_t* tmq, const TAOS_RES* pRes, int32_t type, tm
return;
}
SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
SMqClientVg* pVg = (SMqClientVg*)taosArrayGet(pTopic->vgs, j);
if (pVg->offsetInfo.currentOffset.type > 0 && !tOffsetEqual(&pVg->offsetInfo.currentOffset, &pVg->offsetInfo.committedOffset)) {
code = doSendCommitMsg(tmq, pVg, pTopic->topicName, pParamSet, j, numOfVgroups, type);
......@@ -741,13 +741,15 @@ static void asyncCommitAllOffsets(tmq_t* tmq, tmq_commit_cb* pCommitFp, void* us
static void generateTimedTask(int64_t refId, int32_t type) {
tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
if (tmq != NULL) {
int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM, 0);
*pTaskType = type;
taosWriteQitem(tmq->delayedTask, pTaskType);
tsem_post(&tmq->rspSem);
taosReleaseRef(tmqMgmt.rsetId, refId);
}
if(tmq == NULL) return;
int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM, 0);
if(pTaskType == NULL) return;
*pTaskType = type;
taosWriteQitem(tmq->delayedTask, pTaskType);
tsem_post(&tmq->rspSem);
taosReleaseRef(tmqMgmt.rsetId, refId);
}
void tmqAssignAskEpTask(void* param, void* tmrId) {
......@@ -762,19 +764,19 @@ void tmqAssignDelayedCommitTask(void* param, void* tmrId) {
taosMemoryFree(param);
}
void tmqAssignDelayedReportTask(void* param, void* tmrId) {
int64_t refId = *(int64_t*)param;
tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
if (tmq != NULL) {
int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM, 0);
*pTaskType = TMQ_DELAYED_TASK__REPORT;
taosWriteQitem(tmq->delayedTask, pTaskType);
tsem_post(&tmq->rspSem);
}
taosReleaseRef(tmqMgmt.rsetId, refId);
taosMemoryFree(param);
}
//void tmqAssignDelayedReportTask(void* param, void* tmrId) {
// int64_t refId = *(int64_t*)param;
// tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
// if (tmq != NULL) {
// int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM, 0);
// *pTaskType = TMQ_DELAYED_TASK__REPORT;
// taosWriteQitem(tmq->delayedTask, pTaskType);
// tsem_post(&tmq->rspSem);
// }
//
// taosReleaseRef(tmqMgmt.rsetId, refId);
// taosMemoryFree(param);
//}
int32_t tmqHbCb(void* param, SDataBuf* pMsg, int32_t code) {
if (pMsg) {
......
......@@ -969,7 +969,7 @@ void taosFormatUtcTime(char* buf, int32_t bufLen, int64_t t, int32_t precision)
default:
fractionLen = 0;
ASSERT(false);
return;
}
if (taosLocalTime(&quot, &ptm, buf) == NULL) {
......
......@@ -232,7 +232,6 @@ static int32_t mndProcessConnectReq(SRpcMsg *pReq) {
}
code = -1;
taosIp2String(pReq->info.conn.clientIp, ip);
if (mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_CONNECT) != 0) {
mGError("user:%s, failed to login from %s since %s", pReq->info.conn.user, ip, terrstr());
......@@ -244,7 +243,7 @@ static int32_t mndProcessConnectReq(SRpcMsg *pReq) {
mGError("user:%s, failed to login from %s while acquire user since %s", pReq->info.conn.user, ip, terrstr());
goto _OVER;
}
if (strncmp(connReq.passwd, pUser->pass, TSDB_PASSWORD_LEN - 1) != 0) {
mGError("user:%s, failed to login from %s since invalid pass, input:%s", pReq->info.conn.user, ip, connReq.passwd);
code = TSDB_CODE_MND_AUTH_FAILURE;
......@@ -270,6 +269,7 @@ static int32_t mndProcessConnectReq(SRpcMsg *pReq) {
}
}
_CONNECT:
pConn = mndCreateConn(pMnode, pReq->info.conn.user, connReq.connType, pReq->info.conn.clientIp,
pReq->info.conn.clientPort, connReq.pid, connReq.app, connReq.startTime);
if (pConn == NULL) {
......
......@@ -474,8 +474,8 @@ double* tHistogramUniform(SHistogramInfo* pHisto, double* ratio, int32_t num) {
}
ASSERTS(total <= numOfElem && total + pHisto->elems[j + 1].num > numOfElem,
"tHistogramUniform Error, total:%d, numOfElem:%d, elems[%d].num:%d",
total, numOfElem, j + 1, pHisto->elems[j + 1].num);
"tHistogramUniform Error, total:%ld, numOfElem:%ld, elems[%d].num:%ld",
total, (int64_t)numOfElem, j + 1, pHisto->elems[j + 1].num);
double delta = numOfElem - total;
if (fabs(delta) < FLT_EPSILON) {
......
......@@ -39,6 +39,7 @@ static SFilePage *loadDataFromFilePage(tMemBucket *pMemBucket, int32_t slotIdx)
if (p != NULL) {
pIdList = *(SArray **)p;
} else {
taosMemoryFree(buffer);
return NULL;
}
......@@ -48,6 +49,7 @@ static SFilePage *loadDataFromFilePage(tMemBucket *pMemBucket, int32_t slotIdx)
SFilePage *pg = getBufPage(pMemBucket->pBuffer, *pageId);
if (pg == NULL) {
taosMemoryFree(buffer);
return NULL;
}
......
......@@ -255,6 +255,18 @@ int32_t udfStopUdfd() {
return 0;
}
int32_t udfGetUdfdPid(int32_t* pUdfdPid) {
SUdfdData *pData = &udfdGlobal;
if (pData->spawnErr) {
return pData->spawnErr;
}
uv_pid_t pid = uv_process_get_pid(&pData->process);
if (pUdfdPid) {
*pUdfdPid = (int32_t)pid;
}
return TSDB_CODE_SUCCESS;
}
//==============================================================================================
/* Copyright (c) 2013, Ben Noordhuis <info@bnoordhuis.nl>
* The QUEUE is copied from queue.h under libuv
......
......@@ -965,40 +965,6 @@ int32_t udfdFillUdfInfoFromMNode(void *clientRpc, char *udfName, SUdf *udf) {
return code;
}
int32_t udfdConnectToMnode() {
SConnectReq connReq = {0};
connReq.connType = CONN_TYPE__UDFD;
tstrncpy(connReq.app, "udfd", sizeof(connReq.app));
tstrncpy(connReq.user, TSDB_DEFAULT_USER, sizeof(connReq.user));
char pass[TSDB_PASSWORD_LEN + 1] = {0};
taosEncryptPass_c((uint8_t *)(TSDB_DEFAULT_PASS), strlen(TSDB_DEFAULT_PASS), pass);
tstrncpy(connReq.passwd, pass, sizeof(connReq.passwd));
connReq.pid = taosGetPId();
connReq.startTime = taosGetTimestampMs();
strcpy(connReq.sVer, version);
int32_t contLen = tSerializeSConnectReq(NULL, 0, &connReq);
void *pReq = rpcMallocCont(contLen);
tSerializeSConnectReq(pReq, contLen, &connReq);
SUdfdRpcSendRecvInfo *msgInfo = taosMemoryCalloc(1, sizeof(SUdfdRpcSendRecvInfo));
msgInfo->rpcType = UDFD_RPC_MNODE_CONNECT;
uv_sem_init(&msgInfo->resultSem, 0);
SRpcMsg rpcMsg = {0};
rpcMsg.msgType = TDMT_MND_CONNECT;
rpcMsg.pCont = pReq;
rpcMsg.contLen = contLen;
rpcMsg.info.ahandle = msgInfo;
rpcSendRequest(global.clientRpc, &global.mgmtEp.epSet, &rpcMsg, NULL);
uv_sem_wait(&msgInfo->resultSem);
int32_t code = msgInfo->code;
uv_sem_destroy(&msgInfo->resultSem);
taosMemoryFree(msgInfo);
return code;
}
static bool udfdRpcRfp(int32_t code, tmsg_t msgType) {
if (code == TSDB_CODE_RPC_NETWORK_UNAVAIL || code == TSDB_CODE_RPC_BROKEN_LINK || code == TSDB_CODE_SYN_NOT_LEADER ||
code == TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED || code == TSDB_CODE_SYN_RESTORING ||
......@@ -1378,23 +1344,6 @@ static int32_t udfdRun() {
return 0;
}
void udfdConnectMnodeThreadFunc(void *args) {
int32_t retryMnodeTimes = 0;
int32_t code = 0;
while (retryMnodeTimes++ <= TSDB_MAX_REPLICA) {
uv_sleep(100 * (1 << retryMnodeTimes));
code = udfdConnectToMnode();
if (code == 0) {
break;
}
fnError("udfd can not connect to mnode, code: %s. retry", tstrerror(code));
}
if (code != 0) {
fnError("udfd can not connect to mnode");
}
}
int32_t udfdInitResidentFuncs() {
if (strlen(tsUdfdResFuncs) == 0) {
return TSDB_CODE_SUCCESS;
......@@ -1497,9 +1446,6 @@ int main(int argc, char *argv[]) {
udfdInitResidentFuncs();
uv_thread_t mnodeConnectThread;
uv_thread_create(&mnodeConnectThread, udfdConnectMnodeThreadFunc, NULL);
udfdRun();
removeListeningPipe();
......
......@@ -3741,10 +3741,10 @@ int32_t fltSclBuildRangeFromBlockSma(SFltSclColumnRange *colRange, SColumnDataAg
taosArrayPush(points, &startPt);
taosArrayPush(points, &endPt);
}
SFltSclDatum min;
SFltSclDatum min = {0};
fltSclBuildDatumFromBlockSmaValue(&min, colRange->colNode->node.resType.type, pAgg->min);
SFltSclPoint minPt = {.excl = false, .start = true, .val = min};
SFltSclDatum max;
SFltSclDatum max = {0};
fltSclBuildDatumFromBlockSmaValue(&max, colRange->colNode->node.resType.type, pAgg->max);
SFltSclPoint maxPt = {.excl = false, .start = false, .val = max};
taosArrayPush(points, &minPt);
......
......@@ -8,6 +8,9 @@ system sh/deploy.sh -n dnode1 -i 1
system sh/cfg.sh -n dnode1 -c udf -v 1
system sh/exec.sh -n dnode1 -s start
sql connect
sql alter user root pass 'taosdata2'
system sh/exec.sh -n dnode1 -s stop
system sh/exec.sh -n dnode1 -s start
print ======== step1 udf
system sh/compile_udf.sh
......
......@@ -17,6 +17,9 @@
#include <taosws.h>
#include <shellInt.h>
// save current database name
char curDBName[128] = ""; // TDB_MAX_DBNAME_LEN is 24, put large
int shell_conn_ws_server(bool first) {
char cuttedDsn[SHELL_WS_DSN_BUFF] = {0};
int dsnLen = strlen(shell.args.dsn);
......@@ -59,6 +62,14 @@ int shell_conn_ws_server(bool first) {
fprintf(stdout, "successfully connected to cloud service\n");
}
fflush(stdout);
// switch to current database if have
if(curDBName[0] !=0) {
char command[256];
sprintf(command, "use %s;", curDBName);
shellRunSingleCommandWebsocketImp(command);
}
return 0;
}
......@@ -290,7 +301,46 @@ void shellRunSingleCommandWebsocketImp(char *command) {
if (shellRegexMatch(command, "^\\s*use\\s+[a-zA-Z0-9_]+\\s*;\\s*$",
REG_EXTENDED | REG_ICASE)) {
fprintf(stdout, "Database changed.\r\n\r\n");
// copy dbname to curDBName
char *p = command;
bool firstStart = false;
bool firstEnd = false;
int i = 0;
while (*p != 0) {
if (*p != ' ') {
// not blank
if (!firstStart) {
firstStart = true;
} else if (firstEnd) {
if(*p == ';' && *p != '\\') {
break;
}
// database name
curDBName[i++] = *p;
if(i + 4 > sizeof(curDBName)) {
// DBName is too long, reset zero and break
i = 0;
break;
}
}
} else {
// blank
if(firstStart == true && firstEnd == false){
firstEnd = true;
}
if(firstStart && firstEnd && i > 0){
// blank after database name
break;
}
}
// move next
p++;
}
// append end
curDBName[i] = 0;
fprintf(stdout, "Database changed to %s.\r\n\r\n", curDBName);
fflush(stdout);
ws_free_result(res);
return;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册