提交 cfce1340 编写于 作者: H Haojun Liao

[td-11818] implementate the taos_fetch_row .

上级 63869172
......@@ -804,7 +804,7 @@ typedef struct {
} SVgroupsMsg, SVgroupsInfo;
typedef struct {
char tbFname[TSDB_TABLE_FNAME_LEN]; // table id
char tbFname[TSDB_TABLE_FNAME_LEN]; // table full name
char stbFname[TSDB_TABLE_FNAME_LEN];
int32_t numOfTags;
int32_t numOfColumns;
......
......@@ -151,7 +151,7 @@ typedef struct SParseContext {
* @param msg extended error message if exists.
* @return error code
*/
int32_t qParseQuerySql(const char* pStr, size_t length, int64_t id, int32_t* type, void** pOutput, char* msg, int32_t msgLen);
int32_t qParseQuerySql(const char* pStr, size_t length, int64_t id, int32_t* type, void** pOutput, int32_t* outputLen, char* msg, int32_t msgLen);
typedef enum {
PAYLOAD_TYPE_KV = 0,
......
......@@ -20,14 +20,15 @@
extern "C" {
#endif
#include <common.h>
#include "taos.h"
#include "taosmsg.h"
#include "tdef.h"
#include "tep.h"
#include "thash.h"
#include "tlist.h"
#include "trpc.h"
#include "tdef.h"
#include "tmsgtype.h"
#include "tep.h"
#include "trpc.h"
typedef struct SQueryExecMetric {
int64_t start; // start timestamp
......@@ -86,10 +87,18 @@ typedef struct STscObj {
SAppInstInfo *pAppInfo;
} STscObj;
typedef struct SClientResultInfo {
SSDataBlock *pData;
TAOS_FIELD *resultFields;
int32_t current;
} SClientResultInfo;
typedef struct SReqBody {
tsem_t rspSem; // not used now
void* fp;
void* param;
int32_t paramLen;
SClientResultInfo* pResInfo;
} SRequestBody;
#define ERROR_MSG_BUF_DEFAULT_SIZE 512
......@@ -142,6 +151,8 @@ void initMsgHandleFp();
TAOS *taos_connect_internal(const char *ip, const char *user, const char *pass, const char *auth, const char *db, uint16_t port);
TAOS_RES *taos_query_l(TAOS *taos, const char *sql, size_t sqlLen);
void* doFetchRow(SRequestObj* pRequest);
#ifdef __cplusplus
}
#endif
......
......@@ -149,11 +149,12 @@ TAOS_RES *taos_query_l(TAOS *taos, const char *sql, size_t sqlLen) {
} else {
int32_t type = 0;
void* output = NULL;
code = qParseQuerySql(pRequest->sqlstr, sqlLen, pRequest->requestId, &type, &output, pRequest->msgBuf, ERROR_MSG_BUF_DEFAULT_SIZE);
int32_t outputLen = 0;
code = qParseQuerySql(pRequest->sqlstr, sqlLen, pRequest->requestId, &type, &output, &outputLen, pRequest->msgBuf, ERROR_MSG_BUF_DEFAULT_SIZE);
if (type == TSDB_SQL_CREATE_USER || type == TSDB_SQL_SHOW) {
pRequest->type = type;
pRequest->body.param = output;
pRequest->body.paramLen = outputLen;
SRequestMsgBody body = {0};
buildRequestMsgFp[type](pRequest, &body);
......@@ -374,4 +375,21 @@ TAOS *taos_connect_l(const char *ip, int ipLen, const char *user, int userLen, c
strncpy(passStr, pass, MIN(TSDB_PASSWORD_LEN - 1, passLen));
strncpy(dbStr, db, MIN(TSDB_DB_NAME_LEN - 1, dbLen));
return taos_connect(ipStr, userStr, passStr, dbStr, port);
}
void* doFetchRow(SRequestObj* pRequest) {
assert(pRequest != NULL);
SClientResultInfo* pResultInfo = pRequest->body.pResInfo;
if (pResultInfo == NULL || pResultInfo->current >= pResultInfo->pData->info.rows) {
if (pResultInfo == NULL) {
pRequest->body.pResInfo = calloc(1, sizeof(SClientResultInfo));
// pRequest->body.pResInfo.
}
// current data set are exhausted, fetch more result from node
// if (pRes->row >= pRes->numOfRows && needToFetchNewBlock(pSql)) {
// taos_fetch_rows_a(res, waitForRetrieveRsp, pSql->pTscObj);
// tsem_wait(&pSql->rspSem);
// }
}
}
\ No newline at end of file
......@@ -117,3 +117,17 @@ TAOS_RES *taos_query(TAOS *taos, const char *sql) {
return taos_query_l(taos, sql, strlen(sql));
}
TAOS_ROW taos_fetch_row(TAOS_RES *pRes) {
if (pRes == NULL) {
return NULL;
}
SRequestObj *pRequest = (SRequestObj *) pRes;
if (pRequest->type == TSDB_SQL_RETRIEVE_EMPTY_RESULT ||
pRequest->type == TSDB_SQL_INSERT) {
return NULL;
}
return doFetchRow(pRequest);
}
......@@ -13,6 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <catalog.h>
#include "clientInt.h"
#include "clientLog.h"
#include "os.h"
......@@ -3165,7 +3166,74 @@ int32_t buildCreateUserMsg(SRequestObj *pRequest, SRequestMsgBody* pMsgBody) {
pMsgBody->msgType = TSDB_MSG_TYPE_CREATE_USER;
pMsgBody->msgLen = sizeof(SCreateUserMsg);
pMsgBody->requestObjRefId = pRequest->self;
pMsgBody->pData = pRequest->body.param;
pMsgBody->pData = pRequest->body.param;
return 0;
}
int32_t buildShowMsg(SRequestObj* pRequest, SRequestMsgBody* pMsgBody) {
pMsgBody->msgType = TSDB_MSG_TYPE_SHOW;
pMsgBody->msgLen = pRequest->body.paramLen;
pMsgBody->requestObjRefId = pRequest->self;
pMsgBody->pData = pRequest->body.param;
}
STableMeta* createTableMetaFromMsg(STableMetaMsg* pTableMetaMsg) {
assert(pTableMetaMsg != NULL && pTableMetaMsg->numOfColumns >= 2);
size_t schemaSize = (pTableMetaMsg->numOfColumns + pTableMetaMsg->numOfTags) * sizeof(SSchema);
STableMeta* pTableMeta = calloc(1, sizeof(STableMeta) + schemaSize);
pTableMeta->tableType = pTableMetaMsg->tableType;
pTableMeta->vgId = pTableMetaMsg->vgroup.vgId;
pTableMeta->suid = pTableMetaMsg->suid;
pTableMeta->uid = pTableMetaMsg->tuid;
pTableMeta->tableInfo = (STableComInfo) {
.numOfTags = pTableMetaMsg->numOfTags,
.precision = pTableMetaMsg->precision,
.numOfColumns = pTableMetaMsg->numOfColumns,
};
pTableMeta->sversion = pTableMetaMsg->sversion;
pTableMeta->tversion = pTableMetaMsg->tversion;
memcpy(pTableMeta->schema, pTableMetaMsg->pSchema, schemaSize);
int32_t numOfTotalCols = pTableMeta->tableInfo.numOfColumns;
for(int32_t i = 0; i < numOfTotalCols; ++i) {
pTableMeta->tableInfo.rowSize += pTableMeta->schema[i].bytes;
}
return pTableMeta;
}
int32_t processShowRsp(SRequestObj *pRequest, const char* pMsg, int32_t msgLen) {
SShowRsp* pShow = (SShowRsp *)pMsg;
pShow->showId = htonl(pShow->showId);
STableMetaMsg *pMetaMsg = &(pShow->tableMeta);
pMetaMsg->numOfColumns = htonl(pMetaMsg->numOfColumns);
SSchema* pSchema = pMetaMsg->pSchema;
pMetaMsg->tuid = htobe64(pMetaMsg->tuid);
for (int i = 0; i < pMetaMsg->numOfColumns; ++i) {
pSchema->bytes = htons(pSchema->bytes);
pSchema++;
}
STableMeta* pTableMeta = createTableMetaFromMsg(pMetaMsg);
SSchema *pTableSchema = pTableMeta->schema;
TAOS_FIELD* pFields = calloc(1, pTableMeta->tableInfo.numOfColumns);
for (int16_t i = 0; i < pTableMeta->tableInfo.numOfColumns; ++i, ++pSchema) {
tstrncpy(pFields[i].name, pTableSchema[i].name, tListLen(pFields[i].name));
pFields[i].type = pTableSchema[i].type;
pFields[i].bytes = pTableSchema[i].bytes;
}
// pRequest->body.resultFields = pFields;
// pRequest->body.numOfFields = pTableMeta->tableInfo.numOfColumns;
return 0;
}
......@@ -3250,5 +3318,7 @@ void initMsgHandleFp() {
buildRequestMsgFp[TSDB_SQL_CREATE_USER] = buildCreateUserMsg;
buildRequestMsgFp[TSDB_SQL_SHOW] = buildShowMsg;
handleRequestRspFp[TSDB_SQL_SHOW] = processShowRsp;
}
\ No newline at end of file
......@@ -37,14 +37,16 @@ TEST(testCase, driverInit_Test) {
TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0);
assert(pConn != NULL);
TAOS_RES* pRes = taos_query(pConn, "create user abc pass 'abc'");
if (taos_errno(pRes) != TSDB_CODE_SUCCESS) {
printf("failed to create user, reason:%s\n", taos_errstr(pRes));
}
taos_free_result(pRes);
pRes = taos_query(pConn, "show users");
// TAOS_RES* pRes = taos_query(pConn, "create user abc pass 'abc'");
// if (taos_errno(pRes) != TSDB_CODE_SUCCESS) {
// printf("failed to create user, reason:%s\n", taos_errstr(pRes));
// }
//
// taos_free_result(pRes);
TAOS_RES* pRes = taos_query(pConn, "show users");
TAOS_ROW pRow = taos_fetch_row(pRes);
assert(pRow != NULL);
taos_close(pConn);
}
\ No newline at end of file
......@@ -471,7 +471,7 @@ static int32_t mndGetUserMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *p
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pMeta->numOfColumns = htons(cols);
pMeta->numOfColumns = htonl(cols);
pShow->numOfColumns = cols;
pShow->offset[0] = 0;
......
......@@ -60,7 +60,7 @@ int32_t qParserValidateSqlNode(struct SCatalog* pCatalog, SSqlInfo* pSqlInfo, SQ
* @param type
* @return
*/
int32_t qParserValidateDdlSqlNode(SSqlInfo* pInfo, int64_t id, void** output, int32_t* type, char* msgBuf, int32_t msgBufLen);
int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, int64_t id, void** output, int32_t* outputLen, int32_t* type, char* msgBuf, int32_t msgBufLen);
/**
* Evaluate the numeric and timestamp arithmetic expression in the WHERE clause.
......
......@@ -65,7 +65,7 @@ int32_t getExprFunctionId(SExprInfo *pExprInfo);
STableMeta* tableMetaDup(const STableMeta* pTableMeta);
bool isDdlSql(SSqlInfo* pSqlInfo);
bool isDclSqlStatement(SSqlInfo* pSqlInfo);
#ifdef __cplusplus
}
......
......@@ -4069,7 +4069,93 @@ int32_t qParserValidateSqlNode(struct SCatalog* pCatalog, SSqlInfo* pInfo, SQuer
return code;
}
int32_t qParserValidateDdlSqlNode(SSqlInfo* pInfo, int64_t id, void** output, int32_t* type, char* msgBuf, int32_t msgBufLen) {
// todo remove it
static int32_t setShowInfo(struct SSqlInfo* pInfo, void** output, int32_t* msgLen, SMsgBuf* pMsgBuf) {
const char* msg1 = "invalid name";
const char* msg2 = "wildcard string should be less than %d characters";
const char* msg3 = "database name too long";
const char* msg4 = "pattern is invalid";
const char* msg5 = "database name is empty";
const char* msg6 = "pattern string is empty";
/*
* database prefix in pInfo->pMiscInfo->a[0]
* wildcard in like clause in pInfo->pMiscInfo->a[1]
*/
SShowInfo* pShowInfo = &pInfo->pMiscInfo->showOpt;
int16_t showType = pShowInfo->showType;
if (showType == TSDB_MGMT_TABLE_TABLE || showType == TSDB_MGMT_TABLE_VGROUP) {
SToken* pDbPrefixToken = &pShowInfo->prefix;
if (pDbPrefixToken->type != 0) {
if (pDbPrefixToken->n >= TSDB_DB_NAME_LEN) { // db name is too long
return buildInvalidOperationMsg(pMsgBuf, msg3);
}
if (pDbPrefixToken->n <= 0) {
return buildInvalidOperationMsg(pMsgBuf, msg5);
}
if (parserValidateIdToken(pDbPrefixToken) != TSDB_CODE_SUCCESS) {
return buildInvalidOperationMsg(pMsgBuf, msg1);
}
// int32_t ret = tNameSetDbName(&pTableMetaInfo->name, getAccountId(pRequest->pTsc), pDbPrefixToken);
// if (ret != TSDB_CODE_SUCCESS) {
// return buildInvalidOperationMsg(pMsgBuf, msg1);
// }
}
// show table/stable like 'xxxx', set the like pattern for show tables
SToken* pPattern = &pShowInfo->pattern;
if (pPattern->type != 0) {
if (pPattern->type == TK_ID && pPattern->z[0] == TS_ESCAPE_CHAR) {
return buildInvalidOperationMsg(pMsgBuf, msg4);
}
pPattern->n = strdequote(pPattern->z);
if (pPattern->n <= 0) {
return buildInvalidOperationMsg(pMsgBuf, msg6);
}
if (pPattern->n > tsMaxWildCardsLen) {
char tmp[64] = {0};
sprintf(tmp, msg2, tsMaxWildCardsLen);
return buildInvalidOperationMsg(pMsgBuf, tmp);
}
}
} else if (showType == TSDB_MGMT_TABLE_VNODES) {
if (pShowInfo->prefix.type == 0) {
return buildInvalidOperationMsg(pMsgBuf, "No specified dnode ep");
}
if (pShowInfo->prefix.type == TK_STRING) {
pShowInfo->prefix.n = strdequote(pShowInfo->prefix.z);
}
}
SShowMsg* pShowMsg = calloc(1, sizeof(SShowMsg));
pShowMsg->type = pShowInfo->showType;
if (pShowInfo->showType != TSDB_MGMT_TABLE_VNODES) {
SToken* pPattern = &pShowInfo->pattern;
if (pPattern->type > 0) { // only show tables support wildcard query
strncpy(pShowMsg->payload, pPattern->z, pPattern->n);
pShowMsg->payloadLen = htons(pPattern->n);
}
} else {
SToken* pEpAddr = &pShowInfo->prefix;
assert(pEpAddr->n > 0 && pEpAddr->type > 0);
strncpy(pShowMsg->payload, pEpAddr->z, pEpAddr->n);
pShowMsg->payloadLen = htons(pEpAddr->n);
}
*output = pShowMsg;
*msgLen = sizeof(SShowMsg) + htons(pShowMsg->payloadLen);
return TSDB_CODE_SUCCESS;
}
int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, int64_t id, void** output, int32_t* outputLen, int32_t* type, char* msgBuf, int32_t msgBufLen) {
int32_t code = 0;
SMsgBuf m = {.buf = msgBuf, .len = msgBufLen};
......@@ -4125,9 +4211,14 @@ int32_t qParserValidateDdlSqlNode(SSqlInfo* pInfo, int64_t id, void** output, in
*output = buildUserManipulationMsg(pInfo, id, msgBuf, msgBufLen);
break;
}
case TSDB_SQL_SHOW: {
code = setShowInfo(pInfo, output, outputLen, pMsgBuf);
break;
}
default:
break;
}
return 0;
return code;
}
......@@ -31,7 +31,7 @@ bool qIsInsertSql(const char* pStr, size_t length) {
} while (1);
}
int32_t qParseQuerySql(const char* pStr, size_t length, int64_t id, int32_t *type, void** pOutput, char* msg, int32_t msgLen) {
int32_t qParseQuerySql(const char* pStr, size_t length, int64_t id, int32_t *type, void** pOutput, int32_t* outputLen, char* msg, int32_t msgLen) {
SQueryStmtInfo* pQueryInfo = calloc(1, sizeof(SQueryStmtInfo));
if (pQueryInfo == NULL) {
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; // set correct error code.
......@@ -45,8 +45,8 @@ int32_t qParseQuerySql(const char* pStr, size_t length, int64_t id, int32_t *typ
return terrno;
}
if (isDdlSql(&info)) {
int32_t code = qParserValidateDdlSqlNode(&info, id, pOutput, type, msg, msgLen);
if (isDclSqlStatement(&info)) {
int32_t code = qParserValidateDclSqlNode(&info, id, pOutput, outputLen, type, msg, msgLen);
if (code == TSDB_CODE_SUCCESS) {
// do nothing
}
......
......@@ -12,8 +12,7 @@
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "parserUtil.h"
#include <tmsgtype.h>
#include "taosmsg.h"
#include "parser.h"
......@@ -23,6 +22,8 @@
#include "thash.h"
#include "tbuffer.h"
#include "parserInt.h"
#include "parserUtil.h"
#include "tmsgtype.h"
#include "queryInfoUtil.h"
#include "function.h"
......@@ -1632,7 +1633,7 @@ uint32_t convertRelationalOperator(SToken *pToken) {
}
}
bool isDdlSql(SSqlInfo* pSqlInfo) {
bool isDclSqlStatement(SSqlInfo* pSqlInfo) {
return (pSqlInfo->type != TSDB_SQL_SELECT);
}
......
......@@ -714,16 +714,14 @@ TEST(testCase, show_user_Test) {
SSqlInfo info1 = doGenerateAST(sql1);
ASSERT_EQ(info1.valid, true);
void* output = NULL;
int32_t type = 0;
int32_t len = 0;
int32_t code = qParserValidateDclSqlNode(&info1, 1, &output, &len, &type, msg, buf.len);
ASSERT_EQ(code, 0);
// convert the show command to be the select query
// select name, privilege, create_time, account from information_schema.users;
SQueryStmtInfo* pQueryInfo = createQueryInfo();
// setTableMetaInfo(pQueryInfo, &req);
// SSqlNode* pSqlNode = (SSqlNode*)taosArrayGetP(info1.sub.node, 0);
// ret = validateSqlNode(pSqlNode, pQueryInfo, &buf);
// ASSERT_NE(ret, 0);
}
TEST(testCase, create_user_Test) {
......@@ -737,12 +735,12 @@ TEST(testCase, create_user_Test) {
SSqlInfo info1 = doGenerateAST(sql);
ASSERT_EQ(info1.valid, true);
ASSERT_EQ(isDdlSql(&info1), true);
ASSERT_EQ(isDclSqlStatement(&info1), true);
void* output = NULL;
int32_t type = 0;
int32_t code = qParserValidateDdlSqlNode(&info1, 1, &output, &type, msg, buf.len);
int32_t len = 0;
int32_t code = qParserValidateDclSqlNode(&info1, 1, &output, &len, &type, msg, buf.len);
ASSERT_EQ(code, 0);
destroySqlInfo(&info1);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册