提交 f9133cc7 编写于 作者: S Shengliang Guan

Merge remote-tracking branch 'origin/3.0' into feature/dnode3

#!/bin/bash
rm -rf 127.0.0.1*
rm -rf ./data
#ifndef TDENGINE_COMMON_H
#define TDENGINE_COMMON_H
#ifdef __cplusplus
extern "C" {
#endif
#include <stdint.h>
#define MAX_PEERS 10
#define COMMAND_LEN 512
#define TOKEN_LEN 128
#define DIR_LEN 256
#define HOST_LEN 64
#define ADDRESS_LEN (HOST_LEN + 16)
typedef struct {
char host[HOST_LEN];
uint32_t port;
} Addr;
typedef struct {
Addr me;
Addr peers[MAX_PEERS];
int peersCount;
char dir[DIR_LEN];
char dataDir[DIR_LEN + HOST_LEN * 2];
} SRaftServerConfig;
#ifdef __cplusplus
}
#endif
#endif // TDENGINE_COMMON_H
#include <stdio.h>
#include <unistd.h>
#include <pthread.h>
#include <string.h>
#include <assert.h>
#include <getopt.h>
#include <time.h>
#include <stdlib.h>
#include <getopt.h>
#include <raft.h>
#include <raft/uv.h>
#include "raftServer.h"
#include "common.h"
const char *exe_name;
void parseAddr(const char *addr, char *host, int len, uint32_t *port) {
char* tmp = (char*)malloc(strlen(addr) + 1);
strcpy(tmp, addr);
char* context;
char* separator = ":";
char* token = strtok_r(tmp, separator, &context);
if (token) {
snprintf(host, len, "%s", token);
}
token = strtok_r(NULL, separator, &context);
if (token) {
sscanf(token, "%u", port);
}
free(tmp);
}
// only parse 3 tokens
int parseCommand(const char* str, char* token1, char* token2, char* token3, int len)
{
char* tmp = (char*)malloc(strlen(str) + 1);
strcpy(tmp, str);
char* context;
char* separator = " ";
int n = 0;
char* token = strtok_r(tmp, separator, &context);
if (!token) {
goto ret;
}
if (strcmp(token, "") != 0) {
strncpy(token1, token, len);
n++;
}
token = strtok_r(NULL, separator, &context);
if (!token) {
goto ret;
}
if (strcmp(token, "") != 0) {
strncpy(token2, token, len);
n++;
}
token = strtok_r(NULL, separator, &context);
if (!token) {
goto ret;
}
if (strcmp(token, "") != 0) {
strncpy(token3, token, len);
n++;
}
ret:
return n;
free(tmp);
}
void *startServerFunc(void *param) {
SRaftServer *pServer = (SRaftServer*)param;
int32_t r = raftServerStart(pServer);
assert(r == 0);
return NULL;
}
// Console ---------------------------------
const char* state2String(unsigned short state) {
if (state == RAFT_UNAVAILABLE) {
return "RAFT_UNAVAILABLE";
} else if (state == RAFT_FOLLOWER) {
return "RAFT_FOLLOWER";
} else if (state == RAFT_CANDIDATE) {
return "RAFT_CANDIDATE";
} else if (state == RAFT_LEADER) {
return "RAFT_LEADER";
}
return "UNKNOWN_RAFT_STATE";
}
void printRaftConfiguration(struct raft_configuration *c) {
printf("configuration: \n");
for (int i = 0; i < c->n; ++i) {
printf("%llu -- %d -- %s\n", c->servers->id, c->servers->role, c->servers->address);
}
}
void printRaftState(struct raft *r) {
printf("----Raft State: -----------\n");
printf("my_id: %llu \n", r->id);
printf("address: %s \n", r->address);
printf("current_term: %llu \n", r->current_term);
printf("voted_for: %llu \n", r->voted_for);
printf("role: %s \n", state2String(r->state));
printf("commit_index: %llu \n", r->commit_index);
printf("last_applied: %llu \n", r->last_applied);
printf("last_stored: %llu \n", r->last_stored);
/*
printf("configuration_index: %llu \n", r->configuration_index);
printf("configuration_uncommitted_index: %llu \n", r->configuration_uncommitted_index);
printRaftConfiguration(&r->configuration);
*/
printf("----------------------------\n");
}
void putValueCb(struct raft_apply *req, int status, void *result) {
raft_free(req);
struct raft *r = req->data;
if (status != 0) {
printf("putValueCb: %s \n", raft_errmsg(r));
} else {
printf("putValueCb: %s \n", "ok");
}
}
void putValue(struct raft *r, const char *value) {
struct raft_buffer buf;
buf.len = TOKEN_LEN;;
buf.base = raft_malloc(buf.len);
snprintf(buf.base, buf.len, "%s", value);
struct raft_apply *req = raft_malloc(sizeof(struct raft_apply));
req->data = r;
int ret = raft_apply(r, req, &buf, 1, putValueCb);
if (ret == 0) {
printf("put %s \n", (char*)buf.base);
} else {
printf("put error: %s \n", raft_errmsg(r));
}
}
void getValue(const char *key) {
char *ptr = getKV(key);
if (ptr) {
printf("get value: [%s] \n", ptr);
} else {
printf("value not found for key: [%s] \n", key);
}
}
void console(SRaftServer *pRaftServer) {
while (1) {
char cmd_buf[COMMAND_LEN];
memset(cmd_buf, 0, sizeof(cmd_buf));
char *ret = fgets(cmd_buf, COMMAND_LEN, stdin);
if (!ret) {
exit(-1);
}
int pos = strlen(cmd_buf);
if(cmd_buf[pos - 1] == '\n') {
cmd_buf[pos - 1] = '\0';
}
if (strncmp(cmd_buf, "", COMMAND_LEN) == 0) {
continue;
}
char cmd[TOKEN_LEN];
memset(cmd, 0, sizeof(cmd));
char param1[TOKEN_LEN];
memset(param1, 0, sizeof(param1));
char param2[TOKEN_LEN];
memset(param2, 0, sizeof(param2));
parseCommand(cmd_buf, cmd, param1, param2, TOKEN_LEN);
if (strcmp(cmd, "addnode") == 0) {
printf("not support \n");
/*
char host[HOST_LEN];
uint32_t port;
parseAddr(param1, host, HOST_LEN, &port);
uint64_t rid = raftId(host, port);
struct raft_change *req = raft_malloc(sizeof(*req));
int r = raft_add(&pRaftServer->raft, req, rid, param1, NULL);
if (r != 0) {
printf("raft_add: %s \n", raft_errmsg(&pRaftServer->raft));
}
printf("add node: %lu %s \n", rid, param1);
struct raft_change *req2 = raft_malloc(sizeof(*req2));
r = raft_assign(&pRaftServer->raft, req2, rid, RAFT_VOTER, NULL);
if (r != 0) {
printf("raft_assign: %s \n", raft_errmsg(&pRaftServer->raft));
}
*/
} else if (strcmp(cmd, "dropnode") == 0) {
printf("not support \n");
} else if (strcmp(cmd, "put") == 0) {
char buf[256];
snprintf(buf, sizeof(buf), "%s--%s", param1, param2);
putValue(&pRaftServer->raft, buf);
} else if (strcmp(cmd, "get") == 0) {
getValue(param1);
} else if (strcmp(cmd, "state") == 0) {
printRaftState(&pRaftServer->raft);
} else if (strcmp(cmd, "snapshot") == 0) {
printf("not support \n");
} else if (strcmp(cmd, "help") == 0) {
printf("addnode \"127.0.0.1:8888\" \n");
printf("dropnode \"127.0.0.1:8888\" \n");
printf("put key value \n");
printf("get key \n");
printf("state \n");
} else {
printf("unknown command: [%s], type \"help\" to see help \n", cmd);
}
//printf("cmd_buf: [%s] \n", cmd_buf);
}
}
void *startConsoleFunc(void *param) {
SRaftServer *pServer = (SRaftServer*)param;
console(pServer);
return NULL;
}
// Config ---------------------------------
void usage() {
printf("\nusage: \n");
printf("%s --me=127.0.0.1:10000 --dir=./data \n", exe_name);
printf("\n");
printf("%s --me=127.0.0.1:10000 --peers=127.0.0.1:10001,127.0.0.1:10002 --dir=./data \n", exe_name);
printf("%s --me=127.0.0.1:10001 --peers=127.0.0.1:10000,127.0.0.1:10002 --dir=./data \n", exe_name);
printf("%s --me=127.0.0.1:10002 --peers=127.0.0.1:10000,127.0.0.1:10001 --dir=./data \n", exe_name);
printf("\n");
}
void parseConf(int argc, char **argv, SRaftServerConfig *pConf) {
memset(pConf, 0, sizeof(*pConf));
int option_index, option_value;
option_index = 0;
static struct option long_options[] = {
{"help", no_argument, NULL, 'h'},
{"peers", required_argument, NULL, 'p'},
{"me", required_argument, NULL, 'm'},
{"dir", required_argument, NULL, 'd'},
{NULL, 0, NULL, 0}
};
while ((option_value = getopt_long(argc, argv, "hp:m:d:", long_options, &option_index)) != -1) {
switch (option_value) {
case 'm': {
parseAddr(optarg, pConf->me.host, sizeof(pConf->me.host), &pConf->me.port);
break;
}
case 'p': {
char tokens[MAX_PEERS][MAX_TOKEN_LEN];
int peerCount = splitString(optarg, ",", tokens, MAX_PEERS);
pConf->peersCount = peerCount;
for (int i = 0; i < peerCount; ++i) {
Addr *pAddr = &pConf->peers[i];
parseAddr(tokens[i], pAddr->host, sizeof(pAddr->host), &pAddr->port);
}
break;
}
case 'd': {
snprintf(pConf->dir, sizeof(pConf->dir), "%s", optarg);
break;
}
case 'h': {
usage();
exit(-1);
}
default: {
usage();
exit(-1);
}
}
}
snprintf(pConf->dataDir, sizeof(pConf->dataDir), "%s/%s:%u", pConf->dir, pConf->me.host, pConf->me.port);
}
void printConf(SRaftServerConfig *pConf) {
printf("\nconf: \n");
printf("me: %s:%u \n", pConf->me.host, pConf->me.port);
printf("peersCount: %d \n", pConf->peersCount);
for (int i = 0; i < pConf->peersCount; ++i) {
Addr *pAddr = &pConf->peers[i];
printf("peer%d: %s:%u \n", i, pAddr->host, pAddr->port);
}
printf("dataDir: %s \n\n", pConf->dataDir);
}
int main(int argc, char **argv) {
srand(time(NULL));
int32_t ret;
exe_name = argv[0];
if (argc < 3) {
usage();
exit(-1);
}
SRaftServerConfig conf;
parseConf(argc, argv, &conf);
printConf(&conf);
char cmd_buf[COMMAND_LEN];
snprintf(cmd_buf, sizeof(cmd_buf), "mkdir -p %s", conf.dataDir);
system(cmd_buf);
struct raft_fsm fsm;
initFsm(&fsm);
SRaftServer raftServer;
ret = raftServerInit(&raftServer, &conf, &fsm);
assert(ret == 0);
pthread_t tidRaftServer;
pthread_create(&tidRaftServer, NULL, startServerFunc, &raftServer);
pthread_t tidConsole;
pthread_create(&tidConsole, NULL, startConsoleFunc, &raftServer);
while (1) {
sleep(10);
}
return 0;
}
#include <stdlib.h>
#include "common.h"
#include "raftServer.h"
char *keys;
char *values;
void initStore() {
keys = malloc(MAX_RECORD_COUNT * MAX_KV_LEN);
values = malloc(MAX_RECORD_COUNT * MAX_KV_LEN);
writeIndex = 0;
}
void destroyStore() {
free(keys);
free(values);
}
void putKV(const char *key, const char *value) {
if (writeIndex < MAX_RECORD_COUNT) {
strncpy(&keys[writeIndex], key, MAX_KV_LEN);
strncpy(&values[writeIndex], value, MAX_KV_LEN);
writeIndex++;
}
}
char *getKV(const char *key) {
for (int i = 0; i < MAX_RECORD_COUNT; ++i) {
if (strcmp(&keys[i], key) == 0) {
return &values[i];
}
}
return NULL;
}
int splitString(const char* str, char* separator, char (*arr)[MAX_TOKEN_LEN], int n_arr)
{
if (n_arr <= 0) {
return -1;
}
char* tmp = (char*)malloc(strlen(str) + 1);
strcpy(tmp, str);
char* context;
int n = 0;
char* token = strtok_r(tmp, separator, &context);
if (!token) {
goto ret;
}
strncpy(arr[n], token, MAX_TOKEN_LEN);
n++;
while (1) {
token = strtok_r(NULL, separator, &context);
if (!token || n >= n_arr) {
goto ret;
}
strncpy(arr[n], token, MAX_TOKEN_LEN);
n++;
}
ret:
free(tmp);
return n;
}
uint64_t raftId(const char *host, uint32_t port) {
uint32_t host_uint32 = (uint32_t)inet_addr(host);
assert(host_uint32 != (uint32_t)-1);
uint64_t code = ((uint64_t)host_uint32) << 32 | port;
return code;
}
int32_t raftServerInit(SRaftServer *pRaftServer, const SRaftServerConfig *pConf, struct raft_fsm *pFsm) {
int ret;
snprintf(pRaftServer->host, sizeof(pRaftServer->host), "%s", pConf->me.host);
pRaftServer->port = pConf->me.port;
snprintf(pRaftServer->address, sizeof(pRaftServer->address), "%s:%u", pRaftServer->host, pRaftServer->port);
strncpy(pRaftServer->dir, pConf->dataDir, sizeof(pRaftServer->dir));
pRaftServer->raftId = raftId(pRaftServer->host, pRaftServer->port);
pRaftServer->fsm = pFsm;
ret = uv_loop_init(&pRaftServer->loop);
if (!ret) {
fprintf(stderr, "%s \n", raft_errmsg(&pRaftServer->raft));
}
ret = raft_uv_tcp_init(&pRaftServer->transport, &pRaftServer->loop);
if (!ret) {
fprintf(stderr, "%s \n", raft_errmsg(&pRaftServer->raft));
}
ret = raft_uv_init(&pRaftServer->io, &pRaftServer->loop, pRaftServer->dir, &pRaftServer->transport);
if (!ret) {
fprintf(stderr, "%s \n", raft_errmsg(&pRaftServer->raft));
}
ret = raft_init(&pRaftServer->raft, &pRaftServer->io, pRaftServer->fsm, pRaftServer->raftId, pRaftServer->address);
if (!ret) {
fprintf(stderr, "%s \n", raft_errmsg(&pRaftServer->raft));
}
struct raft_configuration conf;
raft_configuration_init(&conf);
raft_configuration_add(&conf, pRaftServer->raftId, pRaftServer->address, RAFT_VOTER);
printf("add myself: %llu - %s \n", pRaftServer->raftId, pRaftServer->address);
for (int i = 0; i < pConf->peersCount; ++i) {
const Addr *pAddr = &pConf->peers[i];
raft_id rid = raftId(pAddr->host, pAddr->port);
char addrBuf[ADDRESS_LEN];
snprintf(addrBuf, sizeof(addrBuf), "%s:%u", pAddr->host, pAddr->port);
raft_configuration_add(&conf, rid, addrBuf, RAFT_VOTER);
printf("add peers: %llu - %s \n", rid, addrBuf);
}
raft_bootstrap(&pRaftServer->raft, &conf);
return 0;
}
int32_t raftServerStart(SRaftServer *pRaftServer) {
int ret;
ret = raft_start(&pRaftServer->raft);
if (!ret) {
fprintf(stderr, "%s \n", raft_errmsg(&pRaftServer->raft));
}
uv_run(&pRaftServer->loop, UV_RUN_DEFAULT);
}
void raftServerClose(SRaftServer *pRaftServer) {
}
int fsmApplyCb(struct raft_fsm *pFsm, const struct raft_buffer *buf, void **result) {
char *msg = (char*)buf->base;
printf("fsm apply: %s \n", msg);
char arr[2][MAX_TOKEN_LEN];
splitString(msg, "--", arr, 2);
putKV(arr[0], arr[1]);
return 0;
}
int32_t initFsm(struct raft_fsm *fsm) {
initStore();
fsm->apply = fsmApplyCb;
return 0;
}
#ifndef TDENGINE_RAFT_SERVER_H
#define TDENGINE_RAFT_SERVER_H
#ifdef __cplusplus
extern "C" {
#endif
#include <netinet/in.h>
#include <arpa/inet.h>
#include <assert.h>
#include <string.h>
#include "raft.h"
#include "raft/uv.h"
#include "common.h"
// simulate a db store, just for test
#define MAX_KV_LEN 100
#define MAX_RECORD_COUNT 500
char *keys;
char *values;
int writeIndex;
void initStore();
void destroyStore();
void putKV(const char *key, const char *value);
char *getKV(const char *key);
typedef struct {
char dir[DIR_LEN + HOST_LEN * 2]; /* Data dir of UV I/O backend */
char host[HOST_LEN];
uint32_t port;
char address[ADDRESS_LEN]; /* Raft instance address */
raft_id raftId; /* For vote */
struct raft_fsm *fsm; /* Sample application FSM */
struct raft raft; /* Raft instance */
struct raft_io io; /* UV I/O backend */
struct uv_loop_s loop; /* UV loop */
struct raft_uv_transport transport; /* UV I/O backend transport */
} SRaftServer;
#define MAX_TOKEN_LEN 32
int splitString(const char* str, char* separator, char (*arr)[MAX_TOKEN_LEN], int n_arr);
uint64_t raftId(const char *host, uint32_t port);
int32_t raftServerInit(SRaftServer *pRaftServer, const SRaftServerConfig *pConf, struct raft_fsm *pFsm);
int32_t raftServerStart(SRaftServer *pRaftServer);
void raftServerClose(SRaftServer *pRaftServer);
int initFsm(struct raft_fsm *fsm);
#ifdef __cplusplus
}
#endif
#endif // TDENGINE_RAFT_SERVER_H
......@@ -52,6 +52,10 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_CONNECT, "mq-connect" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_DISCONNECT, "mq-disconnect" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_SET_CUR, "mq-set-cur" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_RES_READY, "res-ready" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_TASKS_STATUS, "tasks-status" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CANCEL_TASK, "cancel-task" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DROP_TASK, "drop-task" )
// message from client to mnode
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CONNECT, "connect" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CREATE_ACCT, "create-acct" )
......@@ -308,17 +312,25 @@ typedef struct {
char data[];
} SMDCreateTableMsg;
//typedef struct {
// int32_t len; // one create table message
// char tableName[TSDB_TABLE_FNAME_LEN];
// int16_t numOfColumns;
// int16_t sqlLen; // the length of SQL, it starts after schema , sql is a null-terminated string
// int8_t igExists;
// int8_t rspMeta;
// int8_t reserved[16];
// char schema[];
//} SCreateTableMsg;
typedef struct {
int32_t len; // one create table message
char tableName[TSDB_TABLE_FNAME_LEN];
int16_t numOfTags;
int16_t numOfColumns;
int16_t sqlLen; // the length of SQL, it starts after schema , sql is a null-terminated string
int16_t numOfTags;
int8_t igExists;
int8_t rspMeta;
int8_t reserved[16];
char schema[];
} SCreateTableMsg;
} SCreateCTableMsg;
typedef struct {
char name[TSDB_TABLE_FNAME_LEN];
......@@ -326,7 +338,7 @@ typedef struct {
int32_t numOfTags;
int32_t numOfColumns;
SSchema pSchema[];
} SCreateStbMsg;
} SCreateStbMsg, SCreateTableMsg;
typedef struct {
char name[TSDB_TABLE_FNAME_LEN];
......@@ -369,6 +381,7 @@ typedef struct {
typedef struct {
SMsgHead head;
char name[TSDB_TABLE_FNAME_LEN];
int8_t ignoreNotExists;
} SDropTableMsg;
typedef struct {
......@@ -1094,29 +1107,29 @@ typedef struct {
/* data */
} SUpdateTagValRsp;
typedef struct SSchedulerQueryMsg {
typedef struct SSubQueryMsg {
uint64_t schedulerId;
uint64_t queryId;
uint64_t taskId;
uint32_t contentLen;
char msg[];
} SSchedulerQueryMsg;
} SSubQueryMsg;
typedef struct SSchedulerReadyMsg {
typedef struct SResReadyMsg {
uint64_t schedulerId;
uint64_t queryId;
uint64_t taskId;
} SSchedulerReadyMsg;
} SResReadyMsg;
typedef struct SSchedulerFetchMsg {
typedef struct SResFetchMsg {
uint64_t schedulerId;
uint64_t queryId;
uint64_t taskId;
} SSchedulerFetchMsg;
} SResFetchMsg;
typedef struct SSchedulerStatusMsg {
typedef struct SSchTasksStatusMsg {
uint64_t schedulerId;
} SSchedulerStatusMsg;
} SSchTasksStatusMsg;
typedef struct STaskStatus {
uint64_t queryId;
......@@ -1130,11 +1143,17 @@ typedef struct SSchedulerStatusRsp {
} SSchedulerStatusRsp;
typedef struct SSchedulerCancelMsg {
typedef struct STaskCancelMsg {
uint64_t schedulerId;
uint64_t queryId;
uint64_t taskId;
} STaskCancelMsg;
typedef struct STaskDropMsg {
uint64_t schedulerId;
uint64_t queryId;
uint64_t taskId;
} SSchedulerCancelMsg;
} STaskDropMsg;
#pragma pack(pop)
......
......@@ -115,14 +115,14 @@ int32_t catalogGetTableDistVgroup(struct SCatalog* pCatalog, void *pRpc, const S
/**
* Get a table's vgroup from its name's hash value.
* @param pCatalog (input, got with catalogGetHandle)
* @param pRpc (input, rpc object)
* @param pTransporter (input, rpc object)
* @param pMgmtEps (input, mnode EPs)
* @param pDBName (input, full db name)
* @param pTableName (input, table name, NOT including db name)
* @param vgInfo (output, vgroup info)
* @return error code
*/
int32_t catalogGetTableHashVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName, SVgroupInfo* vgInfo);
int32_t catalogGetTableHashVgroup(struct SCatalog* pCatalog, void * pTransporter, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName, SVgroupInfo* vgInfo);
/**
......
......@@ -40,7 +40,7 @@ typedef struct SQueryNode {
typedef struct SField {
char name[TSDB_COL_NAME_LEN];
uint8_t type;
int16_t bytes;
int32_t bytes;
} SField;
typedef struct SParseBasicCtx {
......@@ -160,6 +160,13 @@ typedef struct SInsertStmtInfo {
const char* sql; // current sql statement position
} SInsertStmtInfo;
typedef struct SDclStmtInfo {
int16_t nodeType;
int16_t msgType;
char* pMsg;
int32_t msgLen;
} SDclStmtInfo;
#ifdef __cplusplus
}
#endif
......
......@@ -22,20 +22,11 @@ extern "C" {
#include "parsenodes.h"
/**
* True will be returned if the input sql string is insert, false otherwise.
* @param pStr sql string
* @param length length of the sql string
* @return
*/
bool qIsInsertSql(const char* pStr, size_t length);
typedef struct SParseContext {
SParseBasicCtx ctx;
void *pRpc;
struct SCatalog *pCatalog;
const SEpSet *pEpSet;
int64_t id; // query id, generated by uuid generator
int8_t schemaAttached; // denote if submit block is built with table schema or not
const char *pSql; // sql string
size_t sqlLen; // length of the sql string
......@@ -51,17 +42,9 @@ typedef struct SParseContext {
* @param msg extended error message if exists.
* @return error code
*/
int32_t qParseQuerySql(const char* pStr, size_t length, SParseBasicCtx* pParseCtx, int32_t* type, void** pOutput, int32_t* outputLen, char* msg, int32_t msgLen);
int32_t qParseQuerySql(SParseContext* pContext, SQueryNode** pQuery);
/**
* Parse the insert sql statement.
* @param pStr sql string
* @param length length of the sql string
* @param id operator id, generated by uuid generator.
* @param msg extended error message if exists to help avoid the problem in sql statement.
* @return data in binary format to submit to vnode directly.
*/
int32_t qParseInsertSql(SParseContext* pContext, struct SInsertStmtInfo** pInfo);
bool qIsDclQuery(const SQueryNode* pQuery);
/**
* Convert a normal sql statement to only query tags information to enable that the subscribe client can be aware quickly of the true vgroup ids that
......
......@@ -25,12 +25,15 @@ extern "C" {
#include "tlog.h"
enum {
JOB_TASK_STATUS_NULL = 0,
JOB_TASK_STATUS_NOT_START = 1,
JOB_TASK_STATUS_EXECUTING,
JOB_TASK_STATUS_PARTIAL_SUCCEED,
JOB_TASK_STATUS_SUCCEED,
JOB_TASK_STATUS_FAILED,
JOB_TASK_STATUS_CANCELLING,
JOB_TASK_STATUS_CANCELLED
JOB_TASK_STATUS_CANCELLED,
JOB_TASK_STATUS_DROPPING,
};
typedef struct STableComInfo {
......@@ -107,7 +110,7 @@ int32_t cleanupTaskQueue();
int32_t taosAsyncExec(__async_exec_fn_t execFn, void* execParam, int32_t* code);
SSchema* tGetTbnameColumnSchema();
void msgInit();
void initQueryModuleMsgHandle();
extern int32_t (*queryBuildMsg[TSDB_MSG_TYPE_MAX])(void* input, char **msg, int32_t msgSize, int32_t *msgLen);
extern int32_t (*queryProcessMsgRsp[TSDB_MSG_TYPE_MAX])(void* output, char *msg, int32_t msgSize);
......
......@@ -42,15 +42,17 @@ typedef struct {
int32_t qWorkerInit(SQWorkerCfg *cfg, void **qWorkerMgmt);
int32_t qWorkerProcessQueryMsg(void *qWorkerMgmt, SSchedulerQueryMsg *msg, SRpcMsg *rsp);
int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, SRpcMsg **rsp);
int32_t qWorkerProcessReadyMsg(void *qWorkerMgmt, SSchedulerReadyMsg *msg, SRpcMsg *rsp);
int32_t qWorkerProcessReadyMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, SRpcMsg *rsp);
int32_t qWorkerProcessStatusMsg(void *qWorkerMgmt, SSchedulerStatusMsg *msg, SRpcMsg *rsp);
int32_t qWorkerProcessStatusMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, SRpcMsg *rsp);
int32_t qWorkerProcessFetchMsg(void *qWorkerMgmt, SSchedulerFetchMsg *msg, SRpcMsg *rsp);
int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, SRpcMsg *rsp);
int32_t qWorkerProcessCancelMsg(void *qWorkerMgmt, SSchedulerCancelMsg *msg, SRpcMsg *rsp);
int32_t qWorkerProcessCancelMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, SRpcMsg *rsp);
int32_t qWorkerProcessDropMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, SRpcMsg *rsp);
void qWorkerDestroy(void **qWorkerMgmt);
......
......@@ -314,6 +314,11 @@ int32_t* taosGetErrno();
#define TSDB_CODE_QRY_INVALID_TIME_CONDITION TAOS_DEF_ERROR_CODE(0, 0x070D) //"invalid time condition")
#define TSDB_CODE_QRY_SYS_ERROR TAOS_DEF_ERROR_CODE(0, 0x070E) //"System error")
#define TSDB_CODE_QRY_INVALID_INPUT TAOS_DEF_ERROR_CODE(0, 0x070F) //"invalid input")
#define TSDB_CODE_QRY_SCH_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x0710) //"Scheduler not exist")
#define TSDB_CODE_QRY_TASK_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x0711) //"Task not exist")
#define TSDB_CODE_QRY_TASK_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0712) //"Task already exist")
#define TSDB_CODE_QRY_RES_CACHE_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x0713) //"Task result cache not exist")
#define TSDB_CODE_QRY_TASK_CANCELLED TAOS_DEF_ERROR_CODE(0, 0x0714) //"Task cancelled")
// grant
......
......@@ -33,6 +33,8 @@ typedef void (*_hash_free_fn_t)(void *);
#define HASH_INDEX(v, c) ((v) & ((c)-1))
#define HASH_NODE_EXIST(code) (code == -2)
/**
* murmur hash algorithm
* @key usually string
......
......@@ -137,15 +137,14 @@ typedef struct SRequestMsgBody {
extern SAppInfo appInfo;
extern int32_t tscReqRef;
extern void *tscQhandle;
extern int32_t tscConnRef;
extern int (*buildRequestMsgFp[TSDB_SQL_MAX])(SRequestObj *pRequest, SRequestMsgBody *pMsgBody);
extern int (*handleRequestRspFp[TSDB_SQL_MAX])(SRequestObj *pRequest, const char* pMsg, int32_t msgLen);
SRequestMsgBody buildRequestMsgImpl(SRequestObj *pRequest);
extern int (*handleRequestRspFp[TSDB_MSG_TYPE_MAX])(SRequestObj *pRequest, const char* pMsg, int32_t msgLen);
int taos_init();
void* createTscObj(const char* user, const char* auth, const char *ip, uint32_t port, SAppInstInfo* pAppInfo);
void* createTscObj(const char* user, const char* auth, const char *db, SAppInstInfo* pAppInfo);
void destroyTscObj(void*pObj);
void *createRequest(STscObj* pObj, __taos_async_fn_t fp, void* param, int32_t type);
......
......@@ -13,11 +13,12 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "os.h"
#include "taosmsg.h"
#include "query.h"
#include <catalog.h>
#include "clientInt.h"
#include "clientLog.h"
#include "os.h"
#include "query.h"
#include "taosmsg.h"
#include "tcache.h"
#include "tconfig.h"
#include "tglobal.h"
......@@ -129,7 +130,7 @@ void destroyTscObj(void *pObj) {
tfree(pTscObj);
}
void* createTscObj(const char* user, const char* auth, const char *ip, uint32_t port, SAppInstInfo* pAppInfo) {
void* createTscObj(const char* user, const char* auth, const char *db, SAppInstInfo* pAppInfo) {
STscObj *pObj = (STscObj *)calloc(1, sizeof(STscObj));
if (NULL == pObj) {
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
......@@ -144,6 +145,10 @@ void* createTscObj(const char* user, const char* auth, const char *ip, uint32_t
tstrncpy(pObj->user, user, sizeof(pObj->user));
memcpy(pObj->pass, auth, TSDB_PASSWORD_LEN);
if (db != NULL) {
tstrncpy(pObj->db, db, tListLen(pObj->db));
}
pthread_mutex_init(&pObj->mutex, NULL);
pObj->id = taosAddRef(tscConnRef, pObj);
......@@ -220,9 +225,13 @@ void taos_init_imp(void) {
taosInitNotes();
initMsgHandleFp();
initQueryModuleMsgHandle();
rpcInit();
SCatalogCfg cfg = {.enableVgroupCache = true, .maxDBCacheNum = 100, .maxTblCacheNum = 100};
catalogInit(&cfg);
tscDebug("starting to initialize TAOS driver, local ep: %s", tsLocalEp);
taosSetCoreDump(true);
......
......@@ -144,37 +144,66 @@ TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen) {
tscDebugL("0x%"PRIx64" SQL: %s", pRequest->requestId, pRequest->sqlstr);
int32_t code = 0;
if (qIsInsertSql(pRequest->sqlstr, sqlLen)) {
// todo add
} else {
int32_t type = 0;
void* output = NULL;
int32_t outputLen = 0;
SParseContext cxt = {
.ctx = {.requestId = pRequest->requestId, .acctId = pTscObj->acctId, .db = getConnectionDB(pTscObj)},
.pSql = pRequest->sqlstr,
.sqlLen = sqlLen,
.pMsg = pRequest->msgBuf,
.msgLen = ERROR_MSG_BUF_DEFAULT_SIZE
};
SQueryNode* pQuery = NULL;
int32_t code = qParseQuerySql(&cxt, &pQuery);
if (qIsDclQuery(pQuery)) {
SDclStmtInfo* pDcl = (SDclStmtInfo*)pQuery;
pRequest->type = pDcl->msgType;
pRequest->body.requestMsg = (SReqMsgInfo){.pMsg = pDcl->pMsg, .len = pDcl->msgLen};
SRequestMsgBody body = buildRequestMsgImpl(pRequest);
SEpSet* pEpSet = &pTscObj->pAppInfo->mgmtEp.epSet;
if (pDcl->msgType == TSDB_MSG_TYPE_CREATE_TABLE) {
struct SCatalog* pCatalog = NULL;
char buf[12] = {0};
sprintf(buf, "%d", pTscObj->pAppInfo->clusterId);
code = catalogGetHandle(buf, &pCatalog);
if (code != 0) {
pRequest->code = code;
return pRequest;
}
SCreateTableMsg* pMsg = body.msgInfo.pMsg;
SParseBasicCtx c = {.requestId = pRequest->requestId, .acctId = pTscObj->acctId, .db = getConnectionDB(pTscObj)};
code = qParseQuerySql(pRequest->sqlstr, sqlLen, &c, &type, &output, &outputLen, pRequest->msgBuf, ERROR_MSG_BUF_DEFAULT_SIZE);
if (type == TSDB_SQL_CREATE_USER || type == TSDB_SQL_SHOW || type == TSDB_SQL_DROP_USER ||
type == TSDB_SQL_DROP_ACCT || type == TSDB_SQL_CREATE_DB || type == TSDB_SQL_CREATE_ACCT ||
type == TSDB_SQL_CREATE_TABLE || type == TSDB_SQL_USE_DB) {
pRequest->type = type;
pRequest->body.requestMsg = (SReqMsgInfo){.pMsg = output, .len = outputLen};
SName t = {0};
tNameFromString(&t, pMsg->name, T_NAME_ACCT|T_NAME_DB|T_NAME_TABLE);
SRequestMsgBody body = {0};
buildRequestMsgFp[type](pRequest, &body);
char db[TSDB_DB_NAME_LEN + TS_PATH_DELIMITER_LEN + TSDB_ACCT_ID_LEN] = {0};
tNameGetFullDbName(&t, db);
SVgroupInfo info = {0};
catalogGetTableHashVgroup(pCatalog, pTscObj->pTransporter, pEpSet, db, tNameGetTableName(&t), &info);
int64_t transporterId = 0;
sendMsgToServer(pTscObj->pTransporter, &pTscObj->pAppInfo->mgmtEp.epSet, &body, &transporterId);
SEpSet ep = {0};
ep.inUse = info.inUse;
ep.numOfEps = info.numOfEps;
for(int32_t i = 0; i < ep.numOfEps; ++i) {
ep.port[i] = info.epAddr[i].port;
tstrncpy(ep.fqdn[i], info.epAddr[i].fqdn, tListLen(ep.fqdn[i]));
}
tsem_wait(&pRequest->body.rspSem);
destroyRequestMsgBody(&body);
sendMsgToServer(pTscObj->pTransporter, &ep, &body, &transporterId);
} else {
assert(0);
int64_t transporterId = 0;
sendMsgToServer(pTscObj->pTransporter, pEpSet, &body, &transporterId);
}
tfree(c.db);
tsem_wait(&pRequest->body.rspSem);
destroyRequestMsgBody(&body);
}
tfree(cxt.ctx.db);
if (code != TSDB_CODE_SUCCESS) {
pRequest->code = code;
return pRequest;
......@@ -220,13 +249,13 @@ int initEpSetFromCfg(const char *firstEp, const char *secondEp, SCorEpSet *pEpSe
}
STscObj* taosConnectImpl(const char *ip, const char *user, const char *auth, const char *db, uint16_t port, __taos_async_fn_t fp, void *param, SAppInstInfo* pAppInfo) {
STscObj *pTscObj = createTscObj(user, auth, ip, port, pAppInfo);
STscObj *pTscObj = createTscObj(user, auth, db, pAppInfo);
if (NULL == pTscObj) {
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
return pTscObj;
}
SRequestObj *pRequest = createRequest(pTscObj, fp, param, TSDB_SQL_CONNECT);
SRequestObj *pRequest = createRequest(pTscObj, fp, param, TSDB_MSG_TYPE_CONNECT);
if (pRequest == NULL) {
destroyTscObj(pTscObj);
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
......@@ -268,16 +297,11 @@ static int32_t buildConnectMsg(SRequestObj *pRequest, SRequestMsgBody* pMsgBody)
return -1;
}
// TODO refactor full_name
char *db; // ugly code to move the space
STscObj *pObj = pRequest->pTscObj;
pthread_mutex_lock(&pObj->mutex);
db = strstr(pObj->db, TS_PATH_DELIMITER);
db = (db == NULL) ? pObj->db : db + 1;
char* db = getConnectionDB(pObj);
tstrncpy(pConnect->db, db, sizeof(pConnect->db));
pthread_mutex_unlock(&pObj->mutex);
tfree(db);
pConnect->pid = htonl(appInfo.pid);
pConnect->startTime = htobe64(appInfo.startTime);
......@@ -395,10 +419,9 @@ void* doFetchRow(SRequestObj* pRequest) {
SReqResultInfo* pResultInfo = &pRequest->body.resInfo;
if (pResultInfo->pData == NULL || pResultInfo->current >= pResultInfo->numOfRows) {
pRequest->type = TSDB_SQL_RETRIEVE_MNODE;
pRequest->type = TSDB_MSG_TYPE_SHOW_RETRIEVE;
SRequestMsgBody body = {0};
buildRequestMsgFp[pRequest->type](pRequest, &body);
SRequestMsgBody body = buildRequestMsgImpl(pRequest);
int64_t transporterId = 0;
STscObj* pTscObj = pRequest->pTscObj;
......
......@@ -21,8 +21,7 @@
#include "tmsgtype.h"
#include "trpc.h"
int (*buildRequestMsgFp[TSDB_SQL_MAX])(SRequestObj *pRequest, SRequestMsgBody *pMsgBody) = {0};
int (*handleRequestRspFp[TSDB_SQL_MAX])(SRequestObj *pRequest, const char* pMsg, int32_t msgLen);
int (*handleRequestRspFp[TSDB_MSG_TYPE_MAX])(SRequestObj *pRequest, const char* pMsg, int32_t msgLen);
int32_t buildConnectMsg(SRequestObj *pRequest, SRequestMsgBody* pMsgBody) {
pMsgBody->msgType = TSDB_MSG_TYPE_CONNECT;
......@@ -67,15 +66,6 @@ int processConnectRsp(SRequestObj *pRequest, const char* pMsg, int32_t msgLen) {
pConnect->epSet.port[i] = htons(pConnect->epSet.port[i]);
}
// TODO refactor
pthread_mutex_lock(&pTscObj->mutex);
char temp[TSDB_TABLE_FNAME_LEN * 2] = {0};
int32_t len = sprintf(temp, "%d%s%s", pTscObj->acctId, TS_PATH_DELIMITER, pTscObj->db);
assert(len <= sizeof(pTscObj->db));
tstrncpy(pTscObj->db, temp, sizeof(pTscObj->db));
pthread_mutex_unlock(&pTscObj->mutex);
if (!isEpsetEqual(&pTscObj->pAppInfo->mgmtEp.epSet, &pConnect->epSet)) {
updateEpSet_s(&pTscObj->pAppInfo->mgmtEp, &pConnect->epSet);
}
......@@ -96,47 +86,35 @@ int processConnectRsp(SRequestObj *pRequest, const char* pMsg, int32_t msgLen) {
return 0;
}
int32_t doBuildMsgSupp(SRequestObj *pRequest, SRequestMsgBody* pMsgBody) {
static int32_t buildRetrieveMnodeMsg(SRequestObj *pRequest, SRequestMsgBody* pMsgBody) {
pMsgBody->msgType = TSDB_MSG_TYPE_SHOW_RETRIEVE;
pMsgBody->msgInfo.len = sizeof(SRetrieveTableMsg);
pMsgBody->requestObjRefId = pRequest->self;
pMsgBody->msgInfo = pRequest->body.requestMsg;
switch(pRequest->type) {
case TSDB_SQL_CREATE_USER:
pMsgBody->msgType = TSDB_MSG_TYPE_CREATE_USER;
break;
case TSDB_SQL_DROP_USER:
pMsgBody->msgType = TSDB_MSG_TYPE_DROP_USER;
break;
case TSDB_SQL_CREATE_ACCT:
pMsgBody->msgType = TSDB_MSG_TYPE_CREATE_ACCT;
break;
case TSDB_SQL_DROP_ACCT:
pMsgBody->msgType = TSDB_MSG_TYPE_DROP_ACCT;
break;
case TSDB_SQL_CREATE_DB: {
pMsgBody->msgType = TSDB_MSG_TYPE_CREATE_DB;
SCreateDbMsg* pCreateMsg = pRequest->body.requestMsg.pMsg;
SName name = {0};
int32_t ret = tNameSetDbName(&name, pRequest->pTscObj->acctId, pCreateMsg->db, strnlen(pCreateMsg->db, tListLen(pCreateMsg->db)));
if (ret != TSDB_CODE_SUCCESS) {
return -1;
}
tNameGetFullDbName(&name, pCreateMsg->db);
break;
}
case TSDB_SQL_USE_DB: {
pMsgBody->msgType = TSDB_MSG_TYPE_USE_DB;
break;
}
case TSDB_SQL_CREATE_TABLE: {
pMsgBody->msgType = TSDB_MSG_TYPE_CREATE_STB;
break;
}
case TSDB_SQL_SHOW:
pMsgBody->msgType = TSDB_MSG_TYPE_SHOW;
break;
SRetrieveTableMsg *pRetrieveMsg = calloc(1, sizeof(SRetrieveTableMsg));
if (pRetrieveMsg == NULL) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
pRetrieveMsg->showId = htonl(pRequest->body.execId);
pMsgBody->msgInfo.pMsg = pRetrieveMsg;
return TSDB_CODE_SUCCESS;
}
SRequestMsgBody buildRequestMsgImpl(SRequestObj *pRequest) {
if (pRequest->type == TSDB_MSG_TYPE_SHOW_RETRIEVE) {
SRequestMsgBody body = {0};
buildRetrieveMnodeMsg(pRequest, &body);
return body;
} else {
assert(pRequest != NULL);
SRequestMsgBody body = {
.requestObjRefId = pRequest->self,
.msgInfo = pRequest->body.requestMsg,
.msgType = pRequest->type,
.requestId = pRequest->requestId,
};
return body;
}
}
......@@ -175,18 +153,6 @@ int32_t processShowRsp(SRequestObj *pRequest, const char* pMsg, int32_t msgLen)
return 0;
}
int buildRetrieveMnodeMsg(SRequestObj *pRequest, SRequestMsgBody* pMsgBody) {
pMsgBody->msgType = TSDB_MSG_TYPE_SHOW_RETRIEVE;
pMsgBody->msgInfo.len = sizeof(SRetrieveTableMsg);
pMsgBody->requestObjRefId = pRequest->self;
SRetrieveTableMsg *pRetrieveMsg = calloc(1, sizeof(SRetrieveTableMsg));
pRetrieveMsg->showId = htonl(pRequest->body.execId);
pMsgBody->msgInfo.pMsg = pRetrieveMsg;
return TSDB_CODE_SUCCESS;
}
int32_t processRetrieveMnodeRsp(SRequestObj *pRequest, const char* pMsg, int32_t msgLen) {
assert(msgLen >= sizeof(SRetrieveTableRsp));
......@@ -227,6 +193,10 @@ int32_t processCreateTableRsp(SRequestObj *pRequest, const char* pMsg, int32_t m
assert(pMsg != NULL);
}
int32_t processDropDbRsp(SRequestObj *pRequest, const char* pMsg, int32_t msgLen) {
// todo: Remove cache in catalog cache.
}
void initMsgHandleFp() {
#if 0
tscBuildMsg[TSDB_SQL_SELECT] = tscBuildQueryMsg;
......@@ -303,27 +273,11 @@ void initMsgHandleFp() {
tscProcessMsgRsp[TSDB_SQL_SHOW_CREATE_DATABASE] = tscProcessShowCreateRsp;
#endif
buildRequestMsgFp[TSDB_SQL_CONNECT] = buildConnectMsg;
handleRequestRspFp[TSDB_SQL_CONNECT] = processConnectRsp;
buildRequestMsgFp[TSDB_SQL_CREATE_USER] = doBuildMsgSupp;
buildRequestMsgFp[TSDB_SQL_DROP_USER] = doBuildMsgSupp;
buildRequestMsgFp[TSDB_SQL_CREATE_ACCT] = doBuildMsgSupp;
buildRequestMsgFp[TSDB_SQL_DROP_ACCT] = doBuildMsgSupp;
buildRequestMsgFp[TSDB_SQL_SHOW] = doBuildMsgSupp;
handleRequestRspFp[TSDB_SQL_SHOW] = processShowRsp;
buildRequestMsgFp[TSDB_SQL_RETRIEVE_MNODE] = buildRetrieveMnodeMsg;
handleRequestRspFp[TSDB_SQL_RETRIEVE_MNODE]= processRetrieveMnodeRsp;
buildRequestMsgFp[TSDB_SQL_CREATE_DB] = doBuildMsgSupp;
handleRequestRspFp[TSDB_SQL_CREATE_DB] = processCreateDbRsp;
buildRequestMsgFp[TSDB_SQL_USE_DB] = doBuildMsgSupp;
handleRequestRspFp[TSDB_SQL_USE_DB] = processUseDbRsp;
buildRequestMsgFp[TSDB_SQL_CREATE_TABLE] = doBuildMsgSupp;
handleRequestRspFp[TSDB_SQL_CREATE_TABLE] = processCreateTableRsp;
handleRequestRspFp[TSDB_MSG_TYPE_CONNECT] = processConnectRsp;
handleRequestRspFp[TSDB_MSG_TYPE_SHOW] = processShowRsp;
handleRequestRspFp[TSDB_MSG_TYPE_SHOW_RETRIEVE] = processRetrieveMnodeRsp;
handleRequestRspFp[TSDB_MSG_TYPE_CREATE_DB] = processCreateDbRsp;
handleRequestRspFp[TSDB_MSG_TYPE_USE_DB] = processUseDbRsp;
handleRequestRspFp[TSDB_MSG_TYPE_CREATE_TABLE] = processCreateTableRsp;
handleRequestRspFp[TSDB_MSG_TYPE_DROP_DB] = processDropDbRsp;
}
\ No newline at end of file
......@@ -22,11 +22,24 @@
#pragma GCC diagnostic ignored "-Wunused-variable"
#pragma GCC diagnostic ignored "-Wsign-compare"
#include "../inc/clientInt.h"
#include "taos.h"
#include "tglobal.h"
#include "../inc/clientInt.h"
namespace {
void showDB(TAOS* pConn) {
TAOS_RES* pRes = taos_query(pConn, "show databases");
TAOS_ROW pRow = NULL;
TAOS_FIELD* pFields = taos_fetch_fields(pRes);
int32_t numOfFields = taos_num_fields(pRes);
char str[512] = {0};
while ((pRow = taos_fetch_row(pRes)) != NULL) {
int32_t code = taos_print_row(str, pRow, pFields, numOfFields);
printf("%s\n", str);
}
}
} // namespace
int main(int argc, char** argv) {
......@@ -34,92 +47,208 @@ int main(int argc, char** argv) {
return RUN_ALL_TESTS();
}
TEST(testCase, driverInit_Test) {
taos_init();
}
TEST(testCase, driverInit_Test) { taos_init(); }
TEST(testCase, connect_Test) {
TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0);
// TEST(testCase, connect_Test) {
// TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0);
//// assert(pConn != NULL);
// taos_close(pConn);
//}
//
// TEST(testCase, create_user_Test) {
// TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0);
// assert(pConn != NULL);
taos_close(pConn);
}
TEST(testCase, create_user_Test) {
TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0);
//
// TAOS_RES* pRes = taos_query(pConn, "create user abc pass 'abc'");
// if (taos_errno(pRes) != TSDB_CODE_SUCCESS) {
// printf("failed to create user, reason:%s\n", taos_errstr(pRes));
// }
//
// taos_free_result(pRes);
// taos_close(pConn);
//}
//
// TEST(testCase, create_account_Test) {
// TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0);
// assert(pConn != NULL);
//
// TAOS_RES* pRes = taos_query(pConn, "create account aabc pass 'abc'");
// if (taos_errno(pRes) != TSDB_CODE_SUCCESS) {
// printf("failed to create user, reason:%s\n", taos_errstr(pRes));
// }
//
// taos_free_result(pRes);
// taos_close(pConn);
//}
//
// TEST(testCase, drop_account_Test) {
// TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0);
// assert(pConn != NULL);
//
// TAOS_RES* pRes = taos_query(pConn, "drop account aabc");
// if (taos_errno(pRes) != TSDB_CODE_SUCCESS) {
// printf("failed to create user, reason:%s\n", taos_errstr(pRes));
// }
//
// taos_free_result(pRes);
// taos_close(pConn);
//}
//
// TEST(testCase, show_user_Test) {
// TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0);
//// assert(pConn != NULL);
//
// TAOS_RES* pRes = taos_query(pConn, "show users");
// TAOS_ROW pRow = NULL;
//
// TAOS_FIELD* pFields = taos_fetch_fields(pRes);
// int32_t numOfFields = taos_num_fields(pRes);
//
// char str[512] = {0};
// while((pRow = taos_fetch_row(pRes)) != NULL) {
// int32_t code = taos_print_row(str, pRow, pFields, numOfFields);
// printf("%s\n", str);
// }
//
// taos_close(pConn);
//}
//
// TEST(testCase, drop_user_Test) {
// TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0);
// assert(pConn != NULL);
//
// TAOS_RES* pRes = taos_query(pConn, "drop user abc");
// if (taos_errno(pRes) != TSDB_CODE_SUCCESS) {
// printf("failed to create user, reason:%s\n", taos_errstr(pRes));
// }
//
// taos_free_result(pRes);
// taos_close(pConn);
//}
//
// TEST(testCase, show_db_Test) {
// TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0);
//// assert(pConn != NULL);
//
// TAOS_RES* pRes = taos_query(pConn, "show databases");
// TAOS_ROW pRow = NULL;
//
// TAOS_FIELD* pFields = taos_fetch_fields(pRes);
// int32_t numOfFields = taos_num_fields(pRes);
//
// char str[512] = {0};
// while((pRow = taos_fetch_row(pRes)) != NULL) {
// int32_t code = taos_print_row(str, pRow, pFields, numOfFields);
// printf("%s\n", str);
// }
//
// taos_close(pConn);
//}
TAOS_RES* pRes = taos_query(pConn, "create user abc pass 'abc'");
if (taos_errno(pRes) != TSDB_CODE_SUCCESS) {
printf("failed to create user, reason:%s\n", taos_errstr(pRes));
}
taos_free_result(pRes);
taos_close(pConn);
}
TEST(testCase, create_account_Test) {
TEST(testCase, create_db_Test) {
TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0);
assert(pConn != NULL);
TAOS_RES* pRes = taos_query(pConn, "create account aabc pass 'abc'");
if (taos_errno(pRes) != TSDB_CODE_SUCCESS) {
printf("failed to create user, reason:%s\n", taos_errstr(pRes));
TAOS_RES* pRes = taos_query(pConn, "create database abc1");
if (taos_errno(pRes) != 0) {
printf("error in create db, reason:%s\n", taos_errstr(pRes));
}
taos_free_result(pRes);
taos_close(pConn);
}
TEST(testCase, drop_account_Test) {
TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0);
assert(pConn != NULL);
TAOS_FIELD* pFields = taos_fetch_fields(pRes);
ASSERT_TRUE(pFields == NULL);
TAOS_RES* pRes = taos_query(pConn, "drop account aabc");
if (taos_errno(pRes) != TSDB_CODE_SUCCESS) {
printf("failed to create user, reason:%s\n", taos_errstr(pRes));
}
int32_t numOfFields = taos_num_fields(pRes);
ASSERT_EQ(numOfFields, 0);
taos_free_result(pRes);
taos_close(pConn);
}
TEST(testCase, show_user_Test) {
TEST(testCase, use_db_test) {
TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0);
// assert(pConn != NULL);
assert(pConn != NULL);
TAOS_RES* pRes = taos_query(pConn, "show users");
TAOS_ROW pRow = NULL;
TAOS_RES* pRes = taos_query(pConn, "use abc1");
if (taos_errno(pRes) != 0) {
printf("error in use db, reason:%s\n", taos_errstr(pRes));
}
TAOS_FIELD* pFields = taos_fetch_fields(pRes);
int32_t numOfFields = taos_num_fields(pRes);
ASSERT_TRUE(pFields == NULL);
char str[512] = {0};
while((pRow = taos_fetch_row(pRes)) != NULL) {
int32_t code = taos_print_row(str, pRow, pFields, numOfFields);
printf("%s\n", str);
}
int32_t numOfFields = taos_num_fields(pRes);
ASSERT_EQ(numOfFields, 0);
taos_close(pConn);
}
TEST(testCase, drop_user_Test) {
TEST(testCase, drop_db_test) {
TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0);
assert(pConn != NULL);
TAOS_RES* pRes = taos_query(pConn, "drop user abc");
if (taos_errno(pRes) != TSDB_CODE_SUCCESS) {
printf("failed to create user, reason:%s\n", taos_errstr(pRes));
showDB(pConn);
TAOS_RES* pRes = taos_query(pConn, "drop database abc1");
if (taos_errno(pRes) != 0) {
printf("failed to drop db, reason:%s\n", taos_errstr(pRes));
}
taos_free_result(pRes);
showDB(pConn);
taos_close(pConn);
}
TEST(testCase, show_db_Test) {
TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0);
// TEST(testCase, create_stable_Test) {
// TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0);
// assert(pConn != NULL);
//
// TAOS_RES* pRes = taos_query(pConn, "create database abc1");
// if (taos_errno(pRes) != 0) {
// printf("error in create db, reason:%s\n", taos_errstr(pRes));
// }
// taos_free_result(pRes);
//
// pRes = taos_query(pConn, "use abc1");
// if (taos_errno(pRes) != 0) {
// printf("error in use db, reason:%s\n", taos_errstr(pRes));
// }
// taos_free_result(pRes);
//
// pRes = taos_query(pConn, "create stable st1(ts timestamp, k int) tags(a int)");
// if (taos_errno(pRes) != 0) {
// printf("error in create stable, reason:%s\n", taos_errstr(pRes));
// }
//
// TAOS_FIELD* pFields = taos_fetch_fields(pRes);
// ASSERT_TRUE(pFields == NULL);
//
// int32_t numOfFields = taos_num_fields(pRes);
// ASSERT_EQ(numOfFields, 0);
//
// taos_free_result(pRes);
// taos_close(pConn);
//}
TEST(testCase, create_table_Test) {
// TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0);
// assert(pConn != NULL);
//
// TAOS_RES* pRes = taos_query(pConn, "use abc1");
// taos_free_result(pRes);
//
// pRes = taos_query(pConn, "create table tm0(ts timestamp, k int)");
// taos_free_result(pRes);
//
// taos_close(pConn);
}
TAOS_RES* pRes = taos_query(pConn, "show databases");
TEST(testCase, create_ctable_Test) {}
TEST(testCase, show_stable_Test) {
TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0);
assert(pConn != NULL);
TAOS_RES* pRes = taos_query(pConn, "show stables");
TAOS_ROW pRow = NULL;
TAOS_FIELD* pFields = taos_fetch_fields(pRes);
......@@ -131,53 +260,45 @@ TEST(testCase, show_db_Test) {
printf("%s\n", str);
}
taos_close(pConn);
}
TEST(testCase, create_db_Test) {
TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0);
assert(pConn != NULL);
TAOS_RES* pRes = taos_query(pConn, "create database abc1");
TAOS_FIELD* pFields = taos_fetch_fields(pRes);
ASSERT_TRUE(pFields == NULL);
int32_t numOfFields = taos_num_fields(pRes);
ASSERT_EQ(numOfFields, 0);
taos_close(pConn);
}
TEST(testCase, use_db_test) {
TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0);
assert(pConn != NULL);
TAOS_RES* pRes = taos_query(pConn, "use abc1");
TAOS_FIELD* pFields = taos_fetch_fields(pRes);
ASSERT_TRUE(pFields == NULL);
int32_t numOfFields = taos_num_fields(pRes);
ASSERT_EQ(numOfFields, 0);
taos_free_result(pRes);
taos_close(pConn);
}
TEST(testCase, create_stable_Test) {
TEST(testCase, drop_stable_Test) {
TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0);
assert(pConn != NULL);
TAOS_RES* pRes = taos_query(pConn, "use abc1");
TAOS_RES* pRes = taos_query(pConn, "create database abc1");
if (taos_errno(pRes) != 0) {
printf("error in creating db, reason:%s\n", taos_errstr(pRes));
}
taos_free_result(pRes);
pRes = taos_query(pConn, "create stable st1(ts timestamp, k int) tags(a int)");
TAOS_FIELD* pFields = taos_fetch_fields(pRes);
ASSERT_TRUE(pFields == NULL);
pRes = taos_query(pConn, "use abc1");
if (taos_errno(pRes) != 0) {
printf("error in using db, reason:%s\n", taos_errstr(pRes));
}
taos_free_result(pRes);
int32_t numOfFields = taos_num_fields(pRes);
ASSERT_EQ(numOfFields, 0);
pRes = taos_query(pConn, "drop stable st1");
if (taos_errno(pRes) != 0) {
printf("failed to drop stable, reason:%s\n", taos_errstr(pRes));
}
taos_free_result(pRes);
taos_close(pConn);
}
//TEST(testCase, show_table_Test) {
// TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0);
// assert(pConn != NULL);
//
// TAOS_RES* pRes = taos_query(pConn, "use abc1");
// taos_free_result(pRes);
//
// pRes = taos_query(pConn, "show tables");
// taos_free_result(pRes);
//
// taos_close(pConn);
//}
......@@ -45,6 +45,10 @@ static void dndInitMsgFp(STransMgmt *pMgmt) {
pMgmt->msgFp[TSDB_MSG_TYPE_MQ_CONNECT] = dndProcessVnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_MQ_DISCONNECT] = dndProcessVnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_MQ_SET_CUR] = dndProcessVnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_RES_READY] = dndProcessVnodeFetchMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_TASKS_STATUS] = dndProcessVnodeFetchMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_CANCEL_TASK] = dndProcessVnodeFetchMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_DROP_TASK] = dndProcessVnodeFetchMsg;
// msg from client to mnode
pMgmt->msgFp[TSDB_MSG_TYPE_CONNECT] = dndProcessMnodeReadMsg;
......
......@@ -59,8 +59,6 @@ typedef struct SCatalogMgmt {
typedef uint32_t (*tableNameHashFp)(const char *, uint32_t);
extern int32_t ctgDebugFlag;
#define ctgFatal(...) do { if (ctgDebugFlag & DEBUG_FATAL) { taosPrintLog("CTG FATAL ", ctgDebugFlag, __VA_ARGS__); }} while(0)
#define ctgError(...) do { if (ctgDebugFlag & DEBUG_ERROR) { taosPrintLog("CTG ERROR ", ctgDebugFlag, __VA_ARGS__); }} while(0)
#define ctgWarn(...) do { if (ctgDebugFlag & DEBUG_WARN) { taosPrintLog("CTG WARN ", ctgDebugFlag, __VA_ARGS__); }} while(0)
......@@ -75,7 +73,6 @@ extern int32_t ctgDebugFlag;
#define CTG_ERR_LRET(c,...) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { ctgError(__VA_ARGS__); terrno = _code; return _code; } } while (0)
#define CTG_ERR_JRET(c) do { code = c; if (code != TSDB_CODE_SUCCESS) { terrno = code; goto _return; } } while (0)
#ifdef __cplusplus
}
#endif
......
......@@ -371,7 +371,7 @@ int32_t catalogInit(SCatalogCfg *cfg) {
return TSDB_CODE_SUCCESS;
}
int32_t catalogGetHandle(const char *clusterId, struct SCatalog** catalogHandle) {
int32_t catalogGetHandle(const char* clusterId , struct SCatalog** catalogHandle) {
if (NULL == clusterId || NULL == catalogHandle) {
CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
}
......@@ -565,12 +565,12 @@ _return:
}
int32_t catalogGetTableHashVgroup(struct SCatalog *pCatalog, void *pRpc, const SEpSet *pMgmtEps, const char *pDBName, const char *pTableName, SVgroupInfo *pVgroup) {
int32_t catalogGetTableHashVgroup(struct SCatalog *pCatalog, void *pTransporter, const SEpSet *pMgmtEps, const char *pDBName, const char *pTableName, SVgroupInfo *pVgroup) {
SDBVgroupInfo dbInfo = {0};
int32_t code = 0;
int32_t vgId = 0;
CTG_ERR_RET(catalogGetDBVgroup(pCatalog, pRpc, pMgmtEps, pDBName, false, &dbInfo));
CTG_ERR_RET(catalogGetDBVgroup(pCatalog, pTransporter, pMgmtEps, pDBName, false, &dbInfo));
if (dbInfo.vgVersion < 0 || NULL == dbInfo.vgInfo) {
ctgError("db[%s] vgroup cache invalid, vgroup version:%d, vgInfo:%p", pDBName, dbInfo.vgVersion, dbInfo.vgInfo);
......
......@@ -29,6 +29,8 @@
#include "catalog.h"
#include "tep.h"
#include "trpc.h"
#include "stub.h"
#include "addr_any.h"
typedef struct SAppInstInfo {
int64_t numOfConns;
......@@ -86,6 +88,27 @@ void sendCreateDbMsg(void *shandle, SEpSet *pEpSet) {
ASSERT_EQ(rpcRsp.code, 0);
}
void __rpcSendRecv(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) {
SUseDbRsp *rspMsg = NULL; //todo
return;
}
void initTestEnv() {
static Stub stub;
stub.set(rpcSendRecv, __rpcSendRecv);
{
AddrAny any("libtransport.so");
std::map<std::string,void*> result;
any.get_global_func_addr_dynsym("^rpcSendRecv$", result);
for (const auto& f : result) {
stub.set(f.second, __rpcSendRecv);
}
}
}
}
TEST(testCase, normalCase) {
......@@ -99,7 +122,7 @@ TEST(testCase, normalCase) {
void *mockPointer = (void *)0x1;
SVgroupInfo vgInfo = {0};
msgInit();
initQueryModuleMsgHandle();
sendCreateDbMsg(pConn->pTransporter, &pConn->pAppInfo->mgmtEp.epSet);
......
......@@ -42,7 +42,12 @@ typedef struct TFileHeader {
#define TFILE_HEADER_SIZE (sizeof(TFileHeader))
#define TFILE_HEADER_NO_FST (TFILE_HEADER_SIZE - sizeof(int32_t))
//#define TFILE_HADER_PRE_SIZE (sizeof(uint64_t) + sizeof(int32_t) + sizeof(int32_t))
typedef struct TFileValue {
char* colVal; // null terminated
SArray* tableId;
int32_t offset;
} TFileValue;
typedef struct TFileCacheKey {
uint64_t suid;
......
......@@ -41,7 +41,7 @@ static pthread_once_t isInit = PTHREAD_ONCE_INIT;
static void indexInit();
static int indexTermSearch(SIndex* sIdx, SIndexTermQuery* term, SArray** result);
static int indexMergeCacheIntoTindex(SIndex* sIdx);
static int indexFlushCacheToTindex(SIndex* sIdx);
static void indexInterResultsDestroy(SArray* results);
static int indexMergeFinalResults(SArray* interResults, EIndexOperatorType oType, SArray* finalResult);
......@@ -49,9 +49,7 @@ static int indexMergeFinalResults(SArray* interResults, EIndexOperatorType oTyp
int indexOpen(SIndexOpts* opts, const char* path, SIndex** index) {
pthread_once(&isInit, indexInit);
SIndex* sIdx = calloc(1, sizeof(SIndex));
if (sIdx == NULL) {
return -1;
}
if (sIdx == NULL) { return -1; }
#ifdef USE_LUCENE
index_t* index = index_open(path);
......@@ -131,9 +129,7 @@ int indexPut(SIndex* index, SIndexMultiTerm* fVals, uint64_t uid) {
int32_t colId = fi->colId;
int32_t version = index->cVersion;
int ret = indexCachePut(index->cache, p, colId, version, uid);
if (ret != 0) {
return ret;
}
if (ret != 0) { return ret; }
}
#endif
......@@ -221,9 +217,7 @@ void indexOptsDestroy(SIndexOpts* opts){
SIndexMultiTermQuery* indexMultiTermQueryCreate(EIndexOperatorType opera) {
SIndexMultiTermQuery* p = (SIndexMultiTermQuery*)malloc(sizeof(SIndexMultiTermQuery));
if (p == NULL) {
return NULL;
}
if (p == NULL) { return NULL; }
p->opera = opera;
p->query = taosArrayInit(4, sizeof(SIndexTermQuery));
return p;
......@@ -250,9 +244,7 @@ SIndexTerm* indexTermCreate(int64_t suid,
const char* colVal,
int32_t nColVal) {
SIndexTerm* t = (SIndexTerm*)calloc(1, (sizeof(SIndexTerm)));
if (t == NULL) {
return NULL;
}
if (t == NULL) { return NULL; }
t->suid = suid;
t->operType = oper;
......@@ -332,9 +324,7 @@ static int indexTermSearch(SIndex* sIdx, SIndexTermQuery* query, SArray** result
return 0;
}
static void indexInterResultsDestroy(SArray* results) {
if (results == NULL) {
return;
}
if (results == NULL) { return; }
size_t sz = taosArrayGetSize(results);
for (size_t i = 0; i < sz; i++) {
......@@ -363,10 +353,10 @@ static int indexMergeFinalResults(SArray* interResults, EIndexOperatorType oType
}
return 0;
}
static int indexMergeCacheIntoTindex(SIndex* sIdx) {
if (sIdx == NULL) {
return -1;
}
static int indexFlushCacheToTindex(SIndex* sIdx) {
if (sIdx == NULL) { return -1; }
indexWarn("suid %" PRIu64 " merge cache into tindex", sIdx->suid);
return 0;
}
......@@ -151,8 +151,7 @@ int indexCacheSearch(void* cache, SIndexTermQuery* query, int16_t colId, int32_t
EIndexQueryType qtype = query->qType;
int32_t keyLen = CACHE_KEY_LEN(term);
char* buf = calloc(1, keyLen);
char* buf = calloc(1, keyLen);
if (qtype == QUERY_TERM) {
//
} else if (qtype == QUERY_PREFIX) {
......
......@@ -69,9 +69,9 @@ WriterCtx* writerCtxCreate(WriterType type, const char* path, bool readOnly, int
// ugly code, refactor later
ctx->file.readOnly = readOnly;
if (readOnly == false) {
ctx->file.fd = tfOpenCreateWriteAppend(tmpFile);
ctx->file.fd = tfOpenCreateWriteAppend(path);
} else {
ctx->file.fd = tfOpenReadWrite(tmpFile);
ctx->file.fd = tfOpenReadWrite(path);
}
if (ctx->file.fd < 0) {
indexError("open file error %d", errno);
......@@ -93,6 +93,7 @@ WriterCtx* writerCtxCreate(WriterType type, const char* path, bool readOnly, int
END:
if (ctx->type == TMemory) { free(ctx->mem.buf); }
free(ctx);
return NULL;
}
void writerCtxDestroy(WriterCtx* ctx) {
if (ctx->type == TMemory) {
......
......@@ -25,12 +25,7 @@
#define TF_TABLE_TATOAL_SIZE(sz) (sizeof(sz) + sz * sizeof(uint64_t))
typedef struct TFileValue {
char* colVal; // null terminated
SArray* tableId;
int32_t offset;
} TFileValue;
static int tfileStrCompare(const void* a, const void* b);
static int tfileValueCompare(const void* a, const void* b, const void* param);
static void tfileSerialTableIdsToBuf(char* buf, SArray* tableIds);
......@@ -49,6 +44,7 @@ static int tfileRmExpireFile(SArray* result);
static void tfileDestroyFileName(void* elem);
static int tfileCompare(const void* a, const void* b);
static int tfileParseFileName(const char* filename, uint64_t* suid, int* colId, int* version);
static void tfileGenFileName(char* filename, uint64_t suid, int colId, int version);
static void tfileSerialCacheKey(TFileCacheKey* key, char* buf);
TFileCache* tfileCacheCreate(const char* path) {
......@@ -209,16 +205,28 @@ TFileWriter* tfileWriterCreate(WriterCtx* ctx, TFileHeader* header) {
tw->ctx = ctx;
tw->header = *header;
tfileWriteHeader(tw);
tw->fb = fstBuilderCreate(ctx, 0);
if (tw->fb == NULL) {
tfileWriterDestroy(tw);
return NULL;
}
return tw;
}
int tfileWriterPut(TFileWriter* tw, void* data) {
// sort by coltype and write to tindex
__compar_fn_t fn = getComparFunc(tw->header.colType, 0);
__compar_fn_t fn;
int8_t colType = tw->header.colType;
if (colType == TSDB_DATA_TYPE_BINARY || colType == TSDB_DATA_TYPE_NCHAR) {
fn = tfileStrCompare;
} else {
fn = getComparFunc(colType, 0);
}
taosArraySortPWithExt((SArray*)(data), tfileValueCompare, &fn);
int32_t bufLimit = 4096, offset = 0;
char* buf = calloc(1, sizeof(bufLimit));
char* buf = calloc(1, sizeof(char) * bufLimit);
char* p = buf;
int32_t sz = taosArrayGetSize((SArray*)data);
int32_t fstOffset = tw->offset;
......@@ -267,6 +275,9 @@ int tfileWriterPut(TFileWriter* tw, void* data) {
//
}
}
fstBuilderFinish(tw->fb);
fstBuilderDestroy(tw->fb);
tw->fb = NULL;
return 0;
}
void tfileWriterDestroy(TFileWriter* tw) {
......@@ -305,6 +316,12 @@ int indexTFilePut(void* tfile, SIndexTerm* term, uint64_t uid) {
return 0;
}
static int tfileStrCompare(const void* a, const void* b) {
int ret = strcmp((char*)a, (char*)b);
if (ret == 0) { return ret; }
return ret < 0 ? -1 : 1;
}
static int tfileValueCompare(const void* a, const void* b, const void* param) {
__compar_fn_t fn = *(__compar_fn_t*)param;
......@@ -445,6 +462,10 @@ static int tfileCompare(const void* a, const void* b) {
return strncmp(aName, bName, aLen > bLen ? aLen : bLen);
}
// tfile name suid-colId-version.tindex
static void tfileGenFileName(char* filename, uint64_t suid, int colId, int version) {
sprintf(filename, "%" PRIu64 "-%d-%d.tindex", suid, colId, version);
return;
}
static int tfileParseFileName(const char* filename, uint64_t* suid, int* colId, int* version) {
if (3 == sscanf(filename, "%" PRIu64 "-%d-%d.tindex", suid, colId, version)) {
// read suid & colid & version success
......
......@@ -2,8 +2,7 @@
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
* it under the terms of the GNU Affero General Public License, version 3 * or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
......@@ -13,141 +12,141 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <gtest/gtest.h>
#include <string>
#include <iostream>
#include <string>
#include "index.h"
#include "tutil.h"
#include "indexInt.h"
#include "index_fst.h"
#include "index_fst_util.h"
#include "index_fst_counting_writer.h"
#include "index_fst_util.h"
#include "index_tfile.h"
#include "tutil.h"
class FstWriter {
public:
FstWriter() {
_wc = writerCtxCreate(TFile, "/tmp/tindex", false, 0);
_b = fstBuilderCreate(NULL, 0);
}
bool Put(const std::string &key, uint64_t val) {
FstSlice skey = fstSliceCreate((uint8_t *)key.c_str(), key.size());
bool ok = fstBuilderInsert(_b, skey, val);
fstSliceDestroy(&skey);
return ok;
}
~FstWriter() {
fstBuilderFinish(_b);
fstBuilderDestroy(_b);
writerCtxDestroy(_wc);
}
private:
FstBuilder *_b;
WriterCtx *_wc;
public:
FstWriter() {
_wc = writerCtxCreate(TFile, "/tmp/tindex", false, 64 * 1024 * 1024);
_b = fstBuilderCreate(NULL, 0);
}
bool Put(const std::string& key, uint64_t val) {
FstSlice skey = fstSliceCreate((uint8_t*)key.c_str(), key.size());
bool ok = fstBuilderInsert(_b, skey, val);
fstSliceDestroy(&skey);
return ok;
}
~FstWriter() {
fstBuilderFinish(_b);
fstBuilderDestroy(_b);
writerCtxDestroy(_wc);
}
private:
FstBuilder* _b;
WriterCtx* _wc;
};
class FstReadMemory {
public:
FstReadMemory(size_t size) {
_wc = writerCtxCreate(TFile, "/tmp/tindex", true, 0);
_w = fstCountingWriterCreate(_wc);
_size = size;
memset((void *)&_s, 0, sizeof(_s));
}
bool init() {
char *buf = (char *)calloc(1, sizeof(char) * _size);
int nRead = fstCountingWriterRead(_w, (uint8_t *)buf, _size);
if (nRead <= 0) { return false; }
_size = nRead;
_s = fstSliceCreate((uint8_t *)buf, _size);
_fst = fstCreate(&_s);
free(buf);
return _fst != NULL;
}
bool Get(const std::string &key, uint64_t *val) {
FstSlice skey = fstSliceCreate((uint8_t *)key.c_str(), key.size());
bool ok = fstGet(_fst, &skey, val);
fstSliceDestroy(&skey);
return ok;
}
bool GetWithTimeCostUs(const std::string &key, uint64_t *val, uint64_t *elapse) {
int64_t s = taosGetTimestampUs();
bool ok = this->Get(key, val);
int64_t e = taosGetTimestampUs();
*elapse = e - s;
return ok;
}
// add later
bool Search(AutomationCtx *ctx, std::vector<uint64_t> &result) {
FstStreamBuilder *sb = fstSearch(_fst, ctx);
StreamWithState *st = streamBuilderIntoStream(sb);
StreamWithStateResult *rt = NULL;
while ((rt = streamWithStateNextWith(st, NULL)) != NULL) {
result.push_back((uint64_t)(rt->out.out));
}
return true;
}
bool SearchWithTimeCostUs(AutomationCtx *ctx, std::vector<uint64_t> &result) {
int64_t s = taosGetTimestampUs();
bool ok = this->Search(ctx, result);
int64_t e = taosGetTimestampUs();
return ok;
}
~FstReadMemory() {
public:
FstReadMemory(size_t size) {
_wc = writerCtxCreate(TFile, "/tmp/tindex", true, 64 * 1024);
_w = fstCountingWriterCreate(_wc);
_size = size;
memset((void*)&_s, 0, sizeof(_s));
}
bool init() {
char* buf = (char*)calloc(1, sizeof(char) * _size);
int nRead = fstCountingWriterRead(_w, (uint8_t*)buf, _size);
if (nRead <= 0) { return false; }
_size = nRead;
_s = fstSliceCreate((uint8_t*)buf, _size);
_fst = fstCreate(&_s);
free(buf);
return _fst != NULL;
}
bool Get(const std::string& key, uint64_t* val) {
FstSlice skey = fstSliceCreate((uint8_t*)key.c_str(), key.size());
bool ok = fstGet(_fst, &skey, val);
fstSliceDestroy(&skey);
return ok;
}
bool GetWithTimeCostUs(const std::string& key, uint64_t* val, uint64_t* elapse) {
int64_t s = taosGetTimestampUs();
bool ok = this->Get(key, val);
int64_t e = taosGetTimestampUs();
*elapse = e - s;
return ok;
}
// add later
bool Search(AutomationCtx* ctx, std::vector<uint64_t>& result) {
FstStreamBuilder* sb = fstSearch(_fst, ctx);
StreamWithState* st = streamBuilderIntoStream(sb);
StreamWithStateResult* rt = NULL;
while ((rt = streamWithStateNextWith(st, NULL)) != NULL) {
result.push_back((uint64_t)(rt->out.out));
}
return true;
}
bool SearchWithTimeCostUs(AutomationCtx* ctx, std::vector<uint64_t>& result) {
int64_t s = taosGetTimestampUs();
bool ok = this->Search(ctx, result);
int64_t e = taosGetTimestampUs();
return ok;
}
~FstReadMemory() {
fstCountingWriterDestroy(_w);
fstDestroy(_fst);
fstSliceDestroy(&_s);
writerCtxDestroy(_wc);
}
private:
FstCountingWriter *_w;
Fst *_fst;
FstSlice _s;
WriterCtx *_wc;
size_t _size;
};
//TEST(IndexTest, index_create_test) {
}
private:
FstCountingWriter* _w;
Fst* _fst;
FstSlice _s;
WriterCtx* _wc;
size_t _size;
};
// TEST(IndexTest, index_create_test) {
// SIndexOpts *opts = indexOptsCreate();
// SIndex *index = indexOpen(opts, "./test");
// if (index == NULL) {
// std::cout << "index open failed" << std::endl;
// std::cout << "index open failed" << std::endl;
// }
//
//
// // write
//
// // write
// for (int i = 0; i < 100000; i++) {
// SIndexMultiTerm* terms = indexMultiTermCreate();
// std::string val = "field";
// std::string val = "field";
//
// indexMultiTermAdd(terms, "tag1", strlen("tag1"), val.c_str(), val.size());
//
// val.append(std::to_string(i));
// val.append(std::to_string(i));
// indexMultiTermAdd(terms, "tag2", strlen("tag2"), val.c_str(), val.size());
//
// val.insert(0, std::to_string(i));
// indexMultiTermAdd(terms, "tag3", strlen("tag3"), val.c_str(), val.size());
//
// val.append("const");
// val.append("const");
// indexMultiTermAdd(terms, "tag4", strlen("tag4"), val.c_str(), val.size());
//
//
//
// indexPut(index, terms, i);
// indexMultiTermDestroy(terms);
// }
//
// }
//
//
// // query
// SIndexMultiTermQuery *multiQuery = indexMultiTermQueryCreate(MUST);
//
// SIndexMultiTermQuery *multiQuery = indexMultiTermQueryCreate(MUST);
//
// indexMultiTermQueryAdd(multiQuery, "tag1", strlen("tag1"), "field", strlen("field"), QUERY_PREFIX);
// indexMultiTermQueryAdd(multiQuery, "tag3", strlen("tag3"), "0field0", strlen("0field0"), QUERY_TERM);
//
// SArray *result = (SArray *)taosArrayInit(10, sizeof(int));
// SArray *result = (SArray *)taosArrayInit(10, sizeof(int));
// indexSearch(index, multiQuery, result);
//
// std::cout << "taos'size : " << taosArrayGetSize(result) << std::endl;
......@@ -155,25 +154,24 @@ class FstReadMemory {
// int *v = (int *)taosArrayGet(result, i);
// std::cout << "value --->" << *v << std::endl;
// }
// // add more test case
// // add more test case
// indexMultiTermQueryDestroy(multiQuery);
//
// indexOptsDestroy(opts);
// indexClose(index);
// indexOptsDestroy(opts);
// indexClose(index);
// //
//}
#define L 100
#define M 100
#define N 100
int Performance_fstWriteRecords(FstWriter *b) {
std::string str("aa");
int Performance_fstWriteRecords(FstWriter* b) {
std::string str("aa");
for (int i = 0; i < L; i++) {
str[0] = 'a' + i;
str.resize(2);
for(int j = 0; j < M; j++) {
str.resize(2);
for (int j = 0; j < M; j++) {
str[1] = 'a' + j;
str.resize(2);
for (int k = 0; k < N; k++) {
......@@ -181,87 +179,86 @@ int Performance_fstWriteRecords(FstWriter *b) {
b->Put(str, k);
printf("(%d, %d, %d, %s)\n", i, j, k, str.c_str());
}
}
}
}
return L * M * N;
}
void Performance_fstReadRecords(FstReadMemory *m) {
void Performance_fstReadRecords(FstReadMemory* m) {
std::string str("aa");
for (int i = 0; i < M; i++) {
str[0] = 'a' + i;
str.resize(2);
for(int j = 0; j < N; j++) {
str.resize(2);
for (int j = 0; j < N; j++) {
str[1] = 'a' + j;
str.resize(2);
for (int k = 0; k < L; k++) {
str.push_back('a');
uint64_t val, cost;
uint64_t val, cost;
if (m->GetWithTimeCostUs(str, &val, &cost)) {
printf("succes to get kv(%s, %" PRId64"), cost: %" PRId64"\n", str.c_str(), val, cost);
printf("succes to get kv(%s, %" PRId64 "), cost: %" PRId64 "\n", str.c_str(), val, cost);
} else {
printf("failed to get key: %s\n", str.c_str());
}
}
}
}
}
}
void checkFstPerf() {
FstWriter *fw = new FstWriter;
int64_t s = taosGetTimestampUs();
FstWriter* fw = new FstWriter;
int64_t s = taosGetTimestampUs();
int num = Performance_fstWriteRecords(fw);
int num = Performance_fstWriteRecords(fw);
int64_t e = taosGetTimestampUs();
printf("write %d record cost %" PRId64"us\n", num, e - s);
printf("write %d record cost %" PRId64 "us\n", num, e - s);
delete fw;
FstReadMemory *m = new FstReadMemory(1024 * 64);
if (m->init()) {
printf("success to init fst read");
}
Performance_fstReadRecords(m);
FstReadMemory* m = new FstReadMemory(1024 * 64);
if (m->init()) { printf("success to init fst read"); }
Performance_fstReadRecords(m);
delete m;
}
}
void checkFstPrefixSearch() {
FstWriter *fw = new FstWriter;
int64_t s = taosGetTimestampUs();
int count = 2;
FstWriter* fw = new FstWriter;
int64_t s = taosGetTimestampUs();
int count = 2;
std::string key("ab");
for (int i = 0; i < count; i++) {
key[1] = key[1] + i;
fw->Put(key, i);
key[1] = key[1] + i;
fw->Put(key, i);
}
int64_t e = taosGetTimestampUs();
std::cout << "insert data count : " << count << "elapas time: " << e - s << std::endl;
delete fw;
FstReadMemory *m = new FstReadMemory(1024 * 64);
FstReadMemory* m = new FstReadMemory(1024 * 64);
if (m->init() == false) {
std::cout << "init readMemory failed" << std::endl;
std::cout << "init readMemory failed" << std::endl;
delete m;
return;
}
// prefix search
// prefix search
std::vector<uint64_t> result;
AutomationCtx *ctx = automCtxCreate((void *)"ab", AUTOMATION_PREFIX);
m->Search(ctx, result);
assert(result.size() == count);
AutomationCtx* ctx = automCtxCreate((void*)"ab", AUTOMATION_PREFIX);
m->Search(ctx, result);
assert(result.size() == count);
for (int i = 0; i < result.size(); i++) {
assert(result[i] == i); // check result
assert(result[i] == i); // check result
}
free(ctx);
delete m;
}
}
void validateFst() {
int val = 100;
int count = 100;
FstWriter *fw = new FstWriter;
// write
int val = 100;
int count = 100;
FstWriter* fw = new FstWriter;
// write
{
std::string key("ab");
for (int i = 0; i < count; i++) {
......@@ -272,99 +269,161 @@ void validateFst() {
delete fw;
// read
FstReadMemory *m = new FstReadMemory(1024 * 64);
if (m->init() == false) {
std::cout << "init readMemory failed" << std::endl;
FstReadMemory* m = new FstReadMemory(1024 * 64);
if (m->init() == false) {
std::cout << "init readMemory failed" << std::endl;
delete m;
return;
}
{
std::string key("ab");
uint64_t out;
if (m->Get(key, &out)) {
printf("success to get (%s, %" PRId64")\n", key.c_str(), out);
} else {
printf("failed to get(%s)\n", key.c_str());
}
for (int i = 0; i < count; i++) {
key.push_back('a' + i);
if (m->Get(key, &out) ) {
assert(val - i == out);
printf("success to get (%s, %" PRId64")\n", key.c_str(), out);
} else {
printf("failed to get(%s)\n", key.c_str());
std::string key("ab");
uint64_t out;
if (m->Get(key, &out)) {
printf("success to get (%s, %" PRId64 ")\n", key.c_str(), out);
} else {
printf("failed to get(%s)\n", key.c_str());
}
for (int i = 0; i < count; i++) {
key.push_back('a' + i);
if (m->Get(key, &out)) {
assert(val - i == out);
printf("success to get (%s, %" PRId64 ")\n", key.c_str(), out);
} else {
printf("failed to get(%s)\n", key.c_str());
}
}
}
}
}
delete m;
}
}
class IndexEnv : public ::testing::Test {
protected:
virtual void SetUp() {
taosRemoveDir(path);
opts = indexOptsCreate();
int ret = indexOpen(opts, path, &index);
assert(ret == 0);
}
virtual void TearDown() {
indexClose(index);
indexOptsDestroy(opts);
}
const char *path = "/tmp/tindex";
SIndexOpts *opts;
SIndex *index;
protected:
virtual void SetUp() {
taosRemoveDir(path);
opts = indexOptsCreate();
int ret = indexOpen(opts, path, &index);
assert(ret == 0);
}
virtual void TearDown() {
indexClose(index);
indexOptsDestroy(opts);
}
const char* path = "/tmp/tindex";
SIndexOpts* opts;
SIndex* index;
};
TEST_F(IndexEnv, testPut) {
// single index column
{
std::string colName("tag1"), colVal("Hello world");
SIndexTerm *term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(), colVal.size());
SIndexMultiTerm *terms = indexMultiTermCreate();
indexMultiTermAdd(terms, term);
for (size_t i = 0; i < 100; i++) {
int tableId = i;
int ret = indexPut(index, terms, tableId);
assert(ret == 0);
}
indexMultiTermDestroy(terms);
}
// multi index column
{
SIndexMultiTerm *terms = indexMultiTermCreate();
{
std::string colName("tag1"), colVal("Hello world");
SIndexTerm *term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(), colVal.size());
indexMultiTermAdd(terms, term);
}
{
std::string colName("tag2"), colVal("Hello world");
SIndexTerm *term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(), colVal.size());
indexMultiTermAdd(terms, term);
}
for (int i = 0; i < 100; i++) {
int tableId = i;
int ret = indexPut(index, terms, tableId);
assert(ret == 0);
}
indexMultiTermDestroy(terms);
}
//
}
TEST_F(IndexEnv, testDel) {
// TEST_F(IndexEnv, testPut) {
// // single index column
// {
// std::string colName("tag1"), colVal("Hello world");
// SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(),
// colVal.size()); SIndexMultiTerm* terms = indexMultiTermCreate(); indexMultiTermAdd(terms, term);
//
// for (size_t i = 0; i < 100; i++) {
// int tableId = i;
// int ret = indexPut(index, terms, tableId);
// assert(ret == 0);
// }
// indexMultiTermDestroy(terms);
// }
// // multi index column
// {
// SIndexMultiTerm* terms = indexMultiTermCreate();
// {
// std::string colName("tag1"), colVal("Hello world");
// SIndexTerm* term =
// indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(), colVal.size());
// indexMultiTermAdd(terms, term);
// }
// {
// std::string colName("tag2"), colVal("Hello world");
// SIndexTerm* term =
// indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(), colVal.size());
// indexMultiTermAdd(terms, term);
// }
//
// for (int i = 0; i < 100; i++) {
// int tableId = i;
// int ret = indexPut(index, terms, tableId);
// assert(ret == 0);
// }
// indexMultiTermDestroy(terms);
// }
// //
//}
class IndexTFileEnv : public ::testing::Test {
protected:
virtual void SetUp() {
taosRemoveDir(dir);
taosMkDir(dir);
tfInit();
std::string colName("voltage");
header.suid = 1;
header.version = 1;
memcpy(header.colName, colName.c_str(), colName.size());
header.colType = TSDB_DATA_TYPE_BINARY;
std::string path(dir);
int colId = 2;
char buf[64] = {0};
sprintf(buf, "%" PRIu64 "-%d-%d.tindex", header.suid, colId, header.version);
path.append("/").append(buf);
ctx = writerCtxCreate(TFile, path.c_str(), false, 64 * 1024 * 1024);
twrite = tfileWriterCreate(ctx, &header);
}
virtual void TearDown() {
// indexClose(index);
// indexeptsDestroy(opts);
tfCleanup();
tfileWriterDestroy(twrite);
}
const char* dir = "/tmp/tindex";
WriterCtx* ctx = NULL;
TFileHeader header;
TFileWriter* twrite = NULL;
};
static TFileValue* genTFileValue(const char* val) {
TFileValue* tv = (TFileValue*)calloc(1, sizeof(TFileValue));
int32_t vlen = strlen(val) + 1;
tv->colVal = (char*)calloc(1, vlen);
memcpy(tv->colVal, val, vlen);
tv->tableId = (SArray*)taosArrayInit(1, sizeof(uint64_t));
for (size_t i = 0; i < 10; i++) {
uint64_t v = i;
taosArrayPush(tv->tableId, &v);
}
return tv;
}
static void destroyTFileValue(void* val) {
TFileValue* tv = (TFileValue*)val;
free(tv->colVal);
taosArrayDestroy(tv->tableId);
free(tv);
}
TEST_F(IndexTFileEnv, test_tfile_write) {
TFileValue* v1 = genTFileValue("c");
TFileValue* v2 = genTFileValue("a");
SArray* data = (SArray*)taosArrayInit(4, sizeof(void*));
taosArrayPush(data, &v1);
taosArrayPush(data, &v2);
tfileWriterPut(twrite, data);
// tfileWriterDestroy(twrite);
for (size_t i = 0; i < taosArrayGetSize(data); i++) {
destroyTFileValue(taosArrayGetP(data, i));
}
taosArrayDestroy(data);
}
......@@ -8,7 +8,8 @@ SCreateUserMsg* buildUserManipulationMsg(SSqlInfo* pInfo, int32_t* outputLen, in
SCreateAcctMsg* buildAcctManipulationMsg(SSqlInfo* pInfo, int32_t* outputLen, int64_t id, char* msgBuf, int32_t msgLen);
SDropUserMsg* buildDropUserMsg(SSqlInfo* pInfo, int32_t* outputLen, int64_t id, char* msgBuf, int32_t msgLen);
SShowMsg* buildShowMsg(SShowInfo* pShowInfo, int64_t id, char* msgBuf, int32_t msgLen);
SCreateDbMsg* buildCreateDbMsg(SCreateDbInfo* pCreateDbInfo, char* msgBuf, int32_t msgLen);
SCreateDbMsg* buildCreateDbMsg(SCreateDbInfo* pCreateDbInfo, SParseBasicCtx *pCtx, SMsgBuf* pMsgBuf);
SCreateStbMsg* buildCreateTableMsg(SCreateTableSql* pCreateTableSql, int32_t* len, SParseBasicCtx* pParseCtx, SMsgBuf* pMsgBuf);
SDropTableMsg* buildDropTableMsg(SSqlInfo* pInfo, int32_t* len, SParseBasicCtx* pParseCtx, SMsgBuf* pMsgBuf);
#endif // TDENGINE_ASTTOMSG_H
......@@ -68,7 +68,7 @@ int32_t qParserValidateSqlNode(struct SCatalog* pCatalog, SSqlInfo* pSqlInfo, SQ
* @param type
* @return
*/
int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, void** output, int32_t* outputLen, int32_t* type, char* msgBuf, int32_t msgBufLen);
int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SDclStmtInfo* pDcl, char* msgBuf, int32_t msgBufLen);
/**
* Evaluate the numeric and timestamp arithmetic expression in the WHERE clause.
......
......@@ -31,6 +31,7 @@
#include "ttoken.h"
#include "ttokendef.h"
#include "tvariant.h"
#include "parserInt.h"
}
%syntax_error {
......@@ -173,7 +174,7 @@ cmd ::= ALTER DNODE ids(X) ids(Y) ids(Z). { setDCLSqlElems(pInfo, TSDB_SQL
cmd ::= ALTER LOCAL ids(X). { setDCLSqlElems(pInfo, TSDB_SQL_CFG_LOCAL, 1, &X); }
cmd ::= ALTER LOCAL ids(X) ids(Y). { setDCLSqlElems(pInfo, TSDB_SQL_CFG_LOCAL, 2, &X, &Y); }
cmd ::= ALTER DATABASE ids(X) alter_db_optr(Y). { SToken t = {0}; setCreateDbInfo(pInfo, TSDB_SQL_ALTER_DB, &X, &Y, &t);}
cmd ::= ALTER TOPIC ids(X) alter_topic_optr(Y). { SToken t = {0}; setCreateDbInfo(pInfo, TSDB_SQL_ALTER_DB, &X, &Y, &t);}
//cmd ::= ALTER TOPIC ids(X) alter_topic_optr(Y). { SToken t = {0}; setCreateDbInfo(pInfo, TSDB_SQL_ALTER_DB, &X, &Y, &t);}
cmd ::= ALTER ACCOUNT ids(X) acct_optr(Z). { setCreateAcctSql(pInfo, TSDB_SQL_ALTER_ACCT, &X, NULL, &Z);}
cmd ::= ALTER ACCOUNT ids(X) PASS ids(Y) acct_optr(Z). { setCreateAcctSql(pInfo, TSDB_SQL_ALTER_ACCT, &X, &Y, &Z);}
......@@ -203,7 +204,7 @@ cmd ::= CREATE DNODE ids(X). { setDCLSqlElems(pInfo, TSDB_SQL_CREATE_DNODE
cmd ::= CREATE ACCOUNT ids(X) PASS ids(Y) acct_optr(Z).
{ setCreateAcctSql(pInfo, TSDB_SQL_CREATE_ACCT, &X, &Y, &Z);}
cmd ::= CREATE DATABASE ifnotexists(Z) ids(X) db_optr(Y). { setCreateDbInfo(pInfo, TSDB_SQL_CREATE_DB, &X, &Y, &Z);}
cmd ::= CREATE TOPIC ifnotexists(Z) ids(X) topic_optr(Y). { setCreateDbInfo(pInfo, TSDB_SQL_CREATE_DB, &X, &Y, &Z);}
//cmd ::= CREATE TOPIC ifnotexists(Z) ids(X) topic_optr(Y). { setCreateDbInfo(pInfo, TSDB_SQL_CREATE_DB, &X, &Y, &Z);}
cmd ::= CREATE FUNCTION ids(X) AS ids(Y) OUTPUTTYPE typename(Z) bufsize(B). { setCreateFuncInfo(pInfo, TSDB_SQL_CREATE_FUNCTION, &X, &Y, &Z, &B, 1);}
cmd ::= CREATE AGGREGATE FUNCTION ids(X) AS ids(Y) OUTPUTTYPE typename(Z) bufsize(B). { setCreateFuncInfo(pInfo, TSDB_SQL_CREATE_FUNCTION, &X, &Y, &Z, &B, 2);}
cmd ::= CREATE USER ids(X) PASS ids(Y). { setCreateUserSql(pInfo, &X, &Y);}
......@@ -278,10 +279,10 @@ comp(Y) ::= COMP INTEGER(X). { Y = X; }
prec(Y) ::= PRECISION STRING(X). { Y = X; }
update(Y) ::= UPDATE INTEGER(X). { Y = X; }
cachelast(Y) ::= CACHELAST INTEGER(X). { Y = X; }
partitions(Y) ::= PARTITIONS INTEGER(X). { Y = X; }
//partitions(Y) ::= PARTITIONS INTEGER(X). { Y = X; }
%type db_optr {SCreateDbInfo}
db_optr(Y) ::= . {setDefaultCreateDbOption(&Y); Y.dbType = TSDB_DB_TYPE_DEFAULT;}
db_optr(Y) ::= . {setDefaultCreateDbOption(&Y);}
db_optr(Y) ::= db_optr(Z) cache(X). { Y = Z; Y.cacheBlockSize = strtol(X.z, NULL, 10); }
db_optr(Y) ::= db_optr(Z) replica(X). { Y = Z; Y.replica = strtol(X.z, NULL, 10); }
......@@ -299,13 +300,13 @@ db_optr(Y) ::= db_optr(Z) keep(X). { Y = Z; Y.keep = X; }
db_optr(Y) ::= db_optr(Z) update(X). { Y = Z; Y.update = strtol(X.z, NULL, 10); }
db_optr(Y) ::= db_optr(Z) cachelast(X). { Y = Z; Y.cachelast = strtol(X.z, NULL, 10); }
%type topic_optr {SCreateDbInfo}
topic_optr(Y) ::= db_optr(Z). { Y = Z; Y.dbType = TSDB_DB_TYPE_TOPIC; }
topic_optr(Y) ::= topic_optr(Z) partitions(X). { Y = Z; Y.partitions = strtol(X.z, NULL, 10); }
//%type topic_optr {SCreateDbInfo}
//
//topic_optr(Y) ::= db_optr(Z). { Y = Z; Y.dbType = TSDB_DB_TYPE_TOPIC; }
//topic_optr(Y) ::= topic_optr(Z) partitions(X). { Y = Z; Y.partitions = strtol(X.z, NULL, 10); }
%type alter_db_optr {SCreateDbInfo}
alter_db_optr(Y) ::= . { setDefaultCreateDbOption(&Y); Y.dbType = TSDB_DB_TYPE_DEFAULT;}
alter_db_optr(Y) ::= . { setDefaultCreateDbOption(&Y);}
alter_db_optr(Y) ::= alter_db_optr(Z) replica(X). { Y = Z; Y.replica = strtol(X.z, NULL, 10); }
alter_db_optr(Y) ::= alter_db_optr(Z) quorum(X). { Y = Z; Y.quorum = strtol(X.z, NULL, 10); }
......@@ -319,10 +320,10 @@ alter_db_optr(Y) ::= alter_db_optr(Z) cachelast(X). { Y = Z; Y.cachelast = str
//alter_db_optr(Y) ::= alter_db_optr(Z) fsync(X). { Y = Z; Y.fsyncPeriod = strtol(X.z, NULL, 10); }
//alter_db_optr(Y) ::= alter_db_optr(Z) wal(X). { Y = Z; Y.walLevel = strtol(X.z, NULL, 10); } not support yet
%type alter_topic_optr {SCreateDbInfo}
//%type alter_topic_optr {SCreateDbInfo}
alter_topic_optr(Y) ::= alter_db_optr(Z). { Y = Z; Y.dbType = TSDB_DB_TYPE_TOPIC; }
alter_topic_optr(Y) ::= alter_topic_optr(Z) partitions(X). { Y = Z; Y.partitions = strtol(X.z, NULL, 10); }
//alter_topic_optr(Y) ::= alter_db_optr(Z). { Y = Z; Y.dbType = TSDB_DB_TYPE_TOPIC; }
//alter_topic_optr(Y) ::= alter_topic_optr(Z) partitions(X). { Y = Z; Y.partitions = strtol(X.z, NULL, 10); }
%type typename {SField}
typename(A) ::= ids(X). {
......
......@@ -130,88 +130,88 @@
#define TK_PRECISION 112
#define TK_UPDATE 113
#define TK_CACHELAST 114
#define TK_PARTITIONS 115
#define TK_UNSIGNED 116
#define TK_TAGS 117
#define TK_USING 118
#define TK_NULL 119
#define TK_NOW 120
#define TK_SELECT 121
#define TK_UNION 122
#define TK_ALL 123
#define TK_DISTINCT 124
#define TK_FROM 125
#define TK_VARIABLE 126
#define TK_INTERVAL 127
#define TK_EVERY 128
#define TK_SESSION 129
#define TK_STATE_WINDOW 130
#define TK_FILL 131
#define TK_SLIDING 132
#define TK_ORDER 133
#define TK_BY 134
#define TK_ASC 135
#define TK_GROUP 136
#define TK_HAVING 137
#define TK_LIMIT 138
#define TK_OFFSET 139
#define TK_SLIMIT 140
#define TK_SOFFSET 141
#define TK_WHERE 142
#define TK_RESET 143
#define TK_QUERY 144
#define TK_SYNCDB 145
#define TK_ADD 146
#define TK_COLUMN 147
#define TK_MODIFY 148
#define TK_TAG 149
#define TK_CHANGE 150
#define TK_SET 151
#define TK_KILL 152
#define TK_CONNECTION 153
#define TK_STREAM 154
#define TK_COLON 155
#define TK_ABORT 156
#define TK_AFTER 157
#define TK_ATTACH 158
#define TK_BEFORE 159
#define TK_BEGIN 160
#define TK_CASCADE 161
#define TK_CLUSTER 162
#define TK_CONFLICT 163
#define TK_COPY 164
#define TK_DEFERRED 165
#define TK_DELIMITERS 166
#define TK_DETACH 167
#define TK_EACH 168
#define TK_END 169
#define TK_EXPLAIN 170
#define TK_FAIL 171
#define TK_FOR 172
#define TK_IGNORE 173
#define TK_IMMEDIATE 174
#define TK_INITIALLY 175
#define TK_INSTEAD 176
#define TK_KEY 177
#define TK_OF 178
#define TK_RAISE 179
#define TK_REPLACE 180
#define TK_RESTRICT 181
#define TK_ROW 182
#define TK_STATEMENT 183
#define TK_TRIGGER 184
#define TK_VIEW 185
#define TK_IPTOKEN 186
#define TK_SEMI 187
#define TK_NONE 188
#define TK_PREV 189
#define TK_LINEAR 190
#define TK_IMPORT 191
#define TK_TBNAME 192
#define TK_JOIN 193
#define TK_INSERT 194
#define TK_INTO 195
#define TK_VALUES 196
#define TK_UNSIGNED 115
#define TK_TAGS 116
#define TK_USING 117
#define TK_NULL 118
#define TK_NOW 119
#define TK_SELECT 120
#define TK_UNION 121
#define TK_ALL 122
#define TK_DISTINCT 123
#define TK_FROM 124
#define TK_VARIABLE 125
#define TK_INTERVAL 126
#define TK_EVERY 127
#define TK_SESSION 128
#define TK_STATE_WINDOW 129
#define TK_FILL 130
#define TK_SLIDING 131
#define TK_ORDER 132
#define TK_BY 133
#define TK_ASC 134
#define TK_GROUP 135
#define TK_HAVING 136
#define TK_LIMIT 137
#define TK_OFFSET 138
#define TK_SLIMIT 139
#define TK_SOFFSET 140
#define TK_WHERE 141
#define TK_RESET 142
#define TK_QUERY 143
#define TK_SYNCDB 144
#define TK_ADD 145
#define TK_COLUMN 146
#define TK_MODIFY 147
#define TK_TAG 148
#define TK_CHANGE 149
#define TK_SET 150
#define TK_KILL 151
#define TK_CONNECTION 152
#define TK_STREAM 153
#define TK_COLON 154
#define TK_ABORT 155
#define TK_AFTER 156
#define TK_ATTACH 157
#define TK_BEFORE 158
#define TK_BEGIN 159
#define TK_CASCADE 160
#define TK_CLUSTER 161
#define TK_CONFLICT 162
#define TK_COPY 163
#define TK_DEFERRED 164
#define TK_DELIMITERS 165
#define TK_DETACH 166
#define TK_EACH 167
#define TK_END 168
#define TK_EXPLAIN 169
#define TK_FAIL 170
#define TK_FOR 171
#define TK_IGNORE 172
#define TK_IMMEDIATE 173
#define TK_INITIALLY 174
#define TK_INSTEAD 175
#define TK_KEY 176
#define TK_OF 177
#define TK_RAISE 178
#define TK_REPLACE 179
#define TK_RESTRICT 180
#define TK_ROW 181
#define TK_STATEMENT 182
#define TK_TRIGGER 183
#define TK_VIEW 184
#define TK_IPTOKEN 185
#define TK_SEMI 186
#define TK_NONE 187
#define TK_PREV 188
#define TK_LINEAR 189
#define TK_IMPORT 190
#define TK_TBNAME 191
#define TK_JOIN 192
#define TK_INSERT 193
#define TK_INTO 194
#define TK_VALUES 195
#define TK_SPACE 300
......
......@@ -207,17 +207,23 @@ int32_t setDbOptions(SCreateDbMsg* pCreateDbMsg, const SCreateDbInfo* pCreateDbS
return TSDB_CODE_SUCCESS;
}
SCreateDbMsg* buildCreateDbMsg(SCreateDbInfo* pCreateDbInfo, char* msgBuf, int32_t msgLen) {
SCreateDbMsg* buildCreateDbMsg(SCreateDbInfo* pCreateDbInfo, SParseBasicCtx *pCtx, SMsgBuf* pMsgBuf) {
SCreateDbMsg* pCreateMsg = calloc(1, sizeof(SCreateDbMsg));
SMsgBuf msg = {.buf = msgBuf, .len = msgLen};
if (setDbOptions(pCreateMsg, pCreateDbInfo, &msg) != TSDB_CODE_SUCCESS) {
if (setDbOptions(pCreateMsg, pCreateDbInfo, pMsgBuf) != TSDB_CODE_SUCCESS) {
tfree(pCreateMsg);
terrno = TSDB_CODE_TSC_INVALID_OPERATION;
return NULL;
}
SName name = {0};
int32_t ret = tNameSetDbName(&name, pCtx->acctId, pCreateDbInfo->dbname.z, pCreateDbInfo->dbname.n);
if (ret != TSDB_CODE_SUCCESS) {
terrno = ret;
return NULL;
}
tNameGetFullDbName(&name, pCreateMsg->db);
return pCreateMsg;
}
......@@ -263,14 +269,17 @@ int32_t createSName(SName* pName, SToken* pTableName, SParseBasicCtx* pParseCtx,
SCreateStbMsg* buildCreateTableMsg(SCreateTableSql* pCreateTableSql, int32_t* len, SParseBasicCtx* pParseCtx, SMsgBuf* pMsgBuf) {
SSchema* pSchema;
int32_t numOfTags = 0;
int32_t numOfCols = (int32_t) taosArrayGetSize(pCreateTableSql->colInfo.pColumns);
int32_t numOfTags = (int32_t) taosArrayGetSize(pCreateTableSql->colInfo.pTagColumns);
if (pCreateTableSql->colInfo.pTagColumns != NULL) {
numOfTags = (int32_t) taosArrayGetSize(pCreateTableSql->colInfo.pTagColumns);
}
SCreateStbMsg* pCreateTableMsg = (SCreateStbMsg*)calloc(1, sizeof(SCreateStbMsg) + (numOfCols + numOfTags) * sizeof(SSchema));
char* pMsg = NULL;
int8_t type = pCreateTableSql->type;
if (type == TSQL_CREATE_TABLE) { // create by using super table, tags value
int32_t tableType = pCreateTableSql->type;
if (tableType != TSQL_CREATE_TABLE && tableType != TSQL_CREATE_STABLE) { // create by using super table, tags value
#if 0
SArray* list = pInfo->pCreateTableInfo->childTableInfo;
......@@ -309,15 +318,13 @@ SCreateStbMsg* buildCreateTableMsg(SCreateTableSql* pCreateTableSql, int32_t* le
return NULL;
}
pCreateTableMsg->igExists = pCreateTableSql->existCheck ? 1 : 0;
pCreateTableMsg->igExists = pCreateTableSql->existCheck ? 1 : 0;
pCreateTableMsg->numOfColumns = htonl(numOfCols);
pCreateTableMsg->numOfTags = htonl(numOfTags);
pCreateTableMsg->numOfTags = htonl(numOfTags);
pSchema = (SSchema*) pCreateTableMsg->pSchema;
for (int i = 0; i < numOfCols; ++i) {
TAOS_FIELD* pField = taosArrayGet(pCreateTableSql->colInfo.pColumns, i);
SField* pField = taosArrayGet(pCreateTableSql->colInfo.pColumns, i);
pSchema->type = pField->type;
pSchema->bytes = htonl(pField->bytes);
strcpy(pSchema->name, pField->name);
......@@ -326,8 +333,7 @@ SCreateStbMsg* buildCreateTableMsg(SCreateTableSql* pCreateTableSql, int32_t* le
}
for(int32_t i = 0; i < numOfTags; ++i) {
TAOS_FIELD* pField = taosArrayGet(pCreateTableSql->colInfo.pTagColumns, i);
SField* pField = taosArrayGet(pCreateTableSql->colInfo.pTagColumns, i);
pSchema->type = pField->type;
pSchema->bytes = htonl(pField->bytes);
strcpy(pSchema->name, pField->name);
......@@ -343,3 +349,24 @@ SCreateStbMsg* buildCreateTableMsg(SCreateTableSql* pCreateTableSql, int32_t* le
return pCreateTableMsg;
}
SDropTableMsg* buildDropTableMsg(SSqlInfo* pInfo, int32_t* len, SParseBasicCtx* pParseCtx, SMsgBuf* pMsgBuf) {
SToken* tableName = taosArrayGet(pInfo->pMiscInfo->a, 0);
SName name = {0};
int32_t code = createSName(&name, tableName, pParseCtx, pMsgBuf);
if (code != TSDB_CODE_SUCCESS) {
terrno = buildInvalidOperationMsg(pMsgBuf, "invalid table name");
return NULL;
}
SDropTableMsg *pDropTableMsg = (SDropTableMsg*) calloc(1, sizeof(SDropTableMsg));
code = tNameExtractFullName(&name, pDropTableMsg->name);
assert(code == TSDB_CODE_SUCCESS && name.type == TSDB_TABLE_NAME_T);
pDropTableMsg->ignoreNotExists = pInfo->pMiscInfo->existsCheck ? 1 : 0;
*len = sizeof(SDropTableMsg);
return pDropTableMsg;
}
......@@ -4028,7 +4028,7 @@ int32_t qParserValidateSqlNode(struct SCatalog* pCatalog, SSqlInfo* pInfo, SQuer
}
// todo remove it
static int32_t setShowInfo(struct SSqlInfo* pInfo, void** output, int32_t* msgLen, SMsgBuf* pMsgBuf) {
static int32_t setShowInfo(SShowInfo* pShowInfo, SParseBasicCtx *pCtx, void** output, int32_t* outputLen, SMsgBuf* pMsgBuf) {
const char* msg1 = "invalid name";
const char* msg2 = "wildcard string should be less than %d characters";
const char* msg3 = "database name too long";
......@@ -4040,9 +4040,8 @@ static int32_t setShowInfo(struct SSqlInfo* pInfo, void** output, int32_t* msgLe
* database prefix in pInfo->pMiscInfo->a[0]
* wildcard in like clause in pInfo->pMiscInfo->a[1]
*/
SShowInfo* pShowInfo = &pInfo->pMiscInfo->showOpt;
int16_t showType = pShowInfo->showType;
if (showType == TSDB_MGMT_TABLE_TABLE || showType == TSDB_MGMT_TABLE_VGROUP) {
if (showType == TSDB_MGMT_TABLE_STB || showType == TSDB_MGMT_TABLE_VGROUP) {
SToken* pDbPrefixToken = &pShowInfo->prefix;
if (pDbPrefixToken->type != 0) {
if (pDbPrefixToken->n >= TSDB_DB_NAME_LEN) { // db name is too long
......@@ -4091,8 +4090,8 @@ static int32_t setShowInfo(struct SSqlInfo* pInfo, void** output, int32_t* msgLe
}
}
*output = buildShowMsg(pShowInfo, 0, pMsgBuf->buf, pMsgBuf->len);
*msgLen = sizeof(SShowMsg)/* + htons(pShowMsg->payloadLen)*/;
*output = buildShowMsg(pShowInfo, pCtx->requestId, pMsgBuf->buf, pMsgBuf->len);
*outputLen = sizeof(SShowMsg)/* + htons(pShowMsg->payloadLen)*/;
return TSDB_CODE_SUCCESS;
}
......@@ -4246,7 +4245,7 @@ static int32_t validateTableColumnInfo(SArray* pFieldList, SMsgBuf* pMsgBuf) {
const char* msg8 = "illegal number of columns";
// first column must be timestamp
TAOS_FIELD* pField = taosArrayGet(pFieldList, 0);
SField* pField = taosArrayGet(pFieldList, 0);
if (pField->type != TSDB_DATA_TYPE_TIMESTAMP) {
return buildInvalidOperationMsg(pMsgBuf, msg1);
}
......@@ -4274,7 +4273,7 @@ static int32_t validateTagParams(SArray* pTagsList, SArray* pFieldList, SMsgBuf*
// field name must be unique
for (int32_t i = 0; i < numOfTags; ++i) {
TAOS_FIELD* p = taosArrayGet(pTagsList, i);
SField* p = taosArrayGet(pTagsList, i);
if (has(pFieldList, 0, p->name) == true) {
return buildInvalidOperationMsg(pMsgBuf, msg3);
}
......@@ -4295,7 +4294,6 @@ int32_t doCheckForCreateTable(SSqlInfo* pInfo, SMsgBuf* pMsgBuf) {
// if sql specifies db, use it, otherwise use default db
SToken* pzTableName = &(pCreateTable->name);
bool dbIncluded = false;
if (parserValidateNameToken(pzTableName) != TSDB_CODE_SUCCESS) {
return buildInvalidOperationMsg(pMsgBuf, msg1);
}
......@@ -4308,14 +4306,12 @@ int32_t doCheckForCreateTable(SSqlInfo* pInfo, SMsgBuf* pMsgBuf) {
return TSDB_CODE_SUCCESS;
}
int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, void** output, int32_t* outputLen, int32_t* type, char* msgBuf, int32_t msgBufLen) {
int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SDclStmtInfo* pDcl, char* msgBuf, int32_t msgBufLen) {
int32_t code = 0;
SMsgBuf m = {.buf = msgBuf, .len = msgBufLen};
SMsgBuf *pMsgBuf = &m;
*type = pInfo->type;
switch (pInfo->type) {
case TSDB_SQL_CREATE_USER:
case TSDB_SQL_ALTER_USER: {
......@@ -4361,7 +4357,8 @@ int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, void**
}
}
*output = buildUserManipulationMsg(pInfo, outputLen, pCtx->requestId, msgBuf, msgBufLen);
pDcl->pMsg = (char*)buildUserManipulationMsg(pInfo, &pDcl->msgLen, pCtx->requestId, msgBuf, msgBufLen);
pDcl->msgType = (pInfo->type == TSDB_SQL_CREATE_USER)? TSDB_MSG_TYPE_CREATE_USER:TSDB_MSG_TYPE_ALTER_USER;
break;
}
......@@ -4397,18 +4394,21 @@ int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, void**
}
}
*output = buildAcctManipulationMsg(pInfo, outputLen, pCtx->requestId, msgBuf, msgBufLen);
pDcl->pMsg = (char*)buildAcctManipulationMsg(pInfo, &pDcl->msgLen, pCtx->requestId, msgBuf, msgBufLen);
pDcl->msgType = (pInfo->type == TSDB_SQL_CREATE_ACCT)? TSDB_MSG_TYPE_CREATE_ACCT:TSDB_MSG_TYPE_ALTER_ACCT;
break;
}
case TSDB_SQL_DROP_ACCT:
case TSDB_SQL_DROP_USER: {
*output = buildDropUserMsg(pInfo, outputLen, pCtx->requestId, msgBuf, msgBufLen);
pDcl->pMsg = (char*)buildDropUserMsg(pInfo, &pDcl->msgLen, pCtx->requestId, msgBuf, msgBufLen);
pDcl->msgType = (pInfo->type == TSDB_SQL_DROP_ACCT)? TSDB_MSG_TYPE_DROP_ACCT:TSDB_MSG_TYPE_DROP_USER;
break;
}
case TSDB_SQL_SHOW: {
code = setShowInfo(pInfo, output, outputLen, pMsgBuf);
code = setShowInfo(&pInfo->pMiscInfo->showOpt, pCtx, (void**)&pDcl->pMsg, &pDcl->msgLen, pMsgBuf);
pDcl->msgType = TSDB_MSG_TYPE_SHOW;
break;
}
......@@ -4429,8 +4429,9 @@ int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, void**
SUseDbMsg *pUseDbMsg = (SUseDbMsg *) calloc(1, sizeof(SUseDbMsg));
tNameExtractFullName(&n, pUseDbMsg->db);
*output = pUseDbMsg;
*outputLen = sizeof(SUseDbMsg);
pDcl->pMsg = (char*)pUseDbMsg;
pDcl->msgLen = sizeof(SUseDbMsg);
pDcl->msgType = TSDB_MSG_TYPE_USE_DB;
break;
}
......@@ -4451,18 +4452,43 @@ int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, void**
return buildInvalidOperationMsg(pMsgBuf, msg1);
}
SCreateDbMsg* pCreateMsg = buildCreateDbMsg(pCreateDB, pMsgBuf->buf, pMsgBuf->len);
SCreateDbMsg* pCreateMsg = buildCreateDbMsg(pCreateDB, pCtx, pMsgBuf);
if (doCheckDbOptions(pCreateMsg, pMsgBuf) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_OPERATION;
}
strncpy(pCreateMsg->db, token.z, token.n);
*output = pCreateMsg;
*outputLen = sizeof(SCreateDbMsg);
pDcl->pMsg = (char*)pCreateMsg;
pDcl->msgLen = sizeof(SCreateDbMsg);
pDcl->msgType = (pInfo->type == TSDB_SQL_CREATE_DB)? TSDB_MSG_TYPE_CREATE_DB:TSDB_MSG_TYPE_ALTER_DB;
break;
}
case TSDB_SQL_DROP_DB: {
const char* msg1 = "invalid database name";
assert(taosArrayGetSize(pInfo->pMiscInfo->a) == 1);
SToken* dbName = taosArrayGet(pInfo->pMiscInfo->a, 0);
SName name = {0};
code = tNameSetDbName(&name, pCtx->acctId, dbName->z, dbName->n);
if (code != TSDB_CODE_SUCCESS) {
return buildInvalidOperationMsg(pMsgBuf, msg1);
}
SDropDbMsg *pDropDbMsg = (SDropDbMsg*) calloc(1, sizeof(SDropDbMsg));
code = tNameExtractFullName(&name, pDropDbMsg->db);
pDropDbMsg->ignoreNotExists = pInfo->pMiscInfo->existsCheck ? 1 : 0;
assert(code == TSDB_CODE_SUCCESS && name.type == TSDB_DB_NAME_T);
pDcl->msgType = TSDB_MSG_TYPE_DROP_DB;
pDcl->msgLen = sizeof(SDropDbMsg);
pDcl->pMsg = (char*)pDropDbMsg;
return TSDB_CODE_SUCCESS;
}
case TSDB_SQL_CREATE_TABLE: {
SCreateTableSql* pCreateTable = pInfo->pCreateTableInfo;
......@@ -4470,7 +4496,8 @@ int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, void**
if ((code = doCheckForCreateTable(pInfo, pMsgBuf)) != TSDB_CODE_SUCCESS) {
return code;
}
*output = buildCreateTableMsg(pCreateTable, outputLen, pCtx, pMsgBuf);
pDcl->pMsg = (char*)buildCreateTableMsg(pCreateTable, &pDcl->msgLen, pCtx, pMsgBuf);
pDcl->msgType = (pCreateTable->type == TSQL_CREATE_TABLE)? TSDB_MSG_TYPE_CREATE_TABLE:TSDB_MSG_TYPE_CREATE_STB;
} else if (pCreateTable->type == TSQL_CREATE_CTABLE) {
// if ((code = doCheckForCreateFromStable(pSql, pInfo)) != TSDB_CODE_SUCCESS) {
// return code;
......@@ -4483,6 +4510,18 @@ int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, void**
break;
}
case TSDB_SQL_DROP_TABLE: {
pDcl->pMsg = (char*)buildDropTableMsg(pInfo, &pDcl->msgLen, pCtx, pMsgBuf);
if (pDcl->pMsg == NULL) {
return terrno;
}
pDcl->msgType = TSDB_MSG_TYPE_DROP_STB;
return TSDB_CODE_SUCCESS;
break;
}
default:
break;
}
......
......@@ -909,6 +909,7 @@ int32_t parseInsertSql(SParseContext* pContext, SInsertStmtInfo** pInfo) {
}
*pInfo = context.pOutput;
context.pOutput->nodeType = TSDB_SQL_INSERT;
context.pOutput->schemaAttache = pContext->schemaAttached;
context.pOutput->payloadType = PAYLOAD_TYPE_KV;
......
......@@ -20,7 +20,7 @@
#include "function.h"
#include "insertParser.h"
bool qIsInsertSql(const char* pStr, size_t length) {
bool isInsertSql(const char* pStr, size_t length) {
int32_t index = 0;
do {
......@@ -31,18 +31,28 @@ bool qIsInsertSql(const char* pStr, size_t length) {
} while (1);
}
int32_t qParseQuerySql(const char* pStr, size_t length, SParseBasicCtx* pParseCtx, int32_t *type, void** pOutput, int32_t* outputLen, char* msg, int32_t msgLen) {
SSqlInfo info = doGenerateAST(pStr);
bool qIsDclQuery(const SQueryNode* pQuery) {
return TSDB_SQL_INSERT != pQuery->type && TSDB_SQL_SELECT != pQuery->type;
}
int32_t parseQuerySql(SParseContext* pCxt, SQueryNode** pQuery) {
SSqlInfo info = doGenerateAST(pCxt->pSql);
if (!info.valid) {
strncpy(msg, info.msg, msgLen);
strncpy(pCxt->pMsg, info.msg, pCxt->msgLen);
terrno = TSDB_CODE_TSC_SQL_SYNTAX_ERROR;
return terrno;
}
if (!isDqlSqlStatement(&info)) {
int32_t code = qParserValidateDclSqlNode(&info, pParseCtx, pOutput, outputLen, type, msg, msgLen);
SDclStmtInfo* pDcl = calloc(1, sizeof(SQueryStmtInfo));
if (NULL == pDcl) {
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; // set correct error code.
return terrno;
}
pDcl->nodeType = info.type;
int32_t code = qParserValidateDclSqlNode(&info, &pCxt->ctx, pDcl, pCxt->pMsg, pCxt->msgLen);
if (code == TSDB_CODE_SUCCESS) {
// do nothing
*pQuery = (SQueryNode*)pDcl;
}
} else {
SQueryStmtInfo* pQueryInfo = calloc(1, sizeof(SQueryStmtInfo));
......@@ -53,9 +63,9 @@ int32_t qParseQuerySql(const char* pStr, size_t length, SParseBasicCtx* pParseCt
struct SCatalog* pCatalog = NULL;
int32_t code = catalogGetHandle(NULL, &pCatalog);
code = qParserValidateSqlNode(pCatalog, &info, pQueryInfo, pParseCtx->requestId, msg, msgLen);
code = qParserValidateSqlNode(pCatalog, &info, pQueryInfo, pCxt->ctx.requestId, pCxt->pMsg, pCxt->msgLen);
if (code == TSDB_CODE_SUCCESS) {
*pOutput = pQueryInfo;
*pQuery = (SQueryNode*)pQueryInfo;
}
}
......@@ -63,8 +73,12 @@ int32_t qParseQuerySql(const char* pStr, size_t length, SParseBasicCtx* pParseCt
return TSDB_CODE_SUCCESS;
}
int32_t qParseInsertSql(SParseContext* pContext, SInsertStmtInfo** pInfo) {
return parseInsertSql(pContext, pInfo);
int32_t qParseQuerySql(SParseContext* pCxt, SQueryNode** pQuery) {
if (isInsertSql(pCxt->pSql, pCxt->sqlLen)) {
return parseInsertSql(pCxt, (SInsertStmtInfo**)pQuery);
} else {
return parseQuerySql(pCxt, pQuery);
}
}
int32_t qParserConvertSql(const char* pStr, size_t length, char** pConvertSql) {
......
此差异已折叠。
......@@ -217,7 +217,7 @@ static SKeyword keywordTable[] = {
{"UNION", TK_UNION},
{"CACHELAST", TK_CACHELAST},
{"DISTINCT", TK_DISTINCT},
{"PARTITIONS", TK_PARTITIONS},
// {"PARTITIONS", TK_PARTITIONS},
{"TOPIC", TK_TOPIC},
{"TOPICS", TK_TOPICS},
{"COMPACT", TK_COMPACT},
......
......@@ -23,20 +23,20 @@
namespace {
void generateTestT1(MockCatalogService* mcs) {
ITableBuilder& builder = mcs->createTableBuilder("root.test", "t1", TSDB_NORMAL_TABLE, 3)
ITableBuilder& builder = mcs->createTableBuilder("test", "t1", TSDB_NORMAL_TABLE, 3)
.setPrecision(TSDB_TIME_PRECISION_MILLI).setVgid(1).addColumn("ts", TSDB_DATA_TYPE_TIMESTAMP)
.addColumn("c1", TSDB_DATA_TYPE_INT).addColumn("c2", TSDB_DATA_TYPE_BINARY, 20);
builder.done();
}
void generateTestST1(MockCatalogService* mcs) {
ITableBuilder& builder = mcs->createTableBuilder("root.test", "st1", TSDB_SUPER_TABLE, 3, 2)
ITableBuilder& builder = mcs->createTableBuilder("test", "st1", TSDB_SUPER_TABLE, 3, 2)
.setPrecision(TSDB_TIME_PRECISION_MILLI).addColumn("ts", TSDB_DATA_TYPE_TIMESTAMP)
.addTag("tag1", TSDB_DATA_TYPE_INT).addTag("tag2", TSDB_DATA_TYPE_BINARY, 20)
.addColumn("c1", TSDB_DATA_TYPE_INT).addColumn("c2", TSDB_DATA_TYPE_BINARY, 20);
builder.done();
mcs->createSubTable("root.test", "st1", "st1s1", 1);
mcs->createSubTable("root.test", "st1", "st1s2", 2);
mcs->createSubTable("test", "st1", "st1s1", 1);
mcs->createSubTable("test", "st1", "st1s2", 2);
}
}
......
......@@ -94,9 +94,9 @@ public:
return 0;
}
int32_t catalogGetTableMeta(const char* pDBName, const char* pTableName, STableMeta** pTableMeta) const {
int32_t catalogGetTableMeta(const char* pDbFullName, const char* pTableName, STableMeta** pTableMeta) const {
std::unique_ptr<STableMeta> table;
int32_t code = copyTableSchemaMeta(pDBName, pTableName, &table);
int32_t code = copyTableSchemaMeta(toDbname(pDbFullName), pTableName, &table);
if (TSDB_CODE_SUCCESS != code) {
return code;
}
......@@ -104,7 +104,7 @@ public:
return TSDB_CODE_SUCCESS;
}
int32_t catalogGetTableHashVgroup(const char* pDBName, const char* pTableName, SVgroupInfo* vgInfo) const {
int32_t catalogGetTableHashVgroup(const char* pDbFullName, const char* pTableName, SVgroupInfo* vgInfo) const {
// todo
return 0;
}
......@@ -195,6 +195,14 @@ private:
typedef std::map<std::string, std::shared_ptr<MockTableMeta>> TableMetaCache;
typedef std::map<std::string, TableMetaCache> DbMetaCache;
std::string toDbname(const std::string& dbFullName) const {
std::string::size_type n = dbFullName.find(".");
if (n == std::string::npos) {
return dbFullName;
}
return dbFullName.substr(n + 1);
}
std::string ttToString(int8_t tableType) const {
switch (tableType) {
case TSDB_SUPER_TABLE:
......
......@@ -714,12 +714,9 @@ TEST(testCase, show_user_Test) {
SSqlInfo info1 = doGenerateAST(sql1);
ASSERT_EQ(info1.valid, true);
void* output = NULL;
int32_t type = 0;
int32_t len = 0;
SDclStmtInfo output;
SParseBasicCtx ct= {.db= "abc", .acctId = 1, .requestId = 1};
int32_t code = qParserValidateDclSqlNode(&info1, &ct, &output, &len, &type, msg, buf.len);
int32_t code = qParserValidateDclSqlNode(&info1, &ct, &output, msg, buf.len);
ASSERT_EQ(code, 0);
// convert the show command to be the select query
......@@ -738,12 +735,9 @@ TEST(testCase, create_user_Test) {
ASSERT_EQ(info1.valid, true);
ASSERT_EQ(isDclSqlStatement(&info1), true);
void* output = NULL;
int32_t type = 0;
int32_t len = 0;
SDclStmtInfo output;
SParseBasicCtx ct= {.db= "abc", .acctId = 1, .requestId = 1};
int32_t code = qParserValidateDclSqlNode(&info1, &ct, &output, &len, &type, msg, buf.len);
int32_t code = qParserValidateDclSqlNode(&info1, &ct, &output, msg, buf.len);
ASSERT_EQ(code, 0);
destroySqlInfo(&info1);
......
......@@ -845,7 +845,7 @@ int32_t subPlanToString(const SSubplan* subplan, char** str, int32_t* len) {
return TSDB_CODE_FAILED;
}
*str = cJSON_Print(json);
*len = strlen(*str);
*len = strlen(*str) + 1;
return TSDB_CODE_SUCCESS;
}
......
......@@ -33,10 +33,6 @@ protected:
void pushScan(const string& db, const string& table, int32_t scanOp) {
shared_ptr<MockTableMeta> meta = mockCatalogService->getTableMeta(db, table);
EXPECT_TRUE(meta);
// typedef struct SQueryPlanNode {
// SArray *pExpr; // the query functions or sql aggregations
// int32_t numOfExpr; // number of result columns, which is also the number of pExprs
// } SQueryPlanNode;
unique_ptr<SQueryPlanNode> scan((SQueryPlanNode*)calloc(1, sizeof(SQueryPlanNode)));
scan->info.type = scanOp;
scan->numOfCols = meta->schema->tableInfo.numOfColumns;
......@@ -54,6 +50,21 @@ protected:
return code;
}
int32_t run(const string& db, const string& sql) {
SParseContext cxt;
buildParseContext(db, sql, &cxt);
SQueryNode* query;
int32_t code = qParseQuerySql(&cxt, &query);
if (TSDB_CODE_SUCCESS != code) {
cout << "error no:" << code << ", msg:" << cxt.pMsg << endl;
return code;
}
SQueryDag* dag = nullptr;
code = qCreateQueryDag(query, nullptr, &dag);
dag_.reset(dag);
return code;
}
void explain() {
size_t level = taosArrayGetSize(dag_->pSubplans);
for (size_t i = 0; i < level; ++i) {
......@@ -64,7 +75,8 @@ protected:
std::cout << "no " << j << ":" << std::endl;
int32_t len = 0;
char* str = nullptr;
ASSERT_EQ (TSDB_CODE_SUCCESS, qSubPlanToString((const SSubplan*)taosArrayGetP(subplans, j), &str, &len));
ASSERT_EQ(TSDB_CODE_SUCCESS, qSubPlanToString((const SSubplan*)taosArrayGetP(subplans, j), &str, &len));
std::cout << "len:" << len << std::endl;
std::cout << str << std::endl;
free(str);
}
......@@ -108,6 +120,25 @@ private:
return info;
}
void buildParseContext(const string& db, const string& sql, SParseContext* pCxt) {
static string _db;
static string _sql;
static const int32_t _msgMaxLen = 4096;
static char _msg[_msgMaxLen];
_db = db;
_sql = sql;
memset(_msg, 0, _msgMaxLen);
pCxt->ctx.acctId = 1;
pCxt->ctx.db = _db.c_str();
pCxt->ctx.requestId = 1;
pCxt->pSql = _sql.c_str();
pCxt->sqlLen = _sql.length();
pCxt->pMsg = _msg;
pCxt->msgLen = _msgMaxLen;
}
shared_ptr<MockTableMeta> meta_;
unique_ptr<SQueryPlanNode> logicPlan_;
unique_ptr<SQueryDag> dag_;
......@@ -115,7 +146,7 @@ private:
// select * from table
TEST_F(PhyPlanTest, tableScanTest) {
pushScan("root.test", "t1", QNODE_TABLESCAN);
pushScan("test", "t1", QNODE_TABLESCAN);
ASSERT_EQ(run(), TSDB_CODE_SUCCESS);
explain();
SQueryDag* dag = reslut();
......@@ -124,9 +155,17 @@ TEST_F(PhyPlanTest, tableScanTest) {
// select * from supertable
TEST_F(PhyPlanTest, superTableScanTest) {
pushScan("root.test", "st1", QNODE_TABLESCAN);
pushScan("test", "st1", QNODE_TABLESCAN);
ASSERT_EQ(run(), TSDB_CODE_SUCCESS);
explain();
SQueryDag* dag = reslut();
// todo check
}
// insert into t values(...)
TEST_F(PhyPlanTest, insertTest) {
ASSERT_EQ(run("test", "insert into t1 values (now, 1, \"beijing\")"), TSDB_CODE_SUCCESS);
explain();
SQueryDag* dag = reslut();
// todo check
}
......@@ -263,87 +263,12 @@ int32_t queryProcessTableMetaRsp(void* output, char *msg, int32_t msgSize) {
}
void msgInit() {
void initQueryModuleMsgHandle() {
queryBuildMsg[TSDB_MSG_TYPE_TABLE_META] = queryBuildTableMetaReqMsg;
queryBuildMsg[TSDB_MSG_TYPE_USE_DB] = queryBuildUseDbMsg;
queryProcessMsgRsp[TSDB_MSG_TYPE_TABLE_META] = queryProcessTableMetaRsp;
queryProcessMsgRsp[TSDB_MSG_TYPE_USE_DB] = queryProcessUseDBRsp;
/*
tscBuildMsg[TSDB_SQL_SELECT] = tscBuildQueryMsg;
tscBuildMsg[TSDB_SQL_INSERT] = tscBuildSubmitMsg;
tscBuildMsg[TSDB_SQL_FETCH] = tscBuildFetchMsg;
tscBuildMsg[TSDB_SQL_CREATE_DB] = tscBuildCreateDbMsg;
tscBuildMsg[TSDB_SQL_CREATE_USER] = tscBuildUserMsg;
tscBuildMsg[TSDB_SQL_CREATE_FUNCTION] = tscBuildCreateFuncMsg;
tscBuildMsg[TSDB_SQL_CREATE_ACCT] = tscBuildAcctMsg;
tscBuildMsg[TSDB_SQL_ALTER_ACCT] = tscBuildAcctMsg;
tscBuildMsg[TSDB_SQL_CREATE_TABLE] = tscBuildCreateTableMsg;
tscBuildMsg[TSDB_SQL_DROP_USER] = tscBuildDropUserAcctMsg;
tscBuildMsg[TSDB_SQL_DROP_ACCT] = tscBuildDropUserAcctMsg;
tscBuildMsg[TSDB_SQL_DROP_DB] = tscBuildDropDbMsg;
tscBuildMsg[TSDB_SQL_DROP_FUNCTION] = tscBuildDropFuncMsg;
tscBuildMsg[TSDB_SQL_SYNC_DB_REPLICA] = tscBuildSyncDbReplicaMsg;
tscBuildMsg[TSDB_SQL_DROP_TABLE] = tscBuildDropTableMsg;
tscBuildMsg[TSDB_SQL_ALTER_USER] = tscBuildUserMsg;
tscBuildMsg[TSDB_SQL_CREATE_DNODE] = tscBuildCreateDnodeMsg;
tscBuildMsg[TSDB_SQL_DROP_DNODE] = tscBuildDropDnodeMsg;
tscBuildMsg[TSDB_SQL_CFG_DNODE] = tscBuildCfgDnodeMsg;
tscBuildMsg[TSDB_SQL_ALTER_TABLE] = tscBuildAlterTableMsg;
tscBuildMsg[TSDB_SQL_UPDATE_TAG_VAL] = tscBuildUpdateTagMsg;
tscBuildMsg[TSDB_SQL_ALTER_DB] = tscAlterDbMsg;
tscBuildMsg[TSDB_SQL_COMPACT_VNODE] = tscBuildCompactMsg;
tscBuildMsg[TSDB_SQL_CONNECT] = tscBuildConnectMsg;
tscBuildMsg[TSDB_SQL_USE_DB] = tscBuildUseDbMsg;
tscBuildMsg[TSDB_SQL_STABLEVGROUP] = tscBuildSTableVgroupMsg;
tscBuildMsg[TSDB_SQL_RETRIEVE_FUNC] = tscBuildRetrieveFuncMsg;
tscBuildMsg[TSDB_SQL_HB] = tscBuildHeartBeatMsg;
tscBuildMsg[TSDB_SQL_SHOW] = tscBuildShowMsg;
tscBuildMsg[TSDB_SQL_RETRIEVE_MNODE] = tscBuildRetrieveFromMgmtMsg;
tscBuildMsg[TSDB_SQL_KILL_QUERY] = tscBuildKillMsg;
tscBuildMsg[TSDB_SQL_KILL_STREAM] = tscBuildKillMsg;
tscBuildMsg[TSDB_SQL_KILL_CONNECTION] = tscBuildKillMsg;
tscProcessMsgRsp[TSDB_SQL_SELECT] = tscProcessQueryRsp;
tscProcessMsgRsp[TSDB_SQL_FETCH] = tscProcessRetrieveRspFromNode;
tscProcessMsgRsp[TSDB_SQL_DROP_DB] = tscProcessDropDbRsp;
tscProcessMsgRsp[TSDB_SQL_DROP_TABLE] = tscProcessDropTableRsp;
tscProcessMsgRsp[TSDB_SQL_CONNECT] = tscProcessConnectRsp;
tscProcessMsgRsp[TSDB_SQL_USE_DB] = tscProcessUseDbRsp;
tscProcessMsgRsp[TSDB_SQL_META] = tscProcessTableMetaRsp;
tscProcessMsgRsp[TSDB_SQL_STABLEVGROUP] = tscProcessSTableVgroupRsp;
tscProcessMsgRsp[TSDB_SQL_MULTI_META] = tscProcessMultiTableMetaRsp;
tscProcessMsgRsp[TSDB_SQL_RETRIEVE_FUNC] = tscProcessRetrieveFuncRsp;
tscProcessMsgRsp[TSDB_SQL_SHOW] = tscProcessShowRsp;
tscProcessMsgRsp[TSDB_SQL_RETRIEVE_MNODE] = tscProcessRetrieveRspFromNode; // rsp handled by same function.
tscProcessMsgRsp[TSDB_SQL_DESCRIBE_TABLE] = tscProcessDescribeTableRsp;
tscProcessMsgRsp[TSDB_SQL_CURRENT_DB] = tscProcessLocalRetrieveRsp;
tscProcessMsgRsp[TSDB_SQL_CURRENT_USER] = tscProcessLocalRetrieveRsp;
tscProcessMsgRsp[TSDB_SQL_SERV_VERSION] = tscProcessLocalRetrieveRsp;
tscProcessMsgRsp[TSDB_SQL_CLI_VERSION] = tscProcessLocalRetrieveRsp;
tscProcessMsgRsp[TSDB_SQL_SERV_STATUS] = tscProcessLocalRetrieveRsp;
tscProcessMsgRsp[TSDB_SQL_RETRIEVE_EMPTY_RESULT] = tscProcessEmptyResultRsp;
tscProcessMsgRsp[TSDB_SQL_RETRIEVE_GLOBALMERGE] = tscProcessRetrieveGlobalMergeRsp;
tscProcessMsgRsp[TSDB_SQL_ALTER_TABLE] = tscProcessAlterTableMsgRsp;
tscProcessMsgRsp[TSDB_SQL_ALTER_DB] = tscProcessAlterDbMsgRsp;
tscProcessMsgRsp[TSDB_SQL_COMPACT_VNODE] = tscProcessCompactRsp;
tscProcessMsgRsp[TSDB_SQL_SHOW_CREATE_TABLE] = tscProcessShowCreateRsp;
tscProcessMsgRsp[TSDB_SQL_SHOW_CREATE_STABLE] = tscProcessShowCreateRsp;
tscProcessMsgRsp[TSDB_SQL_SHOW_CREATE_DATABASE] = tscProcessShowCreateRsp;
*/
}
......
......@@ -20,6 +20,8 @@
extern "C" {
#endif
#include "tlockfree.h"
#define QWORKER_DEFAULT_SCHEDULER_NUMBER 10000
#define QWORKER_DEFAULT_RES_CACHE_NUMBER 10000
#define QWORKER_DEFAULT_SCH_TASK_NUMBER 10000
......@@ -30,36 +32,63 @@ enum {
QW_READY_RESPONSED,
};
typedef struct SQWorkerTaskStatus {
int8_t status;
int8_t ready;
enum {
QW_TASK_INFO_STATUS = 1,
QW_TASK_INFO_READY,
};
enum {
QW_READ = 1,
QW_WRITE,
};
typedef struct SQWorkerTaskStatus {
SRWLatch lock;
int32_t code;
int8_t status;
int8_t ready;
bool cancel;
bool drop;
} SQWorkerTaskStatus;
typedef struct SQWorkerResCache {
SRWLatch lock;
void *data;
} SQWorkerResCache;
typedef struct SQWorkerSchTaskStatus {
typedef struct SQWorkerSchStatus {
int32_t lastAccessTs; // timestamp in second
SHashObj *taskStatus; // key:queryId+taskId, value: SQWorkerTaskStatus
} SQWorkerSchTaskStatus;
SRWLatch tasksLock;
SHashObj *tasksHash; // key:queryId+taskId, value: SQWorkerTaskStatus
} SQWorkerSchStatus;
// Qnode/Vnode level task management
typedef struct SQWorkerMgmt {
SQWorkerCfg cfg;
SHashObj *scheduleHash; //key: schedulerId, value: SQWorkerSchTaskStatus
SRWLatch schLock;
SRWLatch resLock;
SHashObj *schHash; //key: schedulerId, value: SQWorkerSchStatus
SHashObj *resHash; //key: queryId+taskId, value: SQWorkerResCache
} SQWorkerMgmt;
#define QW_TASK_DONE(status) (status == JOB_TASK_STATUS_SUCCEED || status == JOB_TASK_STATUS_FAILED || status == status == JOB_TASK_STATUS_CANCELLED)
#define QW_GOT_RES_DATA(data) (false)
#define QW_LOW_RES_DATA(data) (false)
#define QW_TASK_NOT_EXIST(code) (TSDB_CODE_QRY_SCH_NOT_EXIST == (code) || TSDB_CODE_QRY_TASK_NOT_EXIST == (code))
#define QW_TASK_ALREADY_EXIST(code) (TSDB_CODE_QRY_TASK_ALREADY_EXIST == (code))
#define QW_TASK_READY_RESP(status) (status == JOB_TASK_STATUS_SUCCEED || status == JOB_TASK_STATUS_FAILED || status == JOB_TASK_STATUS_CANCELLED || status == JOB_TASK_STATUS_PARTIAL_SUCCEED)
#define QW_SET_QTID(id, qid, tid) do { *(uint64_t *)(id) = (qid); *(uint64_t *)((char *)(id) + sizeof(qid)) = (tid); } while (0)
#define QW_GET_QTID(id, qid, tid) do { (qid) = *(uint64_t *)(id); (tid) = *(uint64_t *)((char *)(id) + sizeof(qid)); } while (0)
#define QW_ERR_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; return _code; } } while (0)
#define QW_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; } return _code; } while (0)
#define QW_ERR_LRET(c,...) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { qError(__VA_ARGS__); terrno = _code; return _code; } } while (0)
#define QW_ERR_JRET(c) do { code = c; if (code != TSDB_CODE_SUCCESS) { terrno = code; goto _return; } } while (0)
#define QW_LOCK(type, _lock) (QW_READ == (type) ? taosRLockLatch(_lock) : taosWLockLatch(_lock))
#define QW_UNLOCK(type, _lock) (QW_READ == (type) ? taosRUnLockLatch(_lock) : taosWUnLockLatch(_lock))
#ifdef __cplusplus
}
......
此差异已折叠。
......@@ -296,44 +296,45 @@ int32_t schAsyncSendMsg(SQueryJob *job, SQueryTask *task, int32_t msgType) {
}
int32_t len = strlen(task->msg);
msgSize = sizeof(SSchedulerQueryMsg) + len;
msgSize = sizeof(SSubQueryMsg) + len + 1;
msg = calloc(1, msgSize);
if (NULL == msg) {
qError("calloc %d failed", msgSize);
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
SSchedulerQueryMsg *pMsg = msg;
SSubQueryMsg *pMsg = msg;
pMsg->schedulerId = htobe64(schMgmt.schedulerId);
pMsg->queryId = htobe64(job->queryId);
pMsg->taskId = htobe64(task->taskId);
pMsg->contentLen = htonl(len);
memcpy(pMsg->msg, task->msg, len);
pMsg->msg[len] = 0;
break;
}
case TSDB_MSG_TYPE_RES_READY: {
msgSize = sizeof(SSchedulerReadyMsg);
msgSize = sizeof(SResReadyMsg);
msg = calloc(1, msgSize);
if (NULL == msg) {
qError("calloc %d failed", msgSize);
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
SSchedulerReadyMsg *pMsg = msg;
SResReadyMsg *pMsg = msg;
pMsg->queryId = htobe64(job->queryId);
pMsg->taskId = htobe64(task->taskId);
break;
}
case TSDB_MSG_TYPE_FETCH: {
msgSize = sizeof(SSchedulerFetchMsg);
msgSize = sizeof(SResFetchMsg);
msg = calloc(1, msgSize);
if (NULL == msg) {
qError("calloc %d failed", msgSize);
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
SSchedulerFetchMsg *pMsg = msg;
SResFetchMsg *pMsg = msg;
pMsg->queryId = htobe64(job->queryId);
pMsg->taskId = htobe64(task->taskId);
break;
......
......@@ -323,7 +323,10 @@ TAOS_DEFINE_ERROR(TSDB_CODE_QRY_INCONSISTAN, "File inconsistance in
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_INVALID_TIME_CONDITION, "One valid time range condition expected")
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_SYS_ERROR, "System error")
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_INVALID_INPUT, "invalid input")
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_SCH_NOT_EXIST, "Scheduler not exist")
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_TASK_NOT_EXIST, "Task not exist")
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_TASK_ALREADY_EXIST, "Task already exist")
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_RES_CACHE_NOT_EXIST, "Task result cache not exist")
// grant
TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_EXPIRED, "License expired")
......
......@@ -291,7 +291,7 @@ int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, void *da
// enable resize
__rd_unlock(&pHashObj->lock, pHashObj->type);
return pHashObj->enableUpdate ? 0 : -1;
return pHashObj->enableUpdate ? 0 : -2;
}
}
......
......@@ -32,6 +32,7 @@
#include "ttokendef.h"
#include "tutil.h"
#include "tvariant.h"
#include "parserInt.h"
}
%syntax_error {
......@@ -302,8 +303,8 @@ db_optr(Y) ::= db_optr(Z) cachelast(X). { Y = Z; Y.cachelast = strtol(X.z,
%type topic_optr {SCreateDbInfo}
topic_optr(Y) ::= db_optr(Z). { Y = Z; Y.dbType = TSDB_DB_TYPE_TOPIC; }
topic_optr(Y) ::= topic_optr(Z) partitions(X). { Y = Z; Y.partitions = strtol(X.z, NULL, 10); }
//topic_optr(Y) ::= db_optr(Z). { Y = Z; Y.dbType = TSDB_DB_TYPE_TOPIC; }
//topic_optr(Y) ::= topic_optr(Z) partitions(X). { Y = Z; Y.partitions = strtol(X.z, NULL, 10); }
%type alter_db_optr {SCreateDbInfo}
alter_db_optr(Y) ::= . { setDefaultCreateDbOption(&Y); Y.dbType = TSDB_DB_TYPE_DEFAULT;}
......@@ -325,7 +326,7 @@ alter_db_optr(Y) ::= alter_db_optr(Z) cachelast(X). { Y = Z; Y.cachelast = str
alter_topic_optr(Y) ::= alter_db_optr(Z). { Y = Z; Y.dbType = TSDB_DB_TYPE_TOPIC; }
alter_topic_optr(Y) ::= alter_topic_optr(Z) partitions(X). { Y = Z; Y.partitions = strtol(X.z, NULL, 10); }
%type typename {TAOS_FIELD}
%type typename {SField}
typename(A) ::= ids(X). {
X.type = 0;
tSetColumnType (&A, &X);
......@@ -425,11 +426,11 @@ create_table_args(A) ::= ifnotexists(U) ids(V) cpxName(Z) AS select(S). {
setCreatedTableName(pInfo, &V, &U);
}
%type column{TAOS_FIELD}
%type column{SField}
%type columnlist{SArray*}
%destructor columnlist {taosArrayDestroy($$);}
columnlist(A) ::= columnlist(X) COMMA column(Y). {taosArrayPush(X, &Y); A = X; }
columnlist(A) ::= column(X). {A = taosArrayInit(4, sizeof(TAOS_FIELD)); taosArrayPush(A, &X);}
columnlist(A) ::= column(X). {A = taosArrayInit(4, sizeof(SField)); taosArrayPush(A, &X);}
// The information used for a column is the name and type of column:
// tinyint smallint int bigint float double bool timestamp binary(x) nchar(x)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册