提交 4ce2e4e0 编写于 作者: D dapan1121

support udf

上级 fbb5debc
...@@ -276,6 +276,7 @@ int32_t readFromFile(char *name, uint32_t *len, void **buf) { ...@@ -276,6 +276,7 @@ int32_t readFromFile(char *name, uint32_t *len, void **buf) {
int32_t handleCreateFunc(SSqlObj* pSql, struct SSqlInfo* pInfo) { int32_t handleCreateFunc(SSqlObj* pSql, struct SSqlInfo* pInfo) {
const char *msg1 = "function name is too long"; const char *msg1 = "function name is too long";
const char *msg2 = "path is too long";
SSqlCmd *pCmd = &pSql->cmd; SSqlCmd *pCmd = &pSql->cmd;
switch (pInfo->type) { switch (pInfo->type) {
...@@ -285,6 +286,8 @@ int32_t handleCreateFunc(SSqlObj* pSql, struct SSqlInfo* pInfo) { ...@@ -285,6 +286,8 @@ int32_t handleCreateFunc(SSqlObj* pSql, struct SSqlInfo* pInfo) {
uint32_t len = 0; uint32_t len = 0;
void *buf = NULL; void *buf = NULL;
createInfo->name.z[createInfo->name.n] = 0;
strdequote(createInfo->name.z); strdequote(createInfo->name.z);
if (strlen(createInfo->name.z) >= TSDB_FUNC_NAME_LEN) { if (strlen(createInfo->name.z) >= TSDB_FUNC_NAME_LEN) {
...@@ -296,6 +299,12 @@ int32_t handleCreateFunc(SSqlObj* pSql, struct SSqlInfo* pInfo) { ...@@ -296,6 +299,12 @@ int32_t handleCreateFunc(SSqlObj* pSql, struct SSqlInfo* pInfo) {
createInfo->path.z[createInfo->path.n] = 0; createInfo->path.z[createInfo->path.n] = 0;
strdequote(createInfo->path.z); strdequote(createInfo->path.z);
if (strlen(createInfo->path.z) >= PATH_MAX) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2);
}
strcpy(pMsg->path, createInfo->path.z);
int32_t ret = readFromFile(createInfo->path.z, &len, &buf); int32_t ret = readFromFile(createInfo->path.z, &len, &buf);
if (ret) { if (ret) {
......
...@@ -1300,8 +1300,20 @@ int32_t tscBuildShowMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -1300,8 +1300,20 @@ int32_t tscBuildShowMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
return TSDB_CODE_TSC_OUT_OF_MEMORY; return TSDB_CODE_TSC_OUT_OF_MEMORY;
} }
SShowInfo *pShowInfo = &pInfo->pMiscInfo->showOpt;
SShowMsg *pShowMsg = (SShowMsg *)pCmd->payload; SShowMsg *pShowMsg = (SShowMsg *)pCmd->payload;
if (pShowInfo->showType == TSDB_MGMT_TABLE_FUNCTION) {
pShowMsg->type = pShowInfo->showType;
pShowMsg->payloadLen = 0;
pCmd->payloadLen = sizeof(SShowMsg);
return TSDB_CODE_SUCCESS;
}
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0); STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
if (tNameIsEmpty(&pTableMetaInfo->name)) { if (tNameIsEmpty(&pTableMetaInfo->name)) {
...@@ -1312,7 +1324,6 @@ int32_t tscBuildShowMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -1312,7 +1324,6 @@ int32_t tscBuildShowMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
tNameGetFullDbName(&pTableMetaInfo->name, pShowMsg->db); tNameGetFullDbName(&pTableMetaInfo->name, pShowMsg->db);
} }
SShowInfo *pShowInfo = &pInfo->pMiscInfo->showOpt;
pShowMsg->type = pShowInfo->showType; pShowMsg->type = pShowInfo->showType;
if (pShowInfo->showType != TSDB_MGMT_TABLE_VNODES) { if (pShowInfo->showType != TSDB_MGMT_TABLE_VNODES) {
......
...@@ -572,6 +572,7 @@ typedef struct { ...@@ -572,6 +572,7 @@ typedef struct {
typedef struct { typedef struct {
char name[TSDB_FUNC_NAME_LEN]; char name[TSDB_FUNC_NAME_LEN];
char path[PATH_MAX];
int32_t codeLen; int32_t codeLen;
char code[]; char code[];
} SCreateFuncMsg; } SCreateFuncMsg;
......
...@@ -48,7 +48,9 @@ static int32_t mnodeCreateFunc(SAcctObj *pAcct, SCreateFuncMsg *pCreate, SMnodeM ...@@ -48,7 +48,9 @@ static int32_t mnodeCreateFunc(SAcctObj *pAcct, SCreateFuncMsg *pCreate, SMnodeM
static int32_t mnodeDropDb(SMnodeMsg *newMsg); static int32_t mnodeDropDb(SMnodeMsg *newMsg);
static int32_t mnodeSetDbDropping(SDbObj *pDb); static int32_t mnodeSetDbDropping(SDbObj *pDb);
static int32_t mnodeGetDbMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn); static int32_t mnodeGetDbMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn);
static int32_t mnodeGetFuncMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn);
static int32_t mnodeRetrieveDbs(SShowObj *pShow, char *data, int32_t rows, void *pConn); static int32_t mnodeRetrieveDbs(SShowObj *pShow, char *data, int32_t rows, void *pConn);
static int32_t mnodeRetrieveFuncs(SShowObj *pShow, char *data, int32_t rows, void *pConn);
static int32_t mnodeProcessCreateDbMsg(SMnodeMsg *pMsg); static int32_t mnodeProcessCreateDbMsg(SMnodeMsg *pMsg);
static int32_t mnodeProcessCreateFuncMsg(SMnodeMsg *pMsg); static int32_t mnodeProcessCreateFuncMsg(SMnodeMsg *pMsg);
static int32_t mnodeProcessDropDbMsg(SMnodeMsg *pMsg); static int32_t mnodeProcessDropDbMsg(SMnodeMsg *pMsg);
...@@ -185,7 +187,9 @@ int32_t mnodeInitDbs() { ...@@ -185,7 +187,9 @@ int32_t mnodeInitDbs() {
mnodeAddWriteMsgHandle(TSDB_MSG_TYPE_CM_DROP_DB, mnodeProcessDropDbMsg); mnodeAddWriteMsgHandle(TSDB_MSG_TYPE_CM_DROP_DB, mnodeProcessDropDbMsg);
mnodeAddWriteMsgHandle(TSDB_MSG_TYPE_CM_DROP_FUNCTION, mnodeProcessDropFuncMsg); mnodeAddWriteMsgHandle(TSDB_MSG_TYPE_CM_DROP_FUNCTION, mnodeProcessDropFuncMsg);
mnodeAddShowMetaHandle(TSDB_MGMT_TABLE_DB, mnodeGetDbMeta); mnodeAddShowMetaHandle(TSDB_MGMT_TABLE_DB, mnodeGetDbMeta);
mnodeAddShowMetaHandle(TSDB_MGMT_TABLE_FUNCTION, mnodeGetFuncMeta);
mnodeAddShowRetrieveHandle(TSDB_MGMT_TABLE_DB, mnodeRetrieveDbs); mnodeAddShowRetrieveHandle(TSDB_MGMT_TABLE_DB, mnodeRetrieveDbs);
mnodeAddShowRetrieveHandle(TSDB_MGMT_TABLE_FUNCTION, mnodeRetrieveFuncs);
mnodeAddShowFreeIterHandle(TSDB_MGMT_TABLE_DB, mnodeCancelGetNextDb); mnodeAddShowFreeIterHandle(TSDB_MGMT_TABLE_DB, mnodeCancelGetNextDb);
mDebug("table:dbs table is created"); mDebug("table:dbs table is created");
...@@ -473,7 +477,7 @@ static int32_t mnodeCreateFunc(SAcctObj *pAcct, SCreateFuncMsg *pCreate, SMnodeM ...@@ -473,7 +477,7 @@ static int32_t mnodeCreateFunc(SAcctObj *pAcct, SCreateFuncMsg *pCreate, SMnodeM
int32_t code = acctCheck(pAcct, ACCT_GRANT_DB); int32_t code = acctCheck(pAcct, ACCT_GRANT_DB);
if (code != 0) return code; if (code != 0) return code;
mError("Function name:%s, code:%.*s", pCreate->name, pCreate->codeLen, pCreate->code); mError("Function name:%s, path:%s, code:%.*s", pCreate->name, pCreate->path, pCreate->codeLen, pCreate->code);
return code; return code;
} }
...@@ -545,6 +549,9 @@ void mnodeCleanupDbs() { ...@@ -545,6 +549,9 @@ void mnodeCleanupDbs() {
tsDbSdb = NULL; tsDbSdb = NULL;
} }
static int32_t mnodeGetDbMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) { static int32_t mnodeGetDbMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) {
int32_t cols = 0; int32_t cols = 0;
...@@ -700,6 +707,41 @@ static int32_t mnodeGetDbMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn ...@@ -700,6 +707,41 @@ static int32_t mnodeGetDbMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn
return 0; return 0;
} }
static int32_t mnodeGetFuncMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) {
int32_t cols = 0;
SSchema *pSchema = pMeta->schema;
pShow->bytes[cols] = (TSDB_FUNC_NAME_LEN - 1) + VARSTR_HEADER_SIZE;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "name");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = PATH_MAX + VARSTR_HEADER_SIZE;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "path");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pMeta->numOfColumns = htons(cols);
pShow->numOfColumns = cols;
pShow->offset[0] = 0;
for (int32_t i = 1; i < cols; ++i) {
pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1];
}
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
//TODO GET ROWS NUM
pShow->numOfRows = 1;
return 0;
}
char *mnodeGetDbStr(char *src) { char *mnodeGetDbStr(char *src) {
char *pos = strstr(src, TS_PATH_DELIMITER); char *pos = strstr(src, TS_PATH_DELIMITER);
if (pos != NULL) ++pos; if (pos != NULL) ++pos;
...@@ -847,6 +889,36 @@ static int32_t mnodeRetrieveDbs(SShowObj *pShow, char *data, int32_t rows, void ...@@ -847,6 +889,36 @@ static int32_t mnodeRetrieveDbs(SShowObj *pShow, char *data, int32_t rows, void
return numOfRows; return numOfRows;
} }
static int32_t mnodeRetrieveFuncs(SShowObj *pShow, char *data, int32_t rows, void *pConn) {
int32_t numOfRows = 0;
char * pWrite;
int32_t cols = 0;
while (numOfRows < rows) {
cols = 0;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
STR_WITH_MAXSIZE_TO_VARSTR(pWrite, "aaa", pShow->bytes[cols]);
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
STR_WITH_MAXSIZE_TO_VARSTR(pWrite, "/tmp/abc", pShow->bytes[cols]);
cols++;
numOfRows++;
}
pShow->numOfReads += numOfRows;
mnodeVacuumResult(data, pShow->numOfColumns, numOfRows, rows, pShow);
return numOfRows;
}
void mnodeAddSuperTableIntoDb(SDbObj *pDb) { void mnodeAddSuperTableIntoDb(SDbObj *pDb) {
atomic_add_fetch_32(&pDb->numOfSuperTables, 1); atomic_add_fetch_32(&pDb->numOfSuperTables, 1);
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册