提交 b9927eae 编写于 作者: H Haojun Liao

[td-11818] support async exec task.

上级 97128cbf
......@@ -81,15 +81,27 @@ typedef struct STableMetaOutput {
STableMeta *tbMeta;
} STableMetaOutput;
typedef int32_t __async_exec_fn_t(void* param);
bool tIsValidSchema(struct SSchema* pSchema, int32_t numOfCols, int32_t numOfTags);
extern int32_t (*queryBuildMsg[TSDB_MSG_TYPE_MAX])(void* input, char **msg, int32_t msgSize, int32_t *msgLen);
extern int32_t (*queryProcessMsgRsp[TSDB_MSG_TYPE_MAX])(void* output, char *msg, int32_t msgSize);
int32_t initTaskQueue();
int32_t cleanupTaskQueue();
/**
*
* @param execFn The asynchronously execution function
* @param execParam The parameters of the execFn
* @param code The response code during execution the execFn
* @return
*/
int32_t taosAsyncExec(__async_exec_fn_t execFn, void* execParam, int32_t* code);
SSchema* tGetTbnameColumnSchema();
extern void msgInit();
void msgInit();
extern int32_t qDebugFlag;
extern int32_t (*queryBuildMsg[TSDB_MSG_TYPE_MAX])(void* input, char **msg, int32_t msgSize, int32_t *msgLen);
extern int32_t (*queryProcessMsgRsp[TSDB_MSG_TYPE_MAX])(void* output, char *msg, int32_t msgSize);
#define qFatal(...) do { if (qDebugFlag & DEBUG_FATAL) { taosPrintLog("QRY FATAL ", qDebugFlag, __VA_ARGS__); }} while(0)
#define qError(...) do { if (qDebugFlag & DEBUG_ERROR) { taosPrintLog("QRY ERROR ", qDebugFlag, __VA_ARGS__); }} while(0)
......
......@@ -154,7 +154,7 @@ void destroyRequest(SRequestObj* pRequest);
void taos_init_imp(void);
int taos_options_imp(TSDB_OPTION option, const char *str);
void* openTransporter(const char *user, const char *auth);
void* openTransporter(const char *user, const char *auth, int32_t numOfThreads);
void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet);
void initMsgHandleFp();
......
......@@ -102,9 +102,8 @@ TAOS *taos_connect_internal(const char *ip, const char *user, const char *pass,
SAppInstInfo** pInst = taosHashGet(appInfo.pInstMap, key, strlen(key));
if (pInst == NULL) {
SAppInstInfo* p = calloc(1, sizeof(struct SAppInstInfo));
p->mgmtEp = epSet;
p->pTransporter = openTransporter(user, secretEncrypt);
p->pTransporter = openTransporter(user, secretEncrypt, tsNumOfCores);
taosHashPut(appInfo.pInstMap, key, strlen(key), &p, POINTER_BYTES);
pInst = &p;
......
#include "os.h"
#include "clientInt.h"
#include "clientLog.h"
#include "os.h"
#include "query.h"
#include "taosmsg.h"
#include "tcache.h"
#include "tconfig.h"
#include "tglobal.h"
#include "tnote.h"
#include "tref.h"
#include "trpc.h"
#include "tsched.h"
#include "ttime.h"
#include "ttimezone.h"
#define TSC_VAR_NOT_RELEASE 1
#define TSC_VAR_RELEASED 0
......@@ -44,9 +39,7 @@ void taos_cleanup(void) {
tscReqRef = -1;
taosCloseRef(id);
void* p = tscQhandle;
tscQhandle = NULL;
taosCleanUpScheduler(p);
cleanupTaskQueue();
id = tscConnRef;
tscConnRef = -1;
......
......@@ -13,17 +13,17 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "clientInt.h"
#include "clientLog.h"
#include "os.h"
#include "taosmsg.h"
#include "query.h"
#include "clientInt.h"
#include "clientLog.h"
#include "tcache.h"
#include "tconfig.h"
#include "tglobal.h"
#include "tnote.h"
#include "tref.h"
#include "trpc.h"
#include "tsched.h"
#include "ttime.h"
#include "ttimezone.h"
......@@ -33,10 +33,8 @@
SAppInfo appInfo;
int32_t tscReqRef = -1;
int32_t tscConnRef = -1;
void *tscQhandle = NULL;
static pthread_once_t tscinit = PTHREAD_ONCE_INIT;
int32_t tsNumOfThreads = 1;
volatile int32_t tscInitRes = 0;
static void registerRequest(SRequestObj* pRequest) {
......@@ -98,12 +96,12 @@ void closeTransporter(STscObj* pTscObj) {
}
// TODO refactor
void* openTransporter(const char *user, const char *auth) {
void* openTransporter(const char *user, const char *auth, int32_t numOfThread) {
SRpcInit rpcInit;
memset(&rpcInit, 0, sizeof(rpcInit));
rpcInit.localPort = 0;
rpcInit.label = "TSC";
rpcInit.numOfThreads = tsNumOfThreads;
rpcInit.numOfThreads = numOfThread;
rpcInit.cfp = processMsgFromServer;
rpcInit.sessions = tsMaxConnections;
rpcInit.connType = TAOS_CONN_CLIENT;
......@@ -229,18 +227,8 @@ void taos_init_imp(void) {
taosSetCoreDump(true);
double factor = 4.0;
int32_t numOfThreads = MAX((int)(tsNumOfCores * tsNumOfThreadsPerCore / factor), 2);
int32_t queueSize = tsMaxConnections * 2;
tscQhandle = taosInitScheduler(queueSize, numOfThreads, "tsc");
if (NULL == tscQhandle) {
tscError("failed to init task queue");
tscInitRes = -1;
return;
}
initTaskQueue();
tscDebug("client task queue is initialized, numOfThreads: %d", numOfThreads);
tscConnRef = taosOpenRef(200, destroyTscObj);
tscReqRef = taosOpenRef(40960, doDestroyRequest);
......
......@@ -148,3 +148,18 @@ TEST(testCase, create_db_Test) {
taos_close(pConn);
}
TEST(testCase, create_stable_Test) {
TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0);
assert(pConn != NULL);
TAOS_RES* pRes = taos_query(pConn, "create stable st1(ts timestamp, k int) tags(a int)");
TAOS_FIELD* pFields = taos_fetch_fields(pRes);
ASSERT_TRUE(pFields == NULL);
int32_t numOfFields = taos_num_fields(pRes);
ASSERT_EQ(numOfFields, 0);
taos_close(pConn);
}
......@@ -67,6 +67,7 @@ int32_t getExprFunctionId(SExprInfo *pExprInfo);
STableMeta* tableMetaDup(const STableMeta* pTableMeta);
bool isDclSqlStatement(SSqlInfo* pSqlInfo);
bool isDdlSqlStatement(SSqlInfo* pSqlInfo);
#ifdef __cplusplus
}
......
......@@ -1613,7 +1613,14 @@ uint32_t convertRelationalOperator(SToken *pToken) {
}
bool isDclSqlStatement(SSqlInfo* pSqlInfo) {
return (pSqlInfo->type != TSDB_SQL_SELECT);
int32_t type = pSqlInfo->type;
return (type == TSDB_SQL_CREATE_USER || type == TSDB_SQL_CREATE_ACCT || type == TSDB_SQL_DROP_USER ||
type == TSDB_SQL_DROP_ACCT || type == TSDB_SQL_SHOW);
}
bool isDdlSqlStatement(SSqlInfo* pSqlInfo) {
int32_t type = pSqlInfo->type;
return (type == TSDB_SQL_CREATE_TABLE || type == TSDB_SQL_CREATE_DB);
}
#if 0
......
......@@ -10,3 +10,5 @@ target_link_libraries(
qcom
PRIVATE os util transport
)
ADD_SUBDIRECTORY(test)
#include "os.h"
#include "taosmsg.h"
#include "query.h"
#include "tglobal.h"
#include "tsched.h"
#define VALIDNUMOFCOLS(x) ((x) >= TSDB_MIN_COLUMNS && (x) <= TSDB_MAX_COLUMNS)
#define VALIDNUMOFTAGS(x) ((x) >= 0 && (x) <= TSDB_MAX_TAGS)
......@@ -75,4 +78,46 @@ bool tIsValidSchema(struct SSchema* pSchema, int32_t numOfCols, int32_t numOfTag
}
return true;
}
\ No newline at end of file
}
static void* pTaskQueue = NULL;
int32_t initTaskQueue() {
double factor = 4.0;
int32_t numOfThreads = MAX((int)(tsNumOfCores * tsNumOfThreadsPerCore / factor), 2);
int32_t queueSize = tsMaxConnections * 2;
pTaskQueue = taosInitScheduler(queueSize, numOfThreads, "tsc");
if (NULL == pTaskQueue) {
qError("failed to init task queue");
return -1;
}
qDebug("task queue is initialized, numOfThreads: %d", numOfThreads);
}
int32_t cleanupTaskQueue() {
taosCleanUpScheduler(pTaskQueue);
}
static void execHelper(struct SSchedMsg* pSchedMsg) {
assert(pSchedMsg != NULL && pSchedMsg->ahandle != NULL);
__async_exec_fn_t* execFn = (__async_exec_fn_t*) pSchedMsg->ahandle;
int32_t code = execFn(pSchedMsg->thandle);
if (code != 0 && pSchedMsg->msg != NULL) {
*(int32_t*) pSchedMsg->msg = code;
}
}
int32_t taosAsyncExec(__async_exec_fn_t execFn, void* execParam, int32_t* code) {
assert(execFn != NULL);
SSchedMsg schedMsg = {0};
schedMsg.fp = execHelper;
schedMsg.ahandle = execFn;
schedMsg.thandle = execParam;
schedMsg.msg = code;
taosScheduleTask(pTaskQueue, &schedMsg);
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册