提交 69afe77e 编写于 作者: dengyihao's avatar dengyihao

add req_id interface

上级 53052a29
...@@ -130,17 +130,16 @@ typedef struct TAOS_VGROUP_HASH_INFO { ...@@ -130,17 +130,16 @@ typedef struct TAOS_VGROUP_HASH_INFO {
int32_t vgId; int32_t vgId;
uint32_t hashBegin; uint32_t hashBegin;
uint32_t hashEnd; uint32_t hashEnd;
} TAOS_VGROUP_HASH_INFO ; } TAOS_VGROUP_HASH_INFO;
typedef struct TAOS_DB_ROUTE_INFO { typedef struct TAOS_DB_ROUTE_INFO {
int32_t routeVersion; int32_t routeVersion;
int16_t hashPrefix; int16_t hashPrefix;
int16_t hashSuffix; int16_t hashSuffix;
int8_t hashMethod; int8_t hashMethod;
int32_t vgNum; int32_t vgNum;
TAOS_VGROUP_HASH_INFO *vgHash; TAOS_VGROUP_HASH_INFO *vgHash;
} TAOS_DB_ROUTE_INFO ; } TAOS_DB_ROUTE_INFO;
DLL_EXPORT void taos_cleanup(void); DLL_EXPORT void taos_cleanup(void);
DLL_EXPORT int taos_options(TSDB_OPTION option, const void *arg, ...); DLL_EXPORT int taos_options(TSDB_OPTION option, const void *arg, ...);
...@@ -153,6 +152,7 @@ DLL_EXPORT void taos_close(TAOS *taos); ...@@ -153,6 +152,7 @@ DLL_EXPORT void taos_close(TAOS *taos);
const char *taos_data_type(int type); const char *taos_data_type(int type);
DLL_EXPORT TAOS_STMT *taos_stmt_init(TAOS *taos); DLL_EXPORT TAOS_STMT *taos_stmt_init(TAOS *taos);
DLL_EXPORT TAOS_STMT *taos_stmt_init_with_reqid(TAOS *taos, int64_t reqid);
DLL_EXPORT int taos_stmt_prepare(TAOS_STMT *stmt, const char *sql, unsigned long length); DLL_EXPORT int taos_stmt_prepare(TAOS_STMT *stmt, const char *sql, unsigned long length);
DLL_EXPORT int taos_stmt_set_tbname_tags(TAOS_STMT *stmt, const char *name, TAOS_MULTI_BIND *tags); DLL_EXPORT int taos_stmt_set_tbname_tags(TAOS_STMT *stmt, const char *name, TAOS_MULTI_BIND *tags);
DLL_EXPORT int taos_stmt_set_tbname(TAOS_STMT *stmt, const char *name); DLL_EXPORT int taos_stmt_set_tbname(TAOS_STMT *stmt, const char *name);
...@@ -176,6 +176,7 @@ DLL_EXPORT int taos_stmt_affected_rows(TAOS_STMT *stmt); ...@@ -176,6 +176,7 @@ DLL_EXPORT int taos_stmt_affected_rows(TAOS_STMT *stmt);
DLL_EXPORT int taos_stmt_affected_rows_once(TAOS_STMT *stmt); DLL_EXPORT int taos_stmt_affected_rows_once(TAOS_STMT *stmt);
DLL_EXPORT TAOS_RES *taos_query(TAOS *taos, const char *sql); DLL_EXPORT TAOS_RES *taos_query(TAOS *taos, const char *sql);
DLL_EXPORT TAOS_RES *taos_query_with_reqid(TAOS *taos, const char *sql, int64_t reqId);
DLL_EXPORT TAOS_ROW taos_fetch_row(TAOS_RES *res); DLL_EXPORT TAOS_ROW taos_fetch_row(TAOS_RES *res);
DLL_EXPORT int taos_result_precision(TAOS_RES *res); // get the time precision of result DLL_EXPORT int taos_result_precision(TAOS_RES *res); // get the time precision of result
...@@ -207,17 +208,23 @@ DLL_EXPORT const char *taos_get_client_info(); ...@@ -207,17 +208,23 @@ DLL_EXPORT const char *taos_get_client_info();
DLL_EXPORT const char *taos_errstr(TAOS_RES *res); DLL_EXPORT const char *taos_errstr(TAOS_RES *res);
DLL_EXPORT int taos_errno(TAOS_RES *res); DLL_EXPORT int taos_errno(TAOS_RES *res);
DLL_EXPORT void taos_query_a(TAOS *taos, const char *sql, __taos_async_fn_t fp, void *param); DLL_EXPORT void taos_query_a(TAOS *taos, const char *sql, __taos_async_fn_t fp, void *param);
DLL_EXPORT void taos_fetch_rows_a(TAOS_RES *res, __taos_async_fn_t fp, void *param); DLL_EXPORT void taos_query_a_with_reqid(TAOS *taos, const char *sql, __taos_async_fn_t fp, void *param, int64_t reqid);
DLL_EXPORT void taos_fetch_raw_block_a(TAOS_RES *res, __taos_async_fn_t fp, void *param); DLL_EXPORT void taos_fetch_rows_a(TAOS_RES *res, __taos_async_fn_t fp, void *param);
DLL_EXPORT void taos_fetch_raw_block_a(TAOS_RES *res, __taos_async_fn_t fp, void *param);
DLL_EXPORT const void *taos_get_raw_block(TAOS_RES *res); DLL_EXPORT const void *taos_get_raw_block(TAOS_RES *res);
DLL_EXPORT int taos_get_db_route_info(TAOS* taos, const char* db, TAOS_DB_ROUTE_INFO* dbInfo); DLL_EXPORT int taos_get_db_route_info(TAOS *taos, const char *db, TAOS_DB_ROUTE_INFO *dbInfo);
DLL_EXPORT int taos_get_table_vgId(TAOS* taos, const char* db, const char* table, int* vgId); DLL_EXPORT int taos_get_table_vgId(TAOS *taos, const char *db, const char *table, int *vgId);
DLL_EXPORT int taos_load_table_info(TAOS *taos, const char *tableNameList); DLL_EXPORT int taos_load_table_info(TAOS *taos, const char *tableNameList);
DLL_EXPORT TAOS_RES *taos_schemaless_insert(TAOS *taos, char *lines[], int numLines, int protocol, int precision); DLL_EXPORT TAOS_RES *taos_schemaless_insert(TAOS *taos, char *lines[], int numLines, int protocol, int precision);
DLL_EXPORT TAOS_RES *taos_schemaless_insert_raw(TAOS* taos, char* lines, int len, int32_t *totalRows, int protocol, int precision); DLL_EXPORT TAOS_RES *taos_schemaless_insert_with_reqid(TAOS *taos, char *lines[], int numLines, int protocol,
int precision, int64_t reqid);
DLL_EXPORT TAOS_RES *taos_schemaless_insert_raw(TAOS *taos, char *lines, int len, int32_t *totalRows, int protocol,
int precision);
DLL_EXPORT TAOS_RES *taos_schemaless_insert_raw_with_reqid(TAOS *taos, char *lines, int len, int32_t *totalRows,
int protocol, int precision, int64_t reqid);
/* --------------------------TMQ INTERFACE------------------------------- */ /* --------------------------TMQ INTERFACE------------------------------- */
......
...@@ -271,7 +271,11 @@ int32_t transferTableNameList(const char* tbList, int32_t acctId, char* dbName, ...@@ -271,7 +271,11 @@ int32_t transferTableNameList(const char* tbList, int32_t acctId, char* dbName,
void syncCatalogFn(SMetaData* pResult, void* param, int32_t code); void syncCatalogFn(SMetaData* pResult, void* param, int32_t code);
TAOS_RES* taosQueryImpl(TAOS* taos, const char* sql, bool validateOnly); TAOS_RES* taosQueryImpl(TAOS* taos, const char* sql, bool validateOnly);
void taosAsyncQueryImpl(uint64_t connId, const char* sql, __taos_async_fn_t fp, void* param, bool validateOnly); TAOS_RES* taosQueryImplWithReqid(TAOS* taos, const char* sql, bool validateOnly, int64_t reqid);
void taosAsyncQueryImpl(uint64_t connId, const char* sql, __taos_async_fn_t fp, void* param, bool validateOnly);
void taosAsyncQueryImplWithReqid(uint64_t connId, const char* sql, __taos_async_fn_t fp, void* param, bool validateOnly,
int64_t reqid);
int32_t getVersion1BlockMetaSize(const char* p, int32_t numOfCols); int32_t getVersion1BlockMetaSize(const char* p, int32_t numOfCols);
...@@ -318,11 +322,11 @@ void* createTscObj(const char* user, const char* auth, const char* db, int32_ ...@@ -318,11 +322,11 @@ void* createTscObj(const char* user, const char* auth, const char* db, int32_
void destroyTscObj(void* pObj); void destroyTscObj(void* pObj);
STscObj* acquireTscObj(int64_t rid); STscObj* acquireTscObj(int64_t rid);
int32_t releaseTscObj(int64_t rid); int32_t releaseTscObj(int64_t rid);
void destroyAppInst(SAppInstInfo *pAppInfo); void destroyAppInst(SAppInstInfo* pAppInfo);
uint64_t generateRequestId(); uint64_t generateRequestId();
void* createRequest(uint64_t connId, int32_t type); void* createRequest(uint64_t connId, int32_t type, int64_t reqid);
void destroyRequest(SRequestObj* pRequest); void destroyRequest(SRequestObj* pRequest);
SRequestObj* acquireRequest(int64_t rid); SRequestObj* acquireRequest(int64_t rid);
int32_t releaseRequest(int64_t rid); int32_t releaseRequest(int64_t rid);
...@@ -353,7 +357,7 @@ int32_t parseSql(SRequestObj* pRequest, bool topicQuery, SQuery** pQuery, SStmtC ...@@ -353,7 +357,7 @@ int32_t parseSql(SRequestObj* pRequest, bool topicQuery, SQuery** pQuery, SStmtC
int32_t getPlan(SRequestObj* pRequest, SQuery* pQuery, SQueryPlan** pPlan, SArray* pNodeList); int32_t getPlan(SRequestObj* pRequest, SQuery* pQuery, SQueryPlan** pPlan, SArray* pNodeList);
int32_t buildRequest(uint64_t connId, const char* sql, int sqlLen, void* param, bool validateSql, int32_t buildRequest(uint64_t connId, const char* sql, int sqlLen, void* param, bool validateSql,
SRequestObj** pRequest); SRequestObj** pRequest, int64_t reqid);
void taos_close_internal(void* taos); void taos_close_internal(void* taos);
......
...@@ -101,11 +101,18 @@ typedef struct STscStmt { ...@@ -101,11 +101,18 @@ typedef struct STscStmt {
SStmtSQLInfo sql; SStmtSQLInfo sql;
SStmtExecInfo exec; SStmtExecInfo exec;
SStmtBindInfo bInfo; SStmtBindInfo bInfo;
int64_t reqid;
} STscStmt; } STscStmt;
extern char *gStmtStatusStr[]; extern char *gStmtStatusStr[];
#define STMT_LOG_SEQ(n) do { (pStmt)->seqId++; (pStmt)->seqIds[n]++; STMT_DLOG("the %dth:%d %s", (pStmt)->seqIds[n], (pStmt)->seqId, gStmtStatusStr[n]); } while (0) #define STMT_LOG_SEQ(n) \
do { \
(pStmt)->seqId++; \
(pStmt)->seqIds[n]++; \
STMT_DLOG("the %dth:%d %s", (pStmt)->seqIds[n], (pStmt)->seqId, gStmtStatusStr[n]); \
} while (0)
#define STMT_STATUS_NE(S) (pStmt->sql.status != STMT_##S) #define STMT_STATUS_NE(S) (pStmt->sql.status != STMT_##S)
#define STMT_STATUS_EQ(S) (pStmt->sql.status == STMT_##S) #define STMT_STATUS_EQ(S) (pStmt->sql.status == STMT_##S)
...@@ -141,7 +148,7 @@ extern char *gStmtStatusStr[]; ...@@ -141,7 +148,7 @@ extern char *gStmtStatusStr[];
#define STMT_ELOG_E(param) qError("stmt:%p " param, pStmt) #define STMT_ELOG_E(param) qError("stmt:%p " param, pStmt)
#define STMT_DLOG_E(param) qDebug("stmt:%p " param, pStmt) #define STMT_DLOG_E(param) qDebug("stmt:%p " param, pStmt)
TAOS_STMT *stmtInit(STscObj *taos); TAOS_STMT *stmtInit(STscObj *taos, int64_t reqid);
int stmtClose(TAOS_STMT *stmt); int stmtClose(TAOS_STMT *stmt);
int stmtExec(TAOS_STMT *stmt); int stmtExec(TAOS_STMT *stmt);
const char *stmtErrstr(TAOS_STMT *stmt); const char *stmtErrstr(TAOS_STMT *stmt);
......
...@@ -77,19 +77,19 @@ static void deregisterRequest(SRequestObj *pRequest) { ...@@ -77,19 +77,19 @@ static void deregisterRequest(SRequestObj *pRequest) {
pRequest->self, pTscObj->id, pRequest->requestId, duration / 1000.0, num, currentInst); pRequest->self, pTscObj->id, pRequest->requestId, duration / 1000.0, num, currentInst);
if (QUERY_NODE_VNODE_MODIF_STMT == pRequest->stmtType) { if (QUERY_NODE_VNODE_MODIF_STMT == pRequest->stmtType) {
// tscPerf("insert duration %" PRId64 "us: syntax:%" PRId64 "us, ctg:%" PRId64 "us, semantic:%" PRId64 // tscPerf("insert duration %" PRId64 "us: syntax:%" PRId64 "us, ctg:%" PRId64 "us, semantic:%" PRId64
// "us, exec:%" PRId64 "us", // "us, exec:%" PRId64 "us",
// duration, pRequest->metric.syntaxEnd - pRequest->metric.syntaxStart, // duration, pRequest->metric.syntaxEnd - pRequest->metric.syntaxStart,
// pRequest->metric.ctgEnd - pRequest->metric.ctgStart, pRequest->metric.semanticEnd - pRequest->metric.ctgEnd, // pRequest->metric.ctgEnd - pRequest->metric.ctgStart, pRequest->metric.semanticEnd -
// pRequest->metric.execEnd - pRequest->metric.semanticEnd); // pRequest->metric.ctgEnd, pRequest->metric.execEnd - pRequest->metric.semanticEnd);
atomic_add_fetch_64((int64_t *)&pActivity->insertElapsedTime, duration); atomic_add_fetch_64((int64_t *)&pActivity->insertElapsedTime, duration);
} else if (QUERY_NODE_SELECT_STMT == pRequest->stmtType) { } else if (QUERY_NODE_SELECT_STMT == pRequest->stmtType) {
// tscPerf("select duration %" PRId64 "us: syntax:%" PRId64 "us, ctg:%" PRId64 "us, semantic:%" PRId64 // tscPerf("select duration %" PRId64 "us: syntax:%" PRId64 "us, ctg:%" PRId64 "us, semantic:%" PRId64
// "us, planner:%" PRId64 "us, exec:%" PRId64 "us, reqId:0x%" PRIx64, // "us, planner:%" PRId64 "us, exec:%" PRId64 "us, reqId:0x%" PRIx64,
// duration, pRequest->metric.syntaxEnd - pRequest->metric.syntaxStart, // duration, pRequest->metric.syntaxEnd - pRequest->metric.syntaxStart,
// pRequest->metric.ctgEnd - pRequest->metric.ctgStart, pRequest->metric.semanticEnd - pRequest->metric.ctgEnd, // pRequest->metric.ctgEnd - pRequest->metric.ctgStart, pRequest->metric.semanticEnd -
// pRequest->metric.planEnd - pRequest->metric.semanticEnd, // pRequest->metric.ctgEnd, pRequest->metric.planEnd - pRequest->metric.semanticEnd,
// pRequest->metric.resultReady - pRequest->metric.planEnd, pRequest->requestId); // pRequest->metric.resultReady - pRequest->metric.planEnd, pRequest->requestId);
atomic_add_fetch_64((int64_t *)&pActivity->queryElapsedTime, duration); atomic_add_fetch_64((int64_t *)&pActivity->queryElapsedTime, duration);
} }
...@@ -271,7 +271,7 @@ STscObj *acquireTscObj(int64_t rid) { return (STscObj *)taosAcquireRef(clientCon ...@@ -271,7 +271,7 @@ STscObj *acquireTscObj(int64_t rid) { return (STscObj *)taosAcquireRef(clientCon
int32_t releaseTscObj(int64_t rid) { return taosReleaseRef(clientConnRefPool, rid); } int32_t releaseTscObj(int64_t rid) { return taosReleaseRef(clientConnRefPool, rid); }
void *createRequest(uint64_t connId, int32_t type) { void *createRequest(uint64_t connId, int32_t type, int64_t reqid) {
SRequestObj *pRequest = (SRequestObj *)taosMemoryCalloc(1, sizeof(SRequestObj)); SRequestObj *pRequest = (SRequestObj *)taosMemoryCalloc(1, sizeof(SRequestObj));
if (NULL == pRequest) { if (NULL == pRequest) {
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
...@@ -286,7 +286,7 @@ void *createRequest(uint64_t connId, int32_t type) { ...@@ -286,7 +286,7 @@ void *createRequest(uint64_t connId, int32_t type) {
} }
pRequest->resType = RES_TYPE__QUERY; pRequest->resType = RES_TYPE__QUERY;
pRequest->requestId = generateRequestId(); pRequest->requestId = reqid <= 0 ? generateRequestId() : reqid;
pRequest->metric.start = taosGetTimestampUs(); pRequest->metric.start = taosGetTimestampUs();
pRequest->body.resInfo.convertUcs4 = true; // convert ucs4 by default pRequest->body.resInfo.convertUcs4 = true; // convert ucs4 by default
......
...@@ -154,8 +154,8 @@ STscObj* taos_connect_internal(const char* ip, const char* user, const char* pas ...@@ -154,8 +154,8 @@ STscObj* taos_connect_internal(const char* ip, const char* user, const char* pas
} }
int32_t buildRequest(uint64_t connId, const char* sql, int sqlLen, void* param, bool validateSql, int32_t buildRequest(uint64_t connId, const char* sql, int sqlLen, void* param, bool validateSql,
SRequestObj** pRequest) { SRequestObj** pRequest, int64_t reqid) {
*pRequest = createRequest(connId, TSDB_SQL_SELECT); *pRequest = createRequest(connId, TSDB_SQL_SELECT, reqid);
if (*pRequest == NULL) { if (*pRequest == NULL) {
tscError("failed to malloc sqlObj, %s", sql); tscError("failed to malloc sqlObj, %s", sql);
return terrno; return terrno;
...@@ -1230,7 +1230,7 @@ STscObj* taosConnectImpl(const char* user, const char* auth, const char* db, __t ...@@ -1230,7 +1230,7 @@ STscObj* taosConnectImpl(const char* user, const char* auth, const char* db, __t
return pTscObj; return pTscObj;
} }
SRequestObj* pRequest = createRequest(pTscObj->id, TDMT_MND_CONNECT); SRequestObj* pRequest = createRequest(pTscObj->id, TDMT_MND_CONNECT, 0);
if (pRequest == NULL) { if (pRequest == NULL) {
destroyTscObj(pTscObj); destroyTscObj(pTscObj);
return NULL; return NULL;
...@@ -2234,7 +2234,37 @@ void taosAsyncQueryImpl(uint64_t connId, const char* sql, __taos_async_fn_t fp, ...@@ -2234,7 +2234,37 @@ void taosAsyncQueryImpl(uint64_t connId, const char* sql, __taos_async_fn_t fp,
} }
SRequestObj* pRequest = NULL; SRequestObj* pRequest = NULL;
int32_t code = buildRequest(connId, sql, sqlLen, param, validateOnly, &pRequest); int32_t code = buildRequest(connId, sql, sqlLen, param, validateOnly, &pRequest, 0);
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
fp(param, NULL, terrno);
return;
}
pRequest->body.queryFp = fp;
doAsyncQuery(pRequest, false);
}
void taosAsyncQueryImplWithReqid(uint64_t connId, const char* sql, __taos_async_fn_t fp, void* param, bool validateOnly,
int64_t reqid) {
if (sql == NULL || NULL == fp) {
terrno = TSDB_CODE_INVALID_PARA;
if (fp) {
fp(param, NULL, terrno);
}
return;
}
size_t sqlLen = strlen(sql);
if (sqlLen > (size_t)TSDB_MAX_ALLOWED_SQL_LEN) {
tscError("sql string exceeds max length:%d", TSDB_MAX_ALLOWED_SQL_LEN);
terrno = TSDB_CODE_TSC_EXCEED_SQL_LIMIT;
fp(param, NULL, terrno);
return;
}
SRequestObj* pRequest = NULL;
int32_t code = buildRequest(connId, sql, sqlLen, param, validateOnly, &pRequest, reqid);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
terrno = code; terrno = code;
fp(param, NULL, terrno); fp(param, NULL, terrno);
...@@ -2261,3 +2291,20 @@ TAOS_RES* taosQueryImpl(TAOS* taos, const char* sql, bool validateOnly) { ...@@ -2261,3 +2291,20 @@ TAOS_RES* taosQueryImpl(TAOS* taos, const char* sql, bool validateOnly) {
} }
return param->pRequest; return param->pRequest;
} }
TAOS_RES* taosQueryImplWithReqid(TAOS* taos, const char* sql, bool validateOnly, int64_t reqid) {
if (NULL == taos) {
terrno = TSDB_CODE_TSC_DISCONNECTED;
return NULL;
}
SSyncQueryParam* param = taosMemoryCalloc(1, sizeof(SSyncQueryParam));
tsem_init(&param->sem, 0, 0);
taosAsyncQueryImplWithReqid(*(int64_t*)taos, sql, syncQueryFn, param, validateOnly, reqid);
tsem_wait(&param->sem);
if (param->pRequest != NULL) {
param->pRequest->syncQuery = true;
}
return param->pRequest;
}
...@@ -248,6 +248,9 @@ TAOS_FIELD *taos_fetch_fields(TAOS_RES *res) { ...@@ -248,6 +248,9 @@ TAOS_FIELD *taos_fetch_fields(TAOS_RES *res) {
} }
TAOS_RES *taos_query(TAOS *taos, const char *sql) { return taosQueryImpl(taos, sql, false); } TAOS_RES *taos_query(TAOS *taos, const char *sql) { return taosQueryImpl(taos, sql, false); }
TAOS_RES *taos_query_with_reqid(TAOS *taos, const char *sql, int64_t reqid) {
return taosQueryImplWithReqid(taos, sql, false, reqid);
}
TAOS_ROW taos_fetch_row(TAOS_RES *res) { TAOS_ROW taos_fetch_row(TAOS_RES *res) {
if (res == NULL) { if (res == NULL) {
...@@ -763,6 +766,11 @@ void taos_query_a(TAOS *taos, const char *sql, __taos_async_fn_t fp, void *param ...@@ -763,6 +766,11 @@ void taos_query_a(TAOS *taos, const char *sql, __taos_async_fn_t fp, void *param
taosAsyncQueryImpl(connId, sql, fp, param, false); taosAsyncQueryImpl(connId, sql, fp, param, false);
} }
void taos_query_a_with_reqid(TAOS *taos, const char *sql, __taos_async_fn_t fp, void *param, int64_t reqid) {
int64_t connId = *(int64_t *)taos;
taosAsyncQueryImplWithReqid(connId, sql, fp, param, false, reqid);
}
int32_t createParseContext(const SRequestObj *pRequest, SParseContext **pCxt) { int32_t createParseContext(const SRequestObj *pRequest, SParseContext **pCxt) {
const STscObj *pTscObj = pRequest->pTscObj; const STscObj *pTscObj = pRequest->pTscObj;
...@@ -992,7 +1000,7 @@ int taos_get_db_route_info(TAOS *taos, const char *db, TAOS_DB_ROUTE_INFO *dbInf ...@@ -992,7 +1000,7 @@ int taos_get_db_route_info(TAOS *taos, const char *db, TAOS_DB_ROUTE_INFO *dbInf
int64_t connId = *(int64_t *)taos; int64_t connId = *(int64_t *)taos;
SRequestObj *pRequest = NULL; SRequestObj *pRequest = NULL;
char *sql = "taos_get_db_route_info"; char *sql = "taos_get_db_route_info";
int32_t code = buildRequest(connId, sql, strlen(sql), NULL, false, &pRequest); int32_t code = buildRequest(connId, sql, strlen(sql), NULL, false, &pRequest, 0);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
terrno = code; terrno = code;
return terrno; return terrno;
...@@ -1041,7 +1049,7 @@ int taos_get_table_vgId(TAOS *taos, const char *db, const char *table, int *vgId ...@@ -1041,7 +1049,7 @@ int taos_get_table_vgId(TAOS *taos, const char *db, const char *table, int *vgId
int64_t connId = *(int64_t *)taos; int64_t connId = *(int64_t *)taos;
SRequestObj *pRequest = NULL; SRequestObj *pRequest = NULL;
char *sql = "taos_get_table_vgId"; char *sql = "taos_get_table_vgId";
int32_t code = buildRequest(connId, sql, strlen(sql), NULL, false, &pRequest); int32_t code = buildRequest(connId, sql, strlen(sql), NULL, false, &pRequest, 0);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return terrno; return terrno;
} }
...@@ -1102,7 +1110,7 @@ int taos_load_table_info(TAOS *taos, const char *tableNameList) { ...@@ -1102,7 +1110,7 @@ int taos_load_table_info(TAOS *taos, const char *tableNameList) {
} }
char *sql = "taos_load_table_info"; char *sql = "taos_load_table_info";
code = buildRequest(connId, sql, strlen(sql), NULL, false, &pRequest); code = buildRequest(connId, sql, strlen(sql), NULL, false, &pRequest, 0);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
terrno = code; terrno = code;
goto _return; goto _return;
...@@ -1147,7 +1155,22 @@ TAOS_STMT *taos_stmt_init(TAOS *taos) { ...@@ -1147,7 +1155,22 @@ TAOS_STMT *taos_stmt_init(TAOS *taos) {
return NULL; return NULL;
} }
TAOS_STMT *pStmt = stmtInit(pObj); TAOS_STMT *pStmt = stmtInit(pObj, 0);
releaseTscObj(*(int64_t *)taos);
return pStmt;
}
TAOS_STMT *taos_stmt_init_with_reqid(TAOS *taos, int64_t reqid) {
STscObj *pObj = acquireTscObj(*(int64_t *)taos);
if (NULL == pObj) {
tscError("invalid parameter for %s", __FUNCTION__);
terrno = TSDB_CODE_TSC_DISCONNECTED;
return NULL;
}
TAOS_STMT *pStmt = stmtInit(pObj, reqid);
releaseTscObj(*(int64_t *)taos); releaseTscObj(*(int64_t *)taos);
......
...@@ -410,7 +410,7 @@ static char* processAlterTable(SMqMetaRsp* metaRsp) { ...@@ -410,7 +410,7 @@ static char* processAlterTable(SMqMetaRsp* metaRsp) {
SDecoder decoder = {0}; SDecoder decoder = {0};
SVAlterTbReq vAlterTbReq = {0}; SVAlterTbReq vAlterTbReq = {0};
char* string = NULL; char* string = NULL;
cJSON* json = NULL; cJSON* json = NULL;
// decode // decode
void* data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead)); void* data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
...@@ -525,7 +525,7 @@ static char* processDropSTable(SMqMetaRsp* metaRsp) { ...@@ -525,7 +525,7 @@ static char* processDropSTable(SMqMetaRsp* metaRsp) {
SDecoder decoder = {0}; SDecoder decoder = {0};
SVDropStbReq req = {0}; SVDropStbReq req = {0};
char* string = NULL; char* string = NULL;
cJSON* json = NULL; cJSON* json = NULL;
// decode // decode
void* data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead)); void* data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
...@@ -558,7 +558,7 @@ static char* processDropTable(SMqMetaRsp* metaRsp) { ...@@ -558,7 +558,7 @@ static char* processDropTable(SMqMetaRsp* metaRsp) {
SDecoder decoder = {0}; SDecoder decoder = {0};
SVDropTbBatchReq req = {0}; SVDropTbBatchReq req = {0};
char* string = NULL; char* string = NULL;
cJSON* json = NULL; cJSON* json = NULL;
// decode // decode
void* data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead)); void* data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
...@@ -603,7 +603,7 @@ static int32_t taosCreateStb(TAOS* taos, void* meta, int32_t metaLen) { ...@@ -603,7 +603,7 @@ static int32_t taosCreateStb(TAOS* taos, void* meta, int32_t metaLen) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
SRequestObj* pRequest = NULL; SRequestObj* pRequest = NULL;
code = buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest); code = buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest, 0);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto end; goto end;
} }
...@@ -692,7 +692,7 @@ static int32_t taosDropStb(TAOS* taos, void* meta, int32_t metaLen) { ...@@ -692,7 +692,7 @@ static int32_t taosDropStb(TAOS* taos, void* meta, int32_t metaLen) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
SRequestObj* pRequest = NULL; SRequestObj* pRequest = NULL;
code = buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest); code = buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest, 0);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto end; goto end;
} }
...@@ -773,7 +773,7 @@ static int32_t taosCreateTable(TAOS* taos, void* meta, int32_t metaLen) { ...@@ -773,7 +773,7 @@ static int32_t taosCreateTable(TAOS* taos, void* meta, int32_t metaLen) {
SQuery* pQuery = NULL; SQuery* pQuery = NULL;
SHashObj* pVgroupHashmap = NULL; SHashObj* pVgroupHashmap = NULL;
code = buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest); code = buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest, 0);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto end; goto end;
} }
...@@ -926,7 +926,7 @@ static int32_t taosDropTable(TAOS* taos, void* meta, int32_t metaLen) { ...@@ -926,7 +926,7 @@ static int32_t taosDropTable(TAOS* taos, void* meta, int32_t metaLen) {
SQuery* pQuery = NULL; SQuery* pQuery = NULL;
SHashObj* pVgroupHashmap = NULL; SHashObj* pVgroupHashmap = NULL;
code = buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest); code = buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest, 0);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto end; goto end;
} }
...@@ -1097,7 +1097,7 @@ static int32_t taosAlterTable(TAOS* taos, void* meta, int32_t metaLen) { ...@@ -1097,7 +1097,7 @@ static int32_t taosAlterTable(TAOS* taos, void* meta, int32_t metaLen) {
SArray* pArray = NULL; SArray* pArray = NULL;
SVgDataBlocks* pVgData = NULL; SVgDataBlocks* pVgData = NULL;
code = buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest); code = buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest, 0);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto end; goto end;
...@@ -1217,7 +1217,7 @@ int taos_write_raw_block(TAOS* taos, int rows, char* pData, const char* tbname) ...@@ -1217,7 +1217,7 @@ int taos_write_raw_block(TAOS* taos, int rows, char* pData, const char* tbname)
SQuery* pQuery = NULL; SQuery* pQuery = NULL;
SSubmitReq* subReq = NULL; SSubmitReq* subReq = NULL;
SRequestObj* pRequest = (SRequestObj*)createRequest(*(int64_t*)taos, TSDB_SQL_INSERT); SRequestObj* pRequest = (SRequestObj*)createRequest(*(int64_t*)taos, TSDB_SQL_INSERT, 0);
if (!pRequest) { if (!pRequest) {
uError("WriteRaw:createRequest error request is null"); uError("WriteRaw:createRequest error request is null");
code = terrno; code = terrno;
...@@ -1281,7 +1281,7 @@ int taos_write_raw_block(TAOS* taos, int rows, char* pData, const char* tbname) ...@@ -1281,7 +1281,7 @@ int taos_write_raw_block(TAOS* taos, int rows, char* pData, const char* tbname)
int32_t schemaLen = 0; int32_t schemaLen = 0;
int32_t submitLen = sizeof(SSubmitBlk) + schemaLen + rows * extendedRowSize; int32_t submitLen = sizeof(SSubmitBlk) + schemaLen + rows * extendedRowSize;
int32_t totalLen = sizeof(SSubmitReq) + submitLen; int32_t totalLen = sizeof(SSubmitReq) + submitLen;
subReq = taosMemoryCalloc(1, totalLen); subReq = taosMemoryCalloc(1, totalLen);
SSubmitBlk* blk = POINTER_SHIFT(subReq, sizeof(SSubmitReq)); SSubmitBlk* blk = POINTER_SHIFT(subReq, sizeof(SSubmitReq));
void* blkSchema = POINTER_SHIFT(blk, sizeof(SSubmitBlk)); void* blkSchema = POINTER_SHIFT(blk, sizeof(SSubmitBlk));
...@@ -1407,7 +1407,7 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) { ...@@ -1407,7 +1407,7 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) {
STableMeta* pTableMeta = NULL; STableMeta* pTableMeta = NULL;
terrno = TSDB_CODE_SUCCESS; terrno = TSDB_CODE_SUCCESS;
SRequestObj* pRequest = (SRequestObj*)createRequest(*(int64_t*)taos, TSDB_SQL_INSERT); SRequestObj* pRequest = (SRequestObj*)createRequest(*(int64_t*)taos, TSDB_SQL_INSERT, 0);
if (!pRequest) { if (!pRequest) {
uError("WriteRaw:createRequest error request is null"); uError("WriteRaw:createRequest error request is null");
return terrno; return terrno;
...@@ -1674,7 +1674,7 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen) ...@@ -1674,7 +1674,7 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen)
STableMeta* pTableMeta = NULL; STableMeta* pTableMeta = NULL;
terrno = TSDB_CODE_SUCCESS; terrno = TSDB_CODE_SUCCESS;
SRequestObj* pRequest = (SRequestObj*)createRequest(*(int64_t*)taos, TSDB_SQL_INSERT); SRequestObj* pRequest = (SRequestObj*)createRequest(*(int64_t*)taos, TSDB_SQL_INSERT, 0);
if (!pRequest) { if (!pRequest) {
uError("WriteRaw:createRequest error request is null"); uError("WriteRaw:createRequest error request is null");
return terrno; return terrno;
......
...@@ -28,12 +28,12 @@ ...@@ -28,12 +28,12 @@
#define QUOTE '"' #define QUOTE '"'
#define SLASH '\\' #define SLASH '\\'
#define JUMP_SPACE(sql, sqlEnd) \ #define JUMP_SPACE(sql, sqlEnd) \
while (sql < sqlEnd) { \ while (sql < sqlEnd) { \
if (*sql == SPACE) \ if (*sql == SPACE) \
sql++; \ sql++; \
else \ else \
break; \ break; \
} }
// comma , // comma ,
#define IS_SLASH_COMMA(sql) (*(sql) == COMMA && *((sql)-1) == SLASH) #define IS_SLASH_COMMA(sql) (*(sql) == COMMA && *((sql)-1) == SLASH)
...@@ -353,7 +353,7 @@ static int32_t smlSendMetaMsg(SSmlHandle *info, SName *pName, SArray *pColumns, ...@@ -353,7 +353,7 @@ static int32_t smlSendMetaMsg(SSmlHandle *info, SName *pName, SArray *pColumns,
pReq.numOfTags = taosArrayGetSize(pTags); pReq.numOfTags = taosArrayGetSize(pTags);
pReq.pTags = pTags; pReq.pTags = pTags;
code = buildRequest(info->taos->id, "", 0, NULL, false, &pRequest); code = buildRequest(info->taos->id, "", 0, NULL, false, &pRequest, 0);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto end; goto end;
} }
...@@ -1010,9 +1010,9 @@ static void smlParseTelnetElement(const char **sql, const char *sqlEnd, const ch ...@@ -1010,9 +1010,9 @@ static void smlParseTelnetElement(const char **sql, const char *sqlEnd, const ch
} }
} }
static int32_t smlParseTelnetTags(const char *data, const char *sqlEnd, SArray *cols, char *childTableName, SHashObj *dumplicateKey, static int32_t smlParseTelnetTags(const char *data, const char *sqlEnd, SArray *cols, char *childTableName,
SSmlMsgBuf *msg) { SHashObj *dumplicateKey, SSmlMsgBuf *msg) {
if(!cols) return TSDB_CODE_OUT_OF_MEMORY; if (!cols) return TSDB_CODE_OUT_OF_MEMORY;
const char *sql = data; const char *sql = data;
size_t childTableNameLen = strlen(tsSmlChildTableName); size_t childTableNameLen = strlen(tsSmlChildTableName);
while (sql < sqlEnd) { while (sql < sqlEnd) {
...@@ -1093,7 +1093,8 @@ static int32_t smlParseTelnetTags(const char *data, const char *sqlEnd, SArray * ...@@ -1093,7 +1093,8 @@ static int32_t smlParseTelnetTags(const char *data, const char *sqlEnd, SArray *
} }
// format: <metric> <timestamp> <value> <tagk_1>=<tagv_1>[ <tagk_n>=<tagv_n>] // format: <metric> <timestamp> <value> <tagk_1>=<tagv_1>[ <tagk_n>=<tagv_n>]
static int32_t smlParseTelnetString(SSmlHandle *info, const char *sql, const char *sqlEnd, SSmlTableInfo *tinfo, SArray *cols) { static int32_t smlParseTelnetString(SSmlHandle *info, const char *sql, const char *sqlEnd, SSmlTableInfo *tinfo,
SArray *cols) {
if (!sql) return TSDB_CODE_SML_INVALID_DATA; if (!sql) return TSDB_CODE_SML_INVALID_DATA;
// parse metric // parse metric
...@@ -1372,19 +1373,19 @@ static int32_t smlKvTimeArrayCompare(const void *key1, const void *key2) { ...@@ -1372,19 +1373,19 @@ static int32_t smlKvTimeArrayCompare(const void *key1, const void *key2) {
static int32_t smlKvTimeHashCompare(const void *key1, const void *key2) { static int32_t smlKvTimeHashCompare(const void *key1, const void *key2) {
SHashObj *s1 = *(SHashObj **)key1; SHashObj *s1 = *(SHashObj **)key1;
SHashObj *s2 = *(SHashObj **)key2; SHashObj *s2 = *(SHashObj **)key2;
SSmlKv **kv1pp = (SSmlKv **)taosHashGet(s1, TS, TS_LEN); SSmlKv **kv1pp = (SSmlKv **)taosHashGet(s1, TS, TS_LEN);
SSmlKv **kv2pp = (SSmlKv **)taosHashGet(s2, TS, TS_LEN); SSmlKv **kv2pp = (SSmlKv **)taosHashGet(s2, TS, TS_LEN);
if(!kv1pp || !kv2pp){ if (!kv1pp || !kv2pp) {
uError("smlKvTimeHashCompare kv is null"); uError("smlKvTimeHashCompare kv is null");
return -1; return -1;
} }
SSmlKv *kv1 = *kv1pp; SSmlKv *kv1 = *kv1pp;
SSmlKv *kv2 = *kv2pp; SSmlKv *kv2 = *kv2pp;
if(!kv1 || kv1->type != TSDB_DATA_TYPE_TIMESTAMP){ if (!kv1 || kv1->type != TSDB_DATA_TYPE_TIMESTAMP) {
uError("smlKvTimeHashCompare kv1"); uError("smlKvTimeHashCompare kv1");
return -1; return -1;
} }
if(!kv2 || kv2->type != TSDB_DATA_TYPE_TIMESTAMP){ if (!kv2 || kv2->type != TSDB_DATA_TYPE_TIMESTAMP) {
uError("smlKvTimeHashCompare kv2"); uError("smlKvTimeHashCompare kv2");
return -1; return -1;
} }
...@@ -1947,7 +1948,7 @@ static int32_t smlParseValueFromJSON(cJSON *root, SSmlKv *kv) { ...@@ -1947,7 +1948,7 @@ static int32_t smlParseValueFromJSON(cJSON *root, SSmlKv *kv) {
} }
static int32_t smlParseColsFromJSON(cJSON *root, SArray *cols) { static int32_t smlParseColsFromJSON(cJSON *root, SArray *cols) {
if(!cols) return TSDB_CODE_OUT_OF_MEMORY; if (!cols) return TSDB_CODE_OUT_OF_MEMORY;
cJSON *metricVal = cJSON_GetObjectItem(root, "value"); cJSON *metricVal = cJSON_GetObjectItem(root, "value");
if (metricVal == NULL) { if (metricVal == NULL) {
return TSDB_CODE_TSC_INVALID_JSON; return TSDB_CODE_TSC_INVALID_JSON;
...@@ -1971,7 +1972,7 @@ static int32_t smlParseColsFromJSON(cJSON *root, SArray *cols) { ...@@ -1971,7 +1972,7 @@ static int32_t smlParseColsFromJSON(cJSON *root, SArray *cols) {
static int32_t smlParseTagsFromJSON(cJSON *root, SArray *pKVs, char *childTableName, SHashObj *dumplicateKey, static int32_t smlParseTagsFromJSON(cJSON *root, SArray *pKVs, char *childTableName, SHashObj *dumplicateKey,
SSmlMsgBuf *msg) { SSmlMsgBuf *msg) {
int32_t ret = TSDB_CODE_SUCCESS; int32_t ret = TSDB_CODE_SUCCESS;
if (!pKVs){ if (!pKVs) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
cJSON *tags = cJSON_GetObjectItem(root, "tags"); cJSON *tags = cJSON_GetObjectItem(root, "tags");
...@@ -2204,7 +2205,7 @@ static int32_t smlParseTelnetLine(SSmlHandle *info, void *data, const int len) { ...@@ -2204,7 +2205,7 @@ static int32_t smlParseTelnetLine(SSmlHandle *info, void *data, const int len) {
} }
if (info->protocol == TSDB_SML_TELNET_PROTOCOL) { if (info->protocol == TSDB_SML_TELNET_PROTOCOL) {
ret = smlParseTelnetString(info, (const char *)data, (char*)data + len, tinfo, cols); ret = smlParseTelnetString(info, (const char *)data, (char *)data + len, tinfo, cols);
} else if (info->protocol == TSDB_SML_JSON_PROTOCOL) { } else if (info->protocol == TSDB_SML_JSON_PROTOCOL) {
ret = smlParseJSONString(info, (cJSON *)data, tinfo, cols); ret = smlParseJSONString(info, (cJSON *)data, tinfo, cols);
} else { } else {
...@@ -2384,16 +2385,16 @@ static void smlPrintStatisticInfo(SSmlHandle *info) { ...@@ -2384,16 +2385,16 @@ static void smlPrintStatisticInfo(SSmlHandle *info) {
info->cost.endTime - info->cost.insertRpcTime, info->cost.endTime - info->cost.parseTime); info->cost.endTime - info->cost.insertRpcTime, info->cost.endTime - info->cost.parseTime);
} }
static int32_t smlParseLine(SSmlHandle *info, char *lines[], char* rawLine, char* rawLineEnd, int numLines) { static int32_t smlParseLine(SSmlHandle *info, char *lines[], char *rawLine, char *rawLineEnd, int numLines) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
if (info->protocol == TSDB_SML_JSON_PROTOCOL) { if (info->protocol == TSDB_SML_JSON_PROTOCOL) {
if(lines){ if (lines) {
code = smlParseJSON(info, *lines); code = smlParseJSON(info, *lines);
}else if(rawLine){ } else if (rawLine) {
code = smlParseJSON(info, rawLine); code = smlParseJSON(info, rawLine);
} }
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
uError("SML:0x%" PRIx64 " smlParseJSON failed:%s", info->id, lines?*lines:rawLine); uError("SML:0x%" PRIx64 " smlParseJSON failed:%s", info->id, lines ? *lines : rawLine);
return code; return code;
} }
return code; return code;
...@@ -2401,19 +2402,19 @@ static int32_t smlParseLine(SSmlHandle *info, char *lines[], char* rawLine, char ...@@ -2401,19 +2402,19 @@ static int32_t smlParseLine(SSmlHandle *info, char *lines[], char* rawLine, char
for (int32_t i = 0; i < numLines; ++i) { for (int32_t i = 0; i < numLines; ++i) {
char *tmp = NULL; char *tmp = NULL;
int len = 0; int len = 0;
if(lines){ if (lines) {
tmp = lines[i]; tmp = lines[i];
len = strlen(tmp); len = strlen(tmp);
}else if(rawLine){ } else if (rawLine) {
tmp = rawLine; tmp = rawLine;
while(rawLine < rawLineEnd){ while (rawLine < rawLineEnd) {
if(*(rawLine++) == '\n'){ if (*(rawLine++) == '\n') {
break; break;
} }
len++; len++;
} }
if(info->protocol == TSDB_SML_LINE_PROTOCOL && tmp[0] == '#'){ // this line is comment if (info->protocol == TSDB_SML_LINE_PROTOCOL && tmp[0] == '#') { // this line is comment
continue; continue;
} }
} }
...@@ -2433,7 +2434,7 @@ static int32_t smlParseLine(SSmlHandle *info, char *lines[], char* rawLine, char ...@@ -2433,7 +2434,7 @@ static int32_t smlParseLine(SSmlHandle *info, char *lines[], char* rawLine, char
return code; return code;
} }
static int smlProcess(SSmlHandle *info, char *lines[], char* rawLine, char* rawLineEnd, int numLines) { static int smlProcess(SSmlHandle *info, char *lines[], char *rawLine, char *rawLineEnd, int numLines) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
int32_t retryNum = 0; int32_t retryNum = 0;
...@@ -2532,8 +2533,8 @@ static void smlInsertCallback(void *param, void *res, int32_t code) { ...@@ -2532,8 +2533,8 @@ static void smlInsertCallback(void *param, void *res, int32_t code) {
smlDestroyInfo(info); smlDestroyInfo(info);
} }
TAOS_RES *taos_schemaless_insert_inner(SRequestObj *request, char *lines[], char *rawLine, char *rawLineEnd,
TAOS_RES *taos_schemaless_insert_inner(SRequestObj *request, char *lines[], char *rawLine, char *rawLineEnd, int numLines, int protocol, int precision) { int numLines, int protocol, int precision) {
int batchs = 0; int batchs = 0;
STscObj *pTscObj = request->pTscObj; STscObj *pTscObj = request->pTscObj;
...@@ -2581,7 +2582,7 @@ TAOS_RES *taos_schemaless_insert_inner(SRequestObj *request, char *lines[], char ...@@ -2581,7 +2582,7 @@ TAOS_RES *taos_schemaless_insert_inner(SRequestObj *request, char *lines[], char
batchs = ceil(((double)numLines) / LINE_BATCH); batchs = ceil(((double)numLines) / LINE_BATCH);
params.total = batchs; params.total = batchs;
for (int i = 0; i < batchs; ++i) { for (int i = 0; i < batchs; ++i) {
SRequestObj *req = (SRequestObj *)createRequest(pTscObj->id, TSDB_SQL_INSERT); SRequestObj *req = (SRequestObj *)createRequest(pTscObj->id, TSDB_SQL_INSERT, 0);
if (!req) { if (!req) {
request->code = TSDB_CODE_OUT_OF_MEMORY; request->code = TSDB_CODE_OUT_OF_MEMORY;
uError("SML:taos_schemaless_insert error request is null"); uError("SML:taos_schemaless_insert error request is null");
...@@ -2608,16 +2609,16 @@ TAOS_RES *taos_schemaless_insert_inner(SRequestObj *request, char *lines[], char ...@@ -2608,16 +2609,16 @@ TAOS_RES *taos_schemaless_insert_inner(SRequestObj *request, char *lines[], char
info->pRequest->body.queryFp = smlInsertCallback; info->pRequest->body.queryFp = smlInsertCallback;
info->pRequest->body.param = info; info->pRequest->body.param = info;
int32_t code = smlProcess(info, lines, rawLine, rawLineEnd, perBatch); int32_t code = smlProcess(info, lines, rawLine, rawLineEnd, perBatch);
if(lines){ if (lines) {
lines += perBatch; lines += perBatch;
} }
if(rawLine){ if (rawLine) {
int num = 0; int num = 0;
while(rawLine < rawLineEnd){ while (rawLine < rawLineEnd) {
if(*(rawLine++) == '\n'){ if (*(rawLine++) == '\n') {
num++; num++;
} }
if(num == perBatch){ if (num == perBatch) {
break; break;
} }
} }
...@@ -2628,7 +2629,7 @@ TAOS_RES *taos_schemaless_insert_inner(SRequestObj *request, char *lines[], char ...@@ -2628,7 +2629,7 @@ TAOS_RES *taos_schemaless_insert_inner(SRequestObj *request, char *lines[], char
} }
tsem_wait(&params.sem); tsem_wait(&params.sem);
end: end:
taosThreadSpinDestroy(&params.lock); taosThreadSpinDestroy(&params.lock);
tsem_destroy(&params.sem); tsem_destroy(&params.sem);
// ((STscObj *)taos)->schemalessType = 0; // ((STscObj *)taos)->schemalessType = 0;
...@@ -2664,7 +2665,30 @@ TAOS_RES *taos_schemaless_insert(TAOS *taos, char *lines[], int numLines, int pr ...@@ -2664,7 +2665,30 @@ TAOS_RES *taos_schemaless_insert(TAOS *taos, char *lines[], int numLines, int pr
return NULL; return NULL;
} }
SRequestObj *request = (SRequestObj *)createRequest(*(int64_t *)taos, TSDB_SQL_INSERT); SRequestObj *request = (SRequestObj *)createRequest(*(int64_t *)taos, TSDB_SQL_INSERT, 0);
if (!request) {
uError("SML:taos_schemaless_insert error request is null");
return NULL;
}
if (!lines) {
SSmlMsgBuf msg = {ERROR_MSG_BUF_DEFAULT_SIZE, request->msgBuf};
request->code = TSDB_CODE_SML_INVALID_DATA;
smlBuildInvalidDataMsg(&msg, "lines is null", NULL);
return (TAOS_RES *)request;
}
return taos_schemaless_insert_inner(request, lines, NULL, NULL, numLines, protocol, precision);
}
TAOS_RES *taos_schemaless_insert_with_reqid(TAOS *taos, char *lines[], int numLines, int protocol, int precision,
int64_t reqid) {
if (NULL == taos) {
terrno = TSDB_CODE_TSC_DISCONNECTED;
return NULL;
}
SRequestObj *request = (SRequestObj *)createRequest(*(int64_t *)taos, TSDB_SQL_INSERT, reqid);
if (!request) { if (!request) {
uError("SML:taos_schemaless_insert error request is null"); uError("SML:taos_schemaless_insert error request is null");
return NULL; return NULL;
...@@ -2680,13 +2704,49 @@ TAOS_RES *taos_schemaless_insert(TAOS *taos, char *lines[], int numLines, int pr ...@@ -2680,13 +2704,49 @@ TAOS_RES *taos_schemaless_insert(TAOS *taos, char *lines[], int numLines, int pr
return taos_schemaless_insert_inner(request, lines, NULL, NULL, numLines, protocol, precision); return taos_schemaless_insert_inner(request, lines, NULL, NULL, numLines, protocol, precision);
} }
TAOS_RES *taos_schemaless_insert_raw(TAOS* taos, char* lines, int len, int32_t *totalRows, int protocol, int precision){ TAOS_RES *taos_schemaless_insert_raw(TAOS *taos, char *lines, int len, int32_t *totalRows, int protocol,
int precision) {
if (NULL == taos) {
terrno = TSDB_CODE_TSC_DISCONNECTED;
return NULL;
}
SRequestObj *request = (SRequestObj *)createRequest(*(int64_t *)taos, TSDB_SQL_INSERT, 0);
if (!request) {
uError("SML:taos_schemaless_insert error request is null");
return NULL;
}
if (!lines || len <= 0) {
SSmlMsgBuf msg = {ERROR_MSG_BUF_DEFAULT_SIZE, request->msgBuf};
request->code = TSDB_CODE_SML_INVALID_DATA;
smlBuildInvalidDataMsg(&msg, "lines is null", NULL);
return (TAOS_RES *)request;
}
int numLines = 0;
*totalRows = 0;
char *tmp = lines;
for (int i = 0; i < len; i++) {
if (lines[i] == '\n' || i == len - 1) {
numLines++;
if (tmp[0] != '#' || protocol != TSDB_SML_LINE_PROTOCOL) { // ignore comment
(*totalRows)++;
}
tmp = lines + i + 1;
}
}
return taos_schemaless_insert_inner(request, NULL, lines, lines + len, numLines, protocol, precision);
}
TAOS_RES *taos_schemaless_insert_raw_with_reqid(TAOS *taos, char *lines, int len, int32_t *totalRows, int protocol,
int precision, int64_t reqid) {
if (NULL == taos) { if (NULL == taos) {
terrno = TSDB_CODE_TSC_DISCONNECTED; terrno = TSDB_CODE_TSC_DISCONNECTED;
return NULL; return NULL;
} }
SRequestObj *request = (SRequestObj *)createRequest(*(int64_t *)taos, TSDB_SQL_INSERT); SRequestObj *request = (SRequestObj *)createRequest(*(int64_t *)taos, TSDB_SQL_INSERT, reqid);
if (!request) { if (!request) {
uError("SML:taos_schemaless_insert error request is null"); uError("SML:taos_schemaless_insert error request is null");
return NULL; return NULL;
...@@ -2702,10 +2762,10 @@ TAOS_RES *taos_schemaless_insert_raw(TAOS* taos, char* lines, int len, int32_t * ...@@ -2702,10 +2762,10 @@ TAOS_RES *taos_schemaless_insert_raw(TAOS* taos, char* lines, int len, int32_t *
int numLines = 0; int numLines = 0;
*totalRows = 0; *totalRows = 0;
char *tmp = lines; char *tmp = lines;
for(int i = 0; i < len; i++){ for (int i = 0; i < len; i++) {
if(lines[i] == '\n' || i == len - 1){ if (lines[i] == '\n' || i == len - 1) {
numLines++; numLines++;
if(tmp[0] != '#' || protocol != TSDB_SML_LINE_PROTOCOL){ //ignore comment if (tmp[0] != '#' || protocol != TSDB_SML_LINE_PROTOCOL) { // ignore comment
(*totalRows)++; (*totalRows)++;
} }
tmp = lines + i + 1; tmp = lines + i + 1;
......
...@@ -5,13 +5,15 @@ ...@@ -5,13 +5,15 @@
#include "clientStmt.h" #include "clientStmt.h"
char *gStmtStatusStr[] = {"unknown", "init", "prepare", "settbname", "settags", "fetchFields", "bind", "bindCol", "addBatch", "exec"}; char* gStmtStatusStr[] = {"unknown", "init", "prepare", "settbname", "settags",
"fetchFields", "bind", "bindCol", "addBatch", "exec"};
static int32_t stmtCreateRequest(STscStmt* pStmt) { static int32_t stmtCreateRequest(STscStmt* pStmt) {
int32_t code = 0; int32_t code = 0;
if (pStmt->exec.pRequest == NULL) { if (pStmt->exec.pRequest == NULL) {
code = buildRequest(pStmt->taos->id, pStmt->sql.sqlStr, pStmt->sql.sqlLen, NULL, false, &pStmt->exec.pRequest); code = buildRequest(pStmt->taos->id, pStmt->sql.sqlStr, pStmt->sql.sqlLen, NULL, false, &pStmt->exec.pRequest,
pStmt->reqid);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
pStmt->exec.pRequest->syncQuery = true; pStmt->exec.pRequest->syncQuery = true;
} }
...@@ -207,10 +209,10 @@ int32_t stmtCacheBlock(STscStmt* pStmt) { ...@@ -207,10 +209,10 @@ int32_t stmtCacheBlock(STscStmt* pStmt) {
} }
STableDataBlocks** pSrc = taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName)); STableDataBlocks** pSrc = taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName));
if(!pSrc){ if (!pSrc) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
STableDataBlocks* pDst = NULL; STableDataBlocks* pDst = NULL;
STMT_ERR_RET(qCloneStmtDataBlock(&pDst, *pSrc)); STMT_ERR_RET(qCloneStmtDataBlock(&pDst, *pSrc));
...@@ -513,7 +515,7 @@ int32_t stmtResetStmt(STscStmt* pStmt) { ...@@ -513,7 +515,7 @@ int32_t stmtResetStmt(STscStmt* pStmt) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
TAOS_STMT* stmtInit(STscObj* taos) { TAOS_STMT* stmtInit(STscObj* taos, int64_t reqid) {
STscObj* pObj = (STscObj*)taos; STscObj* pObj = (STscObj*)taos;
STscStmt* pStmt = NULL; STscStmt* pStmt = NULL;
...@@ -533,9 +535,10 @@ TAOS_STMT* stmtInit(STscObj* taos) { ...@@ -533,9 +535,10 @@ TAOS_STMT* stmtInit(STscObj* taos) {
pStmt->taos = pObj; pStmt->taos = pObj;
pStmt->bInfo.needParse = true; pStmt->bInfo.needParse = true;
pStmt->sql.status = STMT_INIT; pStmt->sql.status = STMT_INIT;
pStmt->reqid = reqid;
STMT_LOG_SEQ(STMT_INIT); STMT_LOG_SEQ(STMT_INIT);
tscDebug("stmt:%p initialized", pStmt); tscDebug("stmt:%p initialized", pStmt);
return pStmt; return pStmt;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册