提交 0991ca8e 编写于 作者: S shenglian zhou

integrate parser from ganlin zhao

上级 1148a133
......@@ -825,304 +825,792 @@ clean_up:
//=========================================================================
typedef enum {
LP_ITEM_TAG,
LP_ITEM_FIELD
} LPItemKind;
bool is_timestamp(char *pVal, uint16_t len) {
if ((len == 1) && pVal[0] == '0') {
printf("Type is timestamp(%s)\n", pVal);
return true;
}
if (len < 2) {
return false;
}
if (pVal[len - 1] == 's') {
switch (pVal[len - 2]) {
case 'm':
case 'u':
case 'n':
break;
default:
if (isdigit(pVal[len - 2])) {
break;
} else {
return false;
}
}
printf("Type is timestamp\n");
return true;
}
return false;
}
typedef struct {
SStrToken keyToken;
SStrToken valueToken;
bool is_bool(char *pVal, uint16_t len, bool *b_val) {
if ((len == 1) &&
(pVal[len - 1] == 't' ||
pVal[len - 1] == 'T')) {
printf("Type is bool(%c)\n", pVal[len - 1]);
*b_val = true;
return true;
}
char key[TSDB_COL_NAME_LEN];
int8_t type;
int16_t length;
if ((len == 1) &&
(pVal[len - 1] == 'f' ||
pVal[len - 1] == 'F')) {
printf("Type is bool(%c)\n", pVal[len - 1]);
*b_val = false;
return true;
}
char* value;
}SLPItem;
if((len == 4) &&
(!strcmp(&pVal[len - 4], "true") ||
!strcmp(&pVal[len - 4], "True") ||
!strcmp(&pVal[len - 4], "TRUE"))) {
printf("Type is bool(%s)\n", &pVal[len - 4]);
*b_val = true;
return true;
}
if((len == 5) &&
(!strcmp(&pVal[len - 5], "false") ||
!strcmp(&pVal[len - 5], "False") ||
!strcmp(&pVal[len - 5], "FALSE"))) {
printf("Type is bool(%s)\n", &pVal[len - 5]);
*b_val = false;
return true;
}
return false;
}
typedef struct {
SStrToken measToken;
SStrToken tsToken;
bool is_binary(char *pVal, uint16_t len) {
//binary: "abc"
if (len < 2) {
return false;
}
//binary
if (pVal[0] == '"' && pVal[len - 1] == '"') {
printf("Type is binary(%s)\n", pVal);
return true;
}
return false;
}
char sTableName[TSDB_TABLE_NAME_LEN];
SArray* tags;
SArray* fields;
int64_t ts;
bool is_nchar(char *pVal, uint16_t len) {
//nchar: L"abc"
if (len < 3) {
return false;
}
if (pVal[0] == 'L' && pVal[1] == '"' && pVal[len - 1] == '"') {
printf("Type is nchar(%s)\n", pVal);
return true;
}
return false;
}
} SLPPoint;
bool is_tiny_int(char *pVal, uint16_t len) {
if (len <= 2) {
return false;
}
if (!strcmp(&pVal[len - 2], "i8")) {
printf("Type is int8(%s)\n", pVal);
return true;
}
return false;
}
typedef enum {
LP_MEASUREMENT,
LP_TAG_KEY,
LP_TAG_VALUE,
LP_FIELD_KEY,
LP_FIELD_VALUE
} LPPart;
int32_t scanToCommaOrSpace(SStrToken s, int32_t start, int32_t* index, LPPart part) {
for (int32_t i = start; i < s.n; ++i) {
if (s.z[i] == ',' || s.z[i] == ' ') {
*index = i;
return 0;
}
bool is_tiny_uint(char *pVal, uint16_t len) {
if (len <= 2) {
return false;
}
if (pVal[0] == '-') {
return false;
}
return -1;
if (!strcmp(&pVal[len - 2], "u8")) {
printf("Type is uint8(%s)\n", pVal);
return true;
}
return false;
}
int32_t scanToEqual(SStrToken s, int32_t start, int32_t* index) {
for (int32_t i = start; i < s.n; ++i) {
if (s.z[i] == '=') {
*index = i;
return 0;
}
bool is_small_int(char *pVal, uint16_t len) {
if (len <= 3) {
return false;
}
if (!strcmp(&pVal[len - 3], "i16")) {
printf("Type is int16(%s)\n", pVal);
return true;
}
return -1;
return false;
}
int32_t setPointMeasurement(SLPPoint* point, SStrToken token) {
point->measToken = token;
if (point->measToken.n < TSDB_TABLE_NAME_LEN) {
strncpy(point->sTableName, point->measToken.z, point->measToken.n);
point->sTableName[point->measToken.n] = '\0';
bool is_small_uint(char *pVal, uint16_t len) {
if (len <= 3) {
return false;
}
return 0;
if (pVal[0] == '-') {
return false;
}
if (strcmp(&pVal[len - 3], "u16") == 0) {
printf("Type is uint16(%s)\n", pVal);
return true;
}
return false;
}
int32_t setItemKey(SLPItem* item, SStrToken key, LPPart part) {
item->keyToken = key;
if (item->keyToken.n < TSDB_COL_NAME_LEN) {
strncpy(item->key, item->keyToken.z, item->keyToken.n);
item->key[item->keyToken.n] = '\0';
bool is_int(char *pVal, uint16_t len) {
if (len <= 3) {
return false;
}
return 0;
if (strcmp(&pVal[len - 3], "i32") == 0) {
printf("Type is int32(%s)\n", pVal);
return true;
}
return false;
}
int32_t setItemValue(SLPItem* item, SStrToken value, LPPart part) {
item->valueToken = value;
return 0;
bool is_uint(char *pVal, uint16_t len) {
if (len <= 3) {
return false;
}
if (pVal[0] == '-') {
return false;
}
if (strcmp(&pVal[len - 3], "u32") == 0) {
printf("Type is uint32(%s)\n", pVal);
return true;
}
return false;
}
int32_t parseItemValue(SLPItem* item, LPItemKind kind) {
char* sv = item->valueToken.z;
char* last = item->valueToken.z + item->valueToken.n - 1;
if (isdigit(sv[0]) || sv[0] == '-') {
if (*last == 'i') {
item->type = TSDB_DATA_TYPE_BIGINT;
item->length = (int16_t)tDataTypes[item->type].bytes;
item->value = malloc(item->length);
char* endptr = NULL;
*(int64_t*)(item->value) = strtoll(sv, &endptr, 10);
} else if (*last == 'u') {
item->type = TSDB_DATA_TYPE_UBIGINT;
item->length = (int16_t)tDataTypes[item->type].bytes;
item->value = malloc(item->length);
char* endptr = NULL;
*(uint64_t*)(item->value) = (uint64_t)strtoull(sv, &endptr, 10);
} else if (*last == 'b') {
item->type = TSDB_DATA_TYPE_TINYINT;
item->length = (int16_t)tDataTypes[item->type].bytes;
item->value = malloc(item->length);
char* endptr = NULL;
*(int8_t*)(item->value) = (int8_t)strtoll(sv, &endptr, 10);
} else if (*last == 's') {
item->type = TSDB_DATA_TYPE_SMALLINT;
item->length = (int16_t)tDataTypes[item->type].bytes;
item->value = malloc(item->length);
char* endptr = NULL;
*(int16_t*)(item->value) = (int16_t)strtoll(sv, &endptr, 10);
} else if (*last == 'w') {
item->type = TSDB_DATA_TYPE_INT;
item->length = (int16_t)tDataTypes[item->type].bytes;
item->value = malloc(item->length);
char* endptr = NULL;
*(int32_t*)(item->value) = (int32_t)strtoll(sv, &endptr, 10);
} else if (*last == 'f') {
item->type = TSDB_DATA_TYPE_FLOAT;
item->length = (int16_t)tDataTypes[item->type].bytes;
item->value = malloc(item->length);
char* endptr = NULL;
*(float*)(item->value) = (float)strtold(sv, &endptr);
} else {
item->type = TSDB_DATA_TYPE_DOUBLE;
item->length = (int16_t)tDataTypes[item->type].bytes;
item->value = malloc(item->length);
char* endptr = NULL;
*(double*)(item->value) = strtold(sv, &endptr);
}
} else if ((sv[0] == 'L' && sv[1] =='"') || sv[0] == '"' ) {
if (sv[0] == 'L') {
item->type = TSDB_DATA_TYPE_NCHAR;
uint32_t bytes = item->valueToken.n - 3;
item->length = bytes;
item->value = malloc(bytes);
memcpy(item->value, sv+2, bytes);
} else if (sv[0]=='"'){
item->type = TSDB_DATA_TYPE_BINARY;
uint32_t bytes = item->valueToken.n - 2;
item->length = bytes;
item->value = malloc(bytes);
memcpy(item->value, sv+1, bytes);
}
} else if (sv[0] == 't' || sv[0] == 'f' || sv[0]=='T' || sv[0] == 'F') {
item->type = TSDB_DATA_TYPE_BOOL;
item->length = tDataTypes[item->type].bytes;
item->value = malloc(tDataTypes[item->type].bytes);
*(uint8_t*)(item->value) = tolower(sv[0])=='t' ? TSDB_TRUE : TSDB_FALSE;
bool is_big_int(char *pVal, uint16_t len) {
if (len <= 3) {
return false;
}
return 0;
if (strcmp(&pVal[len - 3], "i64") == 0) {
printf("Type is int64(%s)\n", pVal);
return true;
}
return false;
}
int32_t compareLPItemKey(const void* p1, const void* p2) {
const SLPItem* t1 = p1;
const SLPItem* t2 = p2;
uint32_t min = (t1->keyToken.n < t2->keyToken.n) ? t1->keyToken.n : t2->keyToken.n;
int res = strncmp(t1->keyToken.z, t2->keyToken.z, min);
if (res != 0) {
return res;
} else {
return (int)(t1->keyToken.n) - (int)(t2->keyToken.n);
bool is_big_uint(char *pVal, uint16_t len) {
if (len <= 3) {
return false;
}
if (pVal[0] == '-') {
return false;
}
if (strcmp(&pVal[len - 3], "u64") == 0) {
printf("Type is uint64(%s)\n", pVal);
return true;
}
return false;
}
int32_t setPointTimeStamp(SLPPoint* point, SStrToken tsToken) {
point->tsToken = tsToken;
return 0;
bool is_float(char *pVal, uint16_t len) {
if (len <= 3) {
return false;
}
if (strcmp(&pVal[len - 3], "f32") == 0) {
printf("Type is float(%s)\n", pVal);
return true;
}
return false;
}
int32_t parsePointTime(SLPPoint* point) {
if (point->tsToken.n <= 0) {
point->ts = taosGetTimestampNs();
} else {
char* endptr = NULL;
point->ts = strtoll(point->tsToken.z, &endptr, 10);
char* last = point->tsToken.z + point->tsToken.n - 1;
if (*last == 's') {
point->ts *= (int64_t)1e9;
} else if (*last == 'a') {
point->ts *= (int64_t)1e6;
} else if (*last == 'u') {
point->ts *= (int64_t)1e3;
} else if (*last == 'b') {
point->ts *= 1;
bool is_double(char *pVal, uint16_t len) {
if (len <= 3) {
return false;
}
if (strcmp(&pVal[len - 3], "f64") == 0) {
printf("Type is double(%s)\n", pVal);
return true;
}
return false;
}
bool is_valid_integer(char *str) {
char *c = str;
if (*c != '+' && *c != '-' && !isdigit(*c)) {
return false;
}
c++;
while (*c != '\0') {
if (!isdigit(*c)) {
return false;
}
c++;
}
return 0;
return true;
}
int32_t tscParseLine(SStrToken line, SLPPoint* point) {
int32_t pos = 0;
bool is_valid_float(char *str) {
char *c = str;
uint8_t has_dot, has_exp, has_sign;
has_dot = 0;
has_exp = 0;
has_sign = 0;
int32_t start = 0;
int32_t err = scanToCommaOrSpace(line, start, &pos, LP_MEASUREMENT);
if (err != 0) {
tscError("a");
return err;
if (*c != '+' && *c != '-' && *c != '.' && !isdigit(*c)) {
return false;
}
if (*c == '.' && isdigit(*(c + 1))) {
has_dot = 1;
}
c++;
while (*c != '\0') {
if (!isdigit(*c)) {
switch (*c) {
case '.': {
if (!has_dot && !has_exp && isdigit(*(c + 1))) {
has_dot = 1;
} else {
return false;
}
break;
}
case 'e':
case 'E': {
if (!has_exp && isdigit(*(c - 1)) &&
(isdigit(*(c + 1)) ||
*(c + 1) == '+' ||
*(c + 1) == '-')) {
has_exp = 1;
} else {
return false;
}
break;
}
case '+':
case '-': {
if (!has_sign && has_exp && isdigit(*(c + 1))) {
has_sign = 1;
} else {
return false;
}
break;
}
default: {
return false;
}
}
}
c++;
} //while
return true;
}
SStrToken measurement = {.z = line.z+start, .n = pos-start};
setPointMeasurement(point, measurement);
point->tags = taosArrayInit(64, sizeof(SLPItem));
start = pos;
while (line.z[start] == ',') {
SLPItem item;
bool taos_sml_timestamp_convert(TAOS_SML_KV *pVal, char *value,
uint16_t len) {
if (is_timestamp(value, len)) {
pVal->type = TSDB_DATA_TYPE_TIMESTAMP;
pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
pVal->value = calloc(pVal->length, 1);
int64_t val = (int64_t)strtoll(value, NULL, 10);
memcpy(pVal->value, &val, pVal->length);
return true;
}
return false;
}
//len does not include '\0' from value.
bool taos_sml_type_convert(TAOS_SML_KV *pVal, char *value,
uint16_t len) {
if (len <= 0) {
return false;
}
//bool
bool b_val;
if (is_bool(value, len, &b_val)) {
pVal->type = TSDB_DATA_TYPE_BOOL;
pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
pVal->value = calloc(pVal->length, 1);
memcpy(pVal->value, &b_val, pVal->length);
return true;
}
//binary
if (is_binary(value, len)) {
pVal->type = TSDB_DATA_TYPE_BINARY;
pVal->length = len - 2;
pVal->value = calloc(pVal->length, 1);
//copy after "
memcpy(pVal->value, value + 1, pVal->length);
return true;
}
//nchar
if (is_nchar(value, len)) {
pVal->type = TSDB_DATA_TYPE_NCHAR;
pVal->length = len - 3;
pVal->value = calloc(pVal->length, 1);
//copy after L"
memcpy(pVal->value, value + 2, pVal->length);
return true;
}
//floating number
if (is_float(value, len)) {
pVal->type = TSDB_DATA_TYPE_FLOAT;
pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
value[len - 3] = '\0';
if (!is_valid_float(value)) {
return false;
}
pVal->value = calloc(pVal->length, 1);
float val = (float)strtold(value, NULL);
memcpy(pVal->value, &val, pVal->length);
return true;
}
if (is_double(value, len)) {
pVal->type = TSDB_DATA_TYPE_DOUBLE;
pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
value[len - 3] = '\0';
if (!is_valid_float(value)) {
return false;
}
pVal->value = calloc(pVal->length, 1);
double val = (double)strtold(value, NULL);
memcpy(pVal->value, &val, pVal->length);
return true;
}
//integer number
if (is_tiny_int(value, len)) {
pVal->type = TSDB_DATA_TYPE_TINYINT;
pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
value[len - 2] = '\0';
if (!is_valid_integer(value)) {
return false;
}
pVal->value = calloc(pVal->length, 1);
int8_t val = (int8_t)strtoll(value, NULL, 10);
memcpy(pVal->value, &val, pVal->length);
return true;
}
if (is_tiny_uint(value, len)) {
pVal->type = TSDB_DATA_TYPE_UTINYINT;
pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
value[len - 2] = '\0';
if (!is_valid_integer(value)) {
return false;
}
pVal->value = calloc(pVal->length, 1);
uint8_t val = (uint8_t)strtoul(value, NULL, 10);
memcpy(pVal->value, &val, pVal->length);
return true;
}
if (is_small_int(value, len)) {
pVal->type = TSDB_DATA_TYPE_SMALLINT;
pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
value[len - 3] = '\0';
if (!is_valid_integer(value)) {
return false;
}
pVal->value = calloc(pVal->length, 1);
int16_t val = (int16_t)strtoll(value, NULL, 10);
memcpy(pVal->value, &val, pVal->length);
return true;
}
if (is_small_uint(value, len)) {
pVal->type = TSDB_DATA_TYPE_USMALLINT;
pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
value[len - 3] = '\0';
if (!is_valid_integer(value)) {
return false;
}
pVal->value = calloc(pVal->length, 1);
uint16_t val = (uint16_t)strtoul(value, NULL, 10);
memcpy(pVal->value, &val, pVal->length);
//memcpy(pVal->value, &val, pVal->length);
return true;
}
if (is_int(value, len)) {
pVal->type = TSDB_DATA_TYPE_INT;
pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
value[len - 3] = '\0';
if (!is_valid_integer(value)) {
return false;
}
pVal->value = calloc(pVal->length, 1);
int32_t val = (int32_t)strtoll(value, NULL, 10);
memcpy(pVal->value, &val, pVal->length);
return true;
}
if (is_uint(value, len)) {
pVal->type = TSDB_DATA_TYPE_UINT;
pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
value[len - 3] = '\0';
if (!is_valid_integer(value)) {
return false;
}
pVal->value = calloc(pVal->length, 1);
uint32_t val = (uint32_t)strtoul(value, NULL, 10);
memcpy(pVal->value, &val, pVal->length);
return true;
}
if (is_big_int(value, len)) {
pVal->type = TSDB_DATA_TYPE_BIGINT;
pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
value[len - 3] = '\0';
if (!is_valid_integer(value)) {
return false;
}
pVal->value = calloc(pVal->length, 1);
int64_t val = (int64_t)strtoll(value, NULL, 10);
memcpy(pVal->value, &val, pVal->length);
return true;
}
if (is_big_uint(value, len)) {
pVal->type = TSDB_DATA_TYPE_UBIGINT;
pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
value[len - 3] = '\0';
if (!is_valid_integer(value)) {
return false;
}
pVal->value = calloc(pVal->length, 1);
uint64_t val = (uint64_t)strtoul(value, NULL, 10);
memcpy(pVal->value, &val, pVal->length);
return true;
}
//TODO: handle default is float here
return false;
}
start++;
err = scanToEqual(line, start, &pos);
if (err != 0) {
tscError("b");
goto error;
/* Field Escape charaters
1: measurement Comma,Space
2: tag_key, tag_value, field_key Comma,Equal Sign,Space
3: field_value Double quote,Backslash
*/
void escape_special_char(uint8_t field, const char **pos) {
const char *cur = *pos;
if (*cur != '\\') {
return;
}
switch (field) {
case 1:
switch (*(cur + 1)) {
case ',':
case ' ':
cur++;
break;
default:
break;
}
break;
case 2:
switch (*(cur + 1)) {
case ',':
case ' ':
case '=':
cur++;
break;
default:
break;
}
break;
case 3:
switch (*(cur + 1)) {
case '"':
case '\\':
cur++;
break;
default:
break;
}
break;
default:
break;
}
*pos = cur;
}
bool taos_sml_parse_measurement(TAOS_SML_DATA_POINT *pSml, const char **index, uint8_t *has_tags) {
const char *cur = *index;
uint16_t len = 0;
pSml->stableName = calloc(TSDB_TABLE_NAME_LEN, 1);
if (*cur == '_') {
printf("Measurement field cannnot start with \'_\'\n");
return false;
}
while (*cur != '\0') {
if (len > TSDB_TABLE_NAME_LEN) {
printf("Measurement field cannot exceeds 193 characters");
return false;
}
//first unescaped comma or space identifies measurement
//if space detected first, meaning no tag in the input
if (*cur == ',' && *(cur - 1) != '\\') {
*has_tags = 1;
printf("measurement:found comma\n");
break;
}
if (*cur == ' ' && *(cur - 1) != '\\') {
printf("measurement:found space\n");
break;
}
//Comma, Space, Backslash needs to be escaped if any
if (*cur == '\\') {
escape_special_char(1, &cur);
}
pSml->stableName[len] = *cur;
cur++;
len++;
}
pSml->stableName[len] = '\0';
*index = cur + 1;
printf("stable name:%s|len:%d\n", pSml->stableName, len);
return true;
}
SStrToken tagKey = {.z = line.z + start, .n = pos-start};
setItemKey(&item, tagKey, LP_TAG_KEY);
start = pos + 1;
err = scanToCommaOrSpace(line, start, &pos, LP_TAG_VALUE);
if (err != 0) {
tscError("c");
goto error;
bool taos_sml_parse_key(TAOS_SML_KV *pKV, const char **index) {
const char *cur = *index;
char key[TSDB_COL_NAME_LEN];
uint16_t len = 0;
//key field cannot start with '_'
if (*cur == '_') {
printf("Tag key cannnot start with \'_\'\n");
return false;
}
//TODO: If tag key has ID field, use corresponding
//tag value as child table name
while (*cur != '\0') {
if (len > TSDB_COL_NAME_LEN) {
printf("Key field cannot exceeds 65 characters");
return false;
}
//unescaped '=' identifies a tag key
if (*cur == '=' && *(cur - 1) != '\\') {
printf("key: found equal sign\n");
break;
}
//Escape special character
if (*cur == '\\') {
escape_special_char(2, &cur);
}
key[len] = *cur;
cur++;
len++;
}
key[len] = '\0';
SStrToken tagValue = {.z = line.z + start, .n = pos-start};
setItemValue(&item, tagValue, LP_TAG_VALUE);
pKV->key = calloc(len + 1, 1);
memcpy(pKV->key, key, len + 1);
printf("key:%s|len:%d\n", pKV->key, len);
*index = cur + 1;
return true;
}
parseItemValue(&item, LP_ITEM_TAG);
taosArrayPush(point->tags, &item);
bool taos_sml_parse_value(TAOS_SML_KV *pKV, const char **index,
bool *is_last_kv) {
const char *start, *cur;
char *value = NULL;
uint16_t len = 0;
start = cur = *index;
while (1) {
// unescaped ',' or ' ' or '\0' identifies a value
if ((*cur == ',' || *cur == ' ' || *cur == '\0') && *(cur - 1) != '\\') {
value = calloc(len + 1, 1);
memcpy(value, start, len);
value[len] = '\0';
if (!taos_sml_type_convert(pKV, value, len)) {
free(value);
return false;
}
//unescaped ' ' or '\0' indicates end of value
*is_last_kv = (*cur == ' ' || *cur == '\0') ? true : false;
break;
}
//Escape special character
if (*cur == '\\') {
escape_special_char(2, &cur);
}
cur++;
len++;
}
start = pos;
if (value) {
free(value);
}
taosArraySort(point->tags, compareLPItemKey);
*index = (*cur == '\0') ? cur : cur + 1;
return true;
}
point->fields = taosArrayInit(64, sizeof(SLPItem));
bool taos_sml_parse_kv_pairs(TAOS_SML_KV **pKVs, int *num_kvs, const char **index, bool isField) {
const char *cur = *index;
TAOS_SML_KV *pkv;
bool is_last_kv = false;
start++;
do {
SLPItem item;
if (isField) {
//leave space for timestamp
*pKVs = calloc(2, sizeof(TAOS_SML_KV));
pkv = *pKVs;
pkv++;
}
else {
*pKVs = calloc(1, sizeof(TAOS_SML_KV));
pkv = *pKVs;
}
err = scanToEqual(line, start, &pos);
if (err != 0) {
while (*cur != '\0') {
if (!taos_sml_parse_key(pkv, &cur)) {
printf("Unable to parse key field\n");
goto error;
}
if (!taos_sml_parse_value(pkv, &cur, &is_last_kv)) {
printf("Unable to parse value field\n");
goto error;
}
SStrToken fieldKey = {.z = line.z + start, .n = pos- start};
setItemKey(&item, fieldKey, LP_FIELD_KEY);
*num_kvs += 1;
start = pos + 1;
err = scanToCommaOrSpace(line, start, &pos, LP_FIELD_VALUE);
if (err != 0) {
if(is_last_kv) {
printf("last key value field detected\n");
goto done;
}
//reallocate addtional memory for more kvs
TAOS_SML_KV *more_kvs = NULL;
if (isField) {
more_kvs = realloc(*pKVs, (*num_kvs + 2) * sizeof(TAOS_SML_KV));
} else {
more_kvs = realloc(*pKVs, (*num_kvs + 1) * sizeof(TAOS_SML_KV));
}
if (!more_kvs) {
goto error;
}
SStrToken fieldValue = {.z = line.z + start, .n = pos - start};
setItemValue(&item, fieldValue, LP_TAG_VALUE);
*pKVs = more_kvs;
//move pKV points to next TAOS_SML_KV block
if (isField) {
pkv = *pKVs + *num_kvs + 1;
} else {
pkv = *pKVs + *num_kvs;
}
}
goto done;
parseItemValue(&item, LP_ITEM_FIELD);
taosArrayPush(point->fields, &item);
error:
free(*pKVs);
return false;
done:
*index = cur;
return true;
}
start = pos + 1;
} while (line.z[pos] == ',');
bool taos_sml_parse_timestamp(TAOS_SML_KV **pTS, const char **index) {
const char *start, *cur;
int len = 0;
char key[] = "_ts";
char *value = NULL;
taosArraySort(point->fields, compareLPItemKey);
start = cur = *index;
*pTS = calloc(1, sizeof(TAOS_SML_KV));
SStrToken tsToken = {.z = line.z+start, .n = line.n-start};
setPointTimeStamp(point, tsToken);
parsePointTime(point);
if (*cur == '\0') {
//no timestamp given, use current system time
return true;
}
goto done;
while(*cur != '\0') {
cur++;
len++;
}
value = calloc(len, 1);
memcpy(value, start, len);
if (!taos_sml_timestamp_convert(*pTS, value, len)) {
free(*pTS);
return false;
}
free(value);
error:
// free array
return err;
done:
return 0;
(*pTS)->key = calloc(sizeof(key), 1);
memcpy((*pTS)->key, key, sizeof(key));
return true;
}
bool tscParseLine(const char* sql, TAOS_SML_DATA_POINT* sml_data) {
const char* index = sql;
uint8_t has_tags = 0;
TAOS_SML_KV *timestamp = NULL;
if (!taos_sml_parse_measurement(sml_data, &index, &has_tags)) {
printf("Unable to parse measurement\n");
free(sml_data->stableName);
free(sml_data);
return false;
}
printf("============Parse measurement finished, has_tags:%d===============\n", has_tags);
//Parse Tags
if (has_tags) {
if (!taos_sml_parse_kv_pairs(&sml_data->tags, &sml_data->tagNum, &index, false)) {
printf("Unable to parse tag\n");
//TODO free allocated fileds inside TAOS_SML_DATA_POINT first
return false;
}
} else {
//no tags given
}
printf("============Parse tags finished, num_tags:%d===============\n", sml_data->tagNum);
//Parse fields
if (!taos_sml_parse_kv_pairs(&sml_data->fields, &sml_data->fieldNum, &index, true)) {
printf("Unable to parse field\n");
//TODO free allocated fileds inside TAOS_SML_DATA_POINT first
return false;
}
printf("============Parse fields finished, num_fields:%d===============\n", sml_data->fieldNum);
//Parse timestamp
if (!taos_sml_parse_timestamp(&timestamp, &index)) {
printf("Unable to parse timestamp\n");
return false;
}
sml_data->fieldNum = sml_data->fieldNum + 1;
TAOS_SML_KV* tsField = sml_data->fields;
tsField->length = timestamp->length;
tsField->type = timestamp->type;
tsField->value = malloc(timestamp->length);
tsField->key = malloc(strlen(timestamp->key)+1);
memcpy(tsField->key, timestamp->key, strlen(timestamp->key)+1);
memcpy(tsField->value, timestamp->value, timestamp->length);
free(timestamp->key);
free(timestamp->value);
free(timestamp);
printf("============Parse timestamp finished===============\n");
return true;
}
int32_t tscParseLines(char* lines[], int numLines, SArray* points, SArray* failedLines) {
for (int32_t i = 0; i < numLines; ++i) {
SStrToken tkLine = {.z = lines[i], .n = (uint32_t)strlen(lines[i])};
SLPPoint point;
tscParseLine(tkLine, &point);
TAOS_SML_DATA_POINT point = {0};
bool succ = tscParseLine(lines[i], &point);
if (!succ) {
tscError("data point line parse failed. line %d", i);
return TSDB_CODE_TSC_LINE_SYNTAX_ERROR;
} else {
tscDebug("data point line parse success. line %d", i);
}
taosArrayPush(points, &point);
}
return 0;
}
void destroyLPPoint(void* p) {
SLPPoint* lpPoint = p;
for (int i=0; i<taosArrayGetSize(lpPoint->fields); ++i) {
SLPItem* item = taosArrayGet(lpPoint->fields, i);
free(item->value);
}
taosArrayDestroy(lpPoint->fields);
for (int i=0; i<taosArrayGetSize(lpPoint->tags); ++i) {
SLPItem* item = taosArrayGet(lpPoint->tags, i);
free(item->value);
}
taosArrayDestroy(lpPoint->tags);
}
void destroySmlDataPoint(TAOS_SML_DATA_POINT* point) {
for (int i=0; i<point->tagNum; ++i) {
......@@ -1141,78 +1629,26 @@ void destroySmlDataPoint(TAOS_SML_DATA_POINT* point) {
int taos_insert_lines(TAOS* taos, char* lines[], int numLines) {
int32_t code = 0;
SArray* lpPoints = taosArrayInit(numLines, sizeof(SLPPoint));
tscParseLines(lines, numLines, lpPoints, NULL);
size_t numPoints = taosArrayGetSize(lpPoints);
TAOS_SML_DATA_POINT* points = calloc(numPoints, sizeof(TAOS_SML_DATA_POINT));
for (int i = 0; i < numPoints; ++i) {
SLPPoint* lpPoint = taosArrayGet(lpPoints, i);
TAOS_SML_DATA_POINT* point = points+i;
point->stableName = calloc(1, strlen(lpPoint->sTableName)+1);
strncpy(point->stableName, lpPoint->sTableName, strlen(lpPoint->sTableName));
point->stableName[strlen(lpPoint->sTableName)] = '\0';
size_t lpTagSize = taosArrayGetSize(lpPoint->tags);
point->tags = calloc(lpTagSize, sizeof(TAOS_SML_KV));
point->tagNum = (int)lpTagSize;
for (int j=0; j<lpTagSize; ++j) {
SLPItem* lpTag = taosArrayGet(lpPoint->tags, j);
TAOS_SML_KV* tagKv = point->tags + j;
size_t kenLen = strlen(lpTag->key);
tagKv->key = calloc(1, kenLen+1);
strncpy(tagKv->key, lpTag->key, kenLen);
tagKv->key[kenLen] = '\0';
SArray* lpPoints = taosArrayInit(numLines, sizeof(TAOS_SML_DATA_POINT));
tagKv->type = lpTag->type;
tagKv->length = lpTag->length;
tagKv->value = malloc(tagKv->length);
memcpy(tagKv->value, lpTag->value, tagKv->length);
}
size_t lpFieldsSize = taosArrayGetSize(lpPoint->fields);
point->fields = calloc(lpFieldsSize + 1, sizeof(TAOS_SML_KV));
point->fieldNum = (int)(lpFieldsSize + 1);
TAOS_SML_KV* tsField = point->fields + 0;
char tsKey[256];
snprintf(tsKey, 256, "_%s_ts", point->stableName);
size_t tsKeyLen = strlen(tsKey);
tsField->key = calloc(1, tsKeyLen+1);
strncpy(tsField->key, tsKey, tsKeyLen);
tsField->key[tsKeyLen] = '\0';
tsField->type = TSDB_DATA_TYPE_TIMESTAMP;
tsField->length = tDataTypes[TSDB_DATA_TYPE_TIMESTAMP].bytes;
tsField->value = malloc(tsField->length);
memcpy(tsField->value, &(lpPoint->ts), tsField->length);
for (int j=0; j<lpFieldsSize; ++j) {
SLPItem* lpField = taosArrayGet(lpPoint->fields, j);
TAOS_SML_KV* fieldKv = point->fields + j + 1;
size_t kenLen = strlen(lpField->key);
fieldKv->key = calloc(1, kenLen+1);
strncpy(fieldKv->key, lpField->key, kenLen);
fieldKv->key[kenLen] = '\0';
fieldKv->type = lpField->type;
fieldKv->length = lpField->length;
fieldKv->value = malloc(fieldKv->length);
memcpy(fieldKv->value, lpField->value, fieldKv->length);
}
code = tscParseLines(lines, numLines, lpPoints, NULL);
if (code != 0) {
goto cleanup;
}
size_t numPoints = taosArrayGetSize(lpPoints);
TAOS_SML_DATA_POINT* points = TARRAY_GET_START(lpPoints);
code = taos_sml_insert(taos, points, (int)numPoints);
if (code != 0) {
tscError("taos_sml_insert error: %s", tstrerror((code)));
}
cleanup:
for (int i=0; i<numPoints; ++i) {
destroySmlDataPoint(points+i);
}
free(points);
taosArrayDestroyEx(lpPoints, destroyLPPoint);
taosArrayDestroy(lpPoints);
return code;
}
......@@ -100,6 +100,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_TSC_DB_NOT_SELECTED TAOS_DEF_ERROR_CODE(0, 0x0217) //"Database not specified or available")
#define TSDB_CODE_TSC_INVALID_TABLE_NAME TAOS_DEF_ERROR_CODE(0, 0x0218) //"Table does not exist")
#define TSDB_CODE_TSC_EXCEED_SQL_LIMIT TAOS_DEF_ERROR_CODE(0, 0x0219) //"SQL statement too long check maxSQLLength config")
#define TSDB_CODE_TSC_LINE_SYNTAX_ERROR TAOS_DEF_ERROR_CODE(0, 0x0220) //"Syntax error in Line")
// mnode
#define TSDB_CODE_MND_MSG_NOT_PROCESSED TAOS_DEF_ERROR_CODE(0, 0x0300) //"Message not processed")
......
......@@ -964,22 +964,22 @@ int32_t verify_schema_less(TAOS* taos) {
usleep(100000);
char* lines[] = {
"st,t1=3i,t2=4,t3=\"t3\" c1=3i,c3=L\"passit\",c2=false,c4=4 1626006833639000000",
"st,t1=4i,t3=\"t4\",t2=5,t4=5 c1=3i,c3=L\"passitagin\",c2=true,c4=5,c5=5 1626006833640000000",
"ste,t2=5,t3=L\"ste\" c1=true,c2=4,c3=\"iam\" 1626056811823316532",
"st,t1=4i,t2=5,t3=\"t4\" c1=3i,c3=L\"passitagain\",c2=true,c4=5 1626006833642000000",
"ste,t2=5,t3=L\"ste2\" c3=\"iamszhou\",c4=false 1626056811843316532",
"ste,t2=5,t3=L\"ste2\" c3=\"iamszhou\",c4=false,c5=32b,c6=64s,c7=32w,c8=88.88f 1626056812843316532",
"st,t1=4i,t3=\"t4\",t2=5,t4=5 c1=3i,c3=L\"passitagin\",c2=true,c4=5,c5=5,c6=7u 1626006933640000000",
"stf,t1=4i,t3=\"t4\",t2=5,t4=5 c1=3i,c3=L\"passitagin\",c2=true,c4=5,c5=5,c6=7u 1626006933640000000",
"stf,t1=4i,t3=\"t4\",t2=5,t4=5 c1=3i,c3=L\"passitagin_stf\",c2=false,c5=5,c6=7u 1626006933641a"
"st,t1=3i64,t2=4f64,t3=\"t3\" c1=3i64,c3=L\"passit\",c2=false,c4=4f64 1626006833639000000ns",
"st,t1=4i64,t3=\"t4\",t2=5f64,t4=5f64 c1=3i64,c3=L\"passitagin\",c2=true,c4=5f64,c5=5f64 1626006833640000000ns",
"ste,t2=5f64,t3=L\"ste\" c1=true,c2=4i64,c3=\"iam\" 1626056811823316532ns",
"st,t1=4i64,t2=5f64,t3=\"t4\" c1=3i64,c3=L\"passitagain\",c2=true,c4=5f64 1626006833642000000ns",
"ste,t2=5f64,t3=L\"ste2\" c3=\"iamszhou\",c4=false 1626056811843316532ns",
"ste,t2=5f64,t3=L\"ste2\" c3=\"iamszhou\",c4=false,c5=32i8,c6=64i16,c7=32i32,c8=88.88f32 1626056812843316532ns",
"st,t1=4i64,t3=\"t4\",t2=5f64,t4=5f64 c1=3i64,c3=L\"passitagin\",c2=true,c4=5f64,c5=5f64,c6=7u64 1626006933640000000ns",
"stf,t1=4i64,t3=\"t4\",t2=5f64,t4=5f64 c1=3i64,c3=L\"passitagin\",c2=true,c4=5f64,c5=5f64,c6=7u64 1626006933640000000ns",
"stf,t1=4i64,t3=\"t4\",t2=5f64,t4=5f64 c1=3i64,c3=L\"passitagin_stf\",c2=false,c5=5f64,c6=7u64 1626006933641000000ns"
};
int code = 0;
code = taos_insert_lines(taos, lines , sizeof(lines)/sizeof(char*));
char* lines2[] = {
"stg,t1=3i,t2=4,t3=\"t3\" c1=3i,c3=L\"passit\",c2=false,c4=4 1626006833639000000",
"stg,t1=4i,t3=\"t4\",t2=5,t4=5 c1=3i,c3=L\"passitagin\",c2=true,c4=5,c5=5 1626006833640000000"
"stg,t1=3i64,t2=4f64,t3=\"t3\" c1=3i64,c3=L\"passit\",c2=false,c4=4f64 1626006833639000000ns",
"stg,t1=4i64,t3=\"t4\",t2=5f64,t4=5f64 c1=3i64,c3=L\"passitagin\",c2=true,c4=5f64,c5=5f64 1626006833640000000ns"
};
code = taos_insert_lines(taos, &lines2[0], 1);
code = taos_insert_lines(taos, &lines2[1], 1);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册