提交 990205d6 编写于 作者: wmmhello's avatar wmmhello

refactor: schemaless tmp commit

上级 9eff7ff4
......@@ -241,6 +241,15 @@ typedef struct SExprInfo {
struct tExprNode* pExpr;
} SExprInfo;
typedef struct {
const char* key;
int32_t keyLen;
uint8_t type;
int16_t length;
const char* value;
int32_t valueLen;
} SSmlKv;
#define QUERY_ASC_FORWARD_STEP 1
#define QUERY_DESC_FORWARD_STEP -1
......
......@@ -92,10 +92,14 @@ int32_t qBindStmtColsValue(void *pBlock, TAOS_BIND_v2 *bind, char *msgBuf, int32
int32_t qBindStmtSingleColValue(void *pBlock, TAOS_BIND_v2 *bind, char *msgBuf, int32_t msgBufLen, int32_t colIdx, int32_t rowNum);
int32_t qBuildStmtColFields(void *pDataBlock, int32_t *fieldNum, TAOS_FIELD** fields);
int32_t qBuildStmtTagFields(void *pBlock, void *boundTags, int32_t *fieldNum, TAOS_FIELD** fields);
int32_t qBindStmtTagsValue(void *pBlock, void *boundTags, int64_t suid, SName *pName, TAOS_BIND_v2 *bind, char *msgBuf, int32_t msgBufLen);
int32_t qBindStmtTagsValue(void *pBlock, void *boundTags, int64_t suid, char *tName, TAOS_BIND_v2 *bind, char *msgBuf, int32_t msgBufLen);
void destroyBoundColumnInfo(void* pBoundInfo);
int32_t qCreateSName(SName* pName, const char* pTableName, int32_t acctId, char* dbName, char *msgBuf, int32_t msgBufLen);
void* tscSmlInitHandle(SQuery *pQuery);
void tscSmlDestroyHandle(void *pHandle);
int32_t smlBind(void *handle, SArray *tags, SArray *cols, STableMeta *pTableMeta, char *msgBuf, int16_t msgBufLen);
int32_t smlBuildOutput(void* handle, SHashObj* pVgHash);
#ifdef __cplusplus
}
......
......@@ -86,9 +86,13 @@ extern const int32_t TYPE_BYTES[15];
#define TS_PATH_DELIMITER "."
#define TS_ESCAPE_CHAR '`'
#define TSDB_TIME_PRECISION_MILLI 0
#define TSDB_TIME_PRECISION_MICRO 1
#define TSDB_TIME_PRECISION_NANO 2
#define TSDB_TIME_PRECISION_HOURS 3
#define TSDB_TIME_PRECISION_MINUTES 4
#define TSDB_TIME_PRECISION_SECONDS 5
#define TSDB_TIME_PRECISION_MILLI_STR "ms"
#define TSDB_TIME_PRECISION_MICRO_STR "us"
......
......@@ -13,30 +13,19 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_TSCPARSELINE_H
#define TDENGINE_TSCPARSELINE_H
#ifndef TDENGINE_CLIENTSML_H
#define TDENGINE_CLIENTSML_H
#ifdef __cplusplus
extern "C" {
#endif
#include "thash.h"
#include "clientint.h"
#define SML_TIMESTAMP_SECOND_DIGITS 10
#define SML_TIMESTAMP_MILLI_SECOND_DIGITS 13
#include "clientInt.h"
#include "catalog.h"
typedef TSDB_SML_PROTOCOL_TYPE SMLProtocolType;
typedef struct {
const char* key;
int32_t keyLen;
uint8_t type;
int16_t length;
const char* value;
int32_t valueLen;
} TAOS_SML_KV;
typedef struct {
const char* measure;
const char* tags;
......@@ -50,65 +39,66 @@ typedef struct {
} TAOS_PARSE_ELEMENTS;
typedef struct {
char* childTableName;
const char *sTableName; // super table name
uint8_t sTableNameLen;
char childTableName[TSDB_TABLE_NAME_LEN];
uint64_t uid;
SArray* tags;
SArray *cols;
} TAOS_SML_DATA_POINT_TAGS;
typedef struct SSmlSTableMeta {
char *sTableName; // super table name
uint8_t sTableNameLen;
// char *sTableName; // super table name
// uint8_t sTableNameLen;
uint8_t precision; // the number of precision
SHashObj* tagHash;
SHashObj* fieldHash;
} SSmlSTableMeta;
typedef enum {
SML_TIME_STAMP_NOT_CONFIGURED,
SML_TIME_STAMP_HOURS,
SML_TIME_STAMP_MINUTES,
SML_TIME_STAMP_SECONDS,
SML_TIME_STAMP_MILLI_SECONDS,
SML_TIME_STAMP_MICRO_SECONDS,
SML_TIME_STAMP_NANO_SECONDS,
SML_TIME_STAMP_NOW
} SMLTimeStampType;
typedef struct {
uint64_t id;
STscObj* taos;
SCatalog* pCatalog;
SMLProtocolType protocol;
SMLTimeStampType tsType;
int32_t affectedRows;
int32_t tsType;
SHashObj* childTables;
SHashObj* superTables;
SHashObj* metaHashObj;
SHashObj* pVgHash;
void* exec;
STscObj* taos;
SCatalog* pCatalog;
SRequestObj* pRequest;
SQuery* pQuery;
int32_t affectedRows;
char *msgBuf;
int16_t msgLen;
} SSmlLinesInfo;
int tscSmlInsert(TAOS* taos, TAOS_SML_DATA_POINT* points, int numPoint, SSmlLinesInfo* info);
int smlInsert(TAOS* taos, SSmlLinesInfo* info);
bool checkDuplicateKey(char *key, SHashObj *pHash, SSmlLinesInfo* info);
bool isValidInteger(char *str);
bool isValidFloat(char *str);
int32_t isValidChildTableName(const char *pTbName, int16_t len, SSmlLinesInfo* info);
bool convertSmlValueType(TAOS_SML_KV *pVal, char *value,
bool convertSmlValueType(SSmlKv *pVal, char *value,
uint16_t len, SSmlLinesInfo* info, bool isTag);
int32_t convertSmlTimeStamp(TAOS_SML_KV *pVal, char *value,
int32_t convertSmlTimeStamp(SSmlKv *pVal, char *value,
uint16_t len, SSmlLinesInfo* info);
void destroySmlDataPoint(TAOS_SML_DATA_POINT* point);
int taos_insert_lines(TAOS* taos, char* lines[], int numLines, SMLProtocolType protocol,
SMLTimeStampType tsType, int* affectedRows);
int taos_insert_telnet_lines(TAOS* taos, char* lines[], int numLines, SMLProtocolType protocol,
int sml_insert_lines(TAOS* taos, SRequestObj* request, char* lines[], int numLines, SMLProtocolType protocol,
SMLTimeStampType tsType);
int sml_insert_telnet_lines(TAOS* taos, char* lines[], int numLines, SMLProtocolType protocol,
SMLTimeStampType tsType, int* affectedRows);
int taos_insert_json_payload(TAOS* taos, char* payload, SMLProtocolType protocol,
int sml_insert_json_payload(TAOS* taos, char* payload, SMLProtocolType protocol,
SMLTimeStampType tsType, int* affectedRows);
......@@ -116,4 +106,4 @@ int taos_insert_json_payload(TAOS* taos, char* payload, SMLProtocolType protocol
}
#endif
#endif // TDENGINE_TSCPARSELINE_H
#endif // TDENGINE_CLIENTSML_H
aux_source_directory(src SCHEMALESS_SRC)
add_library(schemaless STATIC ${SCHEMALESS_SRC})
target_include_directories(
schemaless
PUBLIC "${TD_SOURCE_DIR}/include/libs/schemaless"
PRIVATE "inc"
)
target_link_libraries(
schemaless
PUBLIC os util common catalog qcom
)
if(${BUILD_TEST})
ADD_SUBDIRECTORY(test)
endif(${BUILD_TEST})
//#include <ctype.h>
//#include <stdio.h>
//#include <stdlib.h>
//#include <string.h>
//
//#include "cJSON.h"
//#include "hash.h"
//#include "taos.h"
//
//#include "tscUtil.h"
//#include "tsclient.h"
//#include "tscLog.h"
//
//#include "tscParseLine.h"
//
//#define OTD_MAX_FIELDS_NUM 2
//#define OTD_JSON_SUB_FIELDS_NUM 2
//#define OTD_JSON_FIELDS_NUM 4
//
//#define OTD_TIMESTAMP_COLUMN_NAME "ts"
//#define OTD_METRIC_VALUE_COLUMN_NAME "value"
//
///* telnet style API parser */
//static uint64_t HandleId = 0;
//
//static uint64_t genUID() {
// uint64_t id;
//
// do {
// id = atomic_add_fetch_64(&HandleId, 1);
// } while (id == 0);
//
// return id;
//}
//
//static int32_t parseTelnetMetric(TAOS_SML_DATA_POINT *pSml, const char **index, SSmlLinesInfo* info) {
// const char *cur = *index;
// uint16_t len = 0;
//
// pSml->stableName = tcalloc(TSDB_TABLE_NAME_LEN + TS_BACKQUOTE_CHAR_SIZE, 1);
// if (pSml->stableName == NULL) {
// return TSDB_CODE_TSC_OUT_OF_MEMORY;
// }
// /*
// if (isdigit(*cur)) {
// tscError("OTD:0x%"PRIx64" Metric cannot start with digit", info->id);
// tfree(pSml->stableName);
// return TSDB_CODE_TSC_LINE_SYNTAX_ERROR;
// }
// */
//
// while (*cur != '\0') {
// if (len > TSDB_TABLE_NAME_LEN - 1) {
// tscError("OTD:0x%"PRIx64" Metric cannot exceeds %d characters", info->id, TSDB_TABLE_NAME_LEN - 1);
// tfree(pSml->stableName);
// return TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH;
// }
//
// if (*cur == ' ') {
// if (*(cur + 1) != ' ') {
// break;
// } else {
// cur++;
// continue;
// }
// }
//
// pSml->stableName[len] = *cur;
//
// cur++;
// len++;
// }
// if (len == 0 || *cur == '\0') {
// tfree(pSml->stableName);
// return TSDB_CODE_TSC_LINE_SYNTAX_ERROR;
// }
//
// addEscapeCharToString(pSml->stableName, len);
// *index = cur + 1;
// tscDebug("OTD:0x%"PRIx64" Stable name in metric:%s|len:%d", info->id, pSml->stableName, len);
//
// return TSDB_CODE_SUCCESS;
//}
//
//static int32_t parseTelnetTimeStamp(TAOS_SML_KV **pTS, int *num_kvs, const char **index, SSmlLinesInfo* info) {
// //Timestamp must be the first KV to parse
// assert(*num_kvs == 0);
//
// const char *start, *cur;
// int32_t ret = TSDB_CODE_SUCCESS;
// int len = 0;
// char key[] = OTD_TIMESTAMP_COLUMN_NAME;
// char *value = NULL;
//
// start = cur = *index;
// //allocate fields for timestamp and value
// *pTS = tcalloc(OTD_MAX_FIELDS_NUM, sizeof(TAOS_SML_KV));
//
// while(*cur != '\0') {
// if (*cur == ' ') {
// if (*(cur + 1) != ' ') {
// break;
// } else {
// cur++;
// continue;
// }
// }
// cur++;
// len++;
// }
//
// if (len > 0 && *cur != '\0') {
// value = tcalloc(len + 1, 1);
// memcpy(value, start, len);
// } else {
// tfree(*pTS);
// return TSDB_CODE_TSC_LINE_SYNTAX_ERROR;
// }
//
// ret = convertSmlTimeStamp(*pTS, value, len, info);
// if (ret) {
// tfree(value);
// tfree(*pTS);
// return ret;
// }
// tfree(value);
//
// (*pTS)->key = tcalloc(sizeof(key) + TS_BACKQUOTE_CHAR_SIZE, 1);
// memcpy((*pTS)->key, key, sizeof(key));
// addEscapeCharToString((*pTS)->key, (int32_t)strlen(key));
//
// *num_kvs += 1;
// *index = cur + 1;
//
// return ret;
//}
//
//static int32_t parseTelnetMetricValue(TAOS_SML_KV **pKVs, int *num_kvs, const char **index, SSmlLinesInfo* info) {
// //skip timestamp
// TAOS_SML_KV *pVal = *pKVs + 1;
// const char *start, *cur;
// int32_t ret = TSDB_CODE_SUCCESS;
// int len = 0;
// bool searchQuote = false;
// char key[] = OTD_METRIC_VALUE_COLUMN_NAME;
// char *value = NULL;
//
// start = cur = *index;
//
// //if metric value is string
// if (*cur == '"') {
// searchQuote = true;
// cur += 1;
// len += 1;
// } else if (*cur == 'L' && *(cur + 1) == '"') {
// searchQuote = true;
// cur += 2;
// len += 2;
// }
//
// while(*cur != '\0') {
// if (*cur == ' ') {
// if (searchQuote == true) {
// if (*(cur - 1) == '"' && len != 1 && len != 2) {
// searchQuote = false;
// } else {
// cur++;
// len++;
// continue;
// }
// }
//
// if (*(cur + 1) != ' ') {
// break;
// } else {
// cur++;
// continue;
// }
// }
// cur++;
// len++;
// }
//
// if (len > 0 && *cur != '\0') {
// value = tcalloc(len + 1, 1);
// memcpy(value, start, len);
// } else {
// return TSDB_CODE_TSC_LINE_SYNTAX_ERROR;
// }
//
// if (!convertSmlValueType(pVal, value, len, info, false)) {
// tscError("OTD:0x%"PRIx64" Failed to convert metric value string(%s) to any type",
// info->id, value);
// tfree(value);
// return TSDB_CODE_TSC_INVALID_VALUE;
// }
// tfree(value);
//
// pVal->key = tcalloc(sizeof(key) + TS_BACKQUOTE_CHAR_SIZE, 1);
// memcpy(pVal->key, key, sizeof(key));
// addEscapeCharToString(pVal->key, (int32_t)strlen(pVal->key));
// *num_kvs += 1;
//
// *index = cur + 1;
// return ret;
//}
//
//static int32_t parseTelnetTagKey(TAOS_SML_KV *pKV, const char **index, SHashObj *pHash, SSmlLinesInfo* info) {
// const char *cur = *index;
// char key[TSDB_COL_NAME_LEN];
// uint16_t len = 0;
//
// //key field cannot start with digit
// //if (isdigit(*cur)) {
// // tscError("OTD:0x%"PRIx64" Tag key cannot start with digit", info->id);
// // return TSDB_CODE_TSC_LINE_SYNTAX_ERROR;
// //}
// while (*cur != '\0') {
// if (len > TSDB_COL_NAME_LEN - 1) {
// tscError("OTD:0x%"PRIx64" Tag key cannot exceeds %d characters", info->id, TSDB_COL_NAME_LEN - 1);
// return TSDB_CODE_TSC_INVALID_COLUMN_LENGTH;
// }
// if (*cur == ' ') {
// return TSDB_CODE_TSC_LINE_SYNTAX_ERROR;
// }
// if (*cur == '=') {
// break;
// }
//
// key[len] = *cur;
// cur++;
// len++;
// }
// if (len == 0 || *cur == '\0') {
// return TSDB_CODE_TSC_LINE_SYNTAX_ERROR;
// }
// key[len] = '\0';
//
// if (checkDuplicateKey(key, pHash, info)) {
// return TSDB_CODE_TSC_DUP_TAG_NAMES;
// }
//
// pKV->key = tcalloc(len + TS_BACKQUOTE_CHAR_SIZE + 1, 1);
// memcpy(pKV->key, key, len + 1);
// addEscapeCharToString(pKV->key, len);
// //tscDebug("OTD:0x%"PRIx64" Key:%s|len:%d", info->id, pKV->key, len);
// *index = cur + 1;
// return TSDB_CODE_SUCCESS;
//}
//
//
//static int32_t parseTelnetTagValue(TAOS_SML_KV *pKV, const char **index,
// bool *is_last_kv, SSmlLinesInfo* info) {
// const char *start, *cur;
// char *value = NULL;
// uint16_t len = 0;
// start = cur = *index;
//
// while (1) {
// // whitespace or '\0' identifies a value
// if (*cur == ' ' || *cur == '\0') {
// // '\0' indicates end of value
// *is_last_kv = (*cur == '\0') ? true : false;
// if (*cur == ' ' && *(cur + 1) == ' ') {
// cur++;
// continue;
// } else {
// break;
// }
// }
// cur++;
// len++;
// }
//
// if (len == 0) {
// tfree(pKV->key);
// return TSDB_CODE_TSC_LINE_SYNTAX_ERROR;
// }
//
// value = tcalloc(len + 1, 1);
// memcpy(value, start, len);
// value[len] = '\0';
// if (!convertSmlValueType(pKV, value, len, info, true)) {
// tscError("OTD:0x%"PRIx64" Failed to convert sml value string(%s) to any type",
// info->id, value);
// //free previous alocated key field
// tfree(pKV->key);
// tfree(value);
// return TSDB_CODE_TSC_INVALID_VALUE;
// }
// tfree(value);
//
// *index = (*cur == '\0') ? cur : cur + 1;
// return TSDB_CODE_SUCCESS;
//}
//
//static int32_t parseTelnetTagKvs(TAOS_SML_KV **pKVs, int *num_kvs,
// const char **index, char **childTableName,
// SHashObj *pHash, SSmlLinesInfo* info) {
// const char *cur = *index;
// int32_t ret = TSDB_CODE_SUCCESS;
// TAOS_SML_KV *pkv;
// bool is_last_kv = false;
//
// int32_t capacity = 4;
// *pKVs = tcalloc(capacity, sizeof(TAOS_SML_KV));
// pkv = *pKVs;
//
// size_t childTableNameLen = strlen(tsSmlChildTableName);
// char childTbName[TSDB_TABLE_NAME_LEN + TS_BACKQUOTE_CHAR_SIZE] = {0};
// if (childTableNameLen != 0) {
// memcpy(childTbName, tsSmlChildTableName, childTableNameLen);
// addEscapeCharToString(childTbName, (int32_t)(childTableNameLen));
// }
// while (*cur != '\0') {
// ret = parseTelnetTagKey(pkv, &cur, pHash, info);
// if (ret) {
// tscError("OTD:0x%"PRIx64" Unable to parse key", info->id);
// return ret;
// }
// ret = parseTelnetTagValue(pkv, &cur, &is_last_kv, info);
// if (ret) {
// tscError("OTD:0x%"PRIx64" Unable to parse value", info->id);
// return ret;
// }
// if (childTableNameLen != 0 && strcasecmp(pkv->key, childTbName) == 0) {
// *childTableName = tcalloc(pkv->length + TS_BACKQUOTE_CHAR_SIZE + 1, 1);
// memcpy(*childTableName, pkv->value, pkv->length);
// (*childTableName)[pkv->length] = '\0';
// addEscapeCharToString(*childTableName, pkv->length);
// tfree(pkv->key);
// tfree(pkv->value);
// } else {
// *num_kvs += 1;
// }
//
// if (is_last_kv) {
// break;
// }
//
// //reallocate addtional memory for more kvs
// if ((*num_kvs + 1) > capacity) {
// TAOS_SML_KV *more_kvs = NULL;
// capacity *= 3; capacity /= 2;
// more_kvs = realloc(*pKVs, capacity * sizeof(TAOS_SML_KV));
// if (!more_kvs) {
// return TSDB_CODE_TSC_OUT_OF_MEMORY;
// }
// *pKVs = more_kvs;
// }
//
// //move pKV points to next TAOS_SML_KV block
// pkv = *pKVs + *num_kvs;
// }
//
// return ret;
//}
//
//static int32_t tscParseTelnetLine(const char* line, TAOS_SML_DATA_POINT* smlData, SSmlLinesInfo* info) {
// const char* index = line;
// int32_t ret = TSDB_CODE_SUCCESS;
//
// //Parse metric
// ret = parseTelnetMetric(smlData, &index, info);
// if (ret) {
// tscError("OTD:0x%"PRIx64" Unable to parse metric", info->id);
// return ret;
// }
// tscDebug("OTD:0x%"PRIx64" Parse metric finished", info->id);
//
// //Parse timestamp
// ret = parseTelnetTimeStamp(&smlData->fields, &smlData->fieldNum, &index, info);
// if (ret) {
// tscError("OTD:0x%"PRIx64" Unable to parse timestamp", info->id);
// return ret;
// }
// tscDebug("OTD:0x%"PRIx64" Parse timestamp finished", info->id);
//
// //Parse value
// ret = parseTelnetMetricValue(&smlData->fields, &smlData->fieldNum, &index, info);
// if (ret) {
// tscError("OTD:0x%"PRIx64" Unable to parse metric value", info->id);
// return ret;
// }
// tscDebug("OTD:0x%"PRIx64" Parse metric value finished", info->id);
//
// //Parse tagKVs
// SHashObj *keyHashTable = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false);
// ret = parseTelnetTagKvs(&smlData->tags, &smlData->tagNum, &index, &smlData->childTableName, keyHashTable, info);
// if (ret) {
// tscError("OTD:0x%"PRIx64" Unable to parse tags", info->id);
// taosHashCleanup(keyHashTable);
// return ret;
// }
// tscDebug("OTD:0x%"PRIx64" Parse tags finished", info->id);
// taosHashCleanup(keyHashTable);
//
//
// return TSDB_CODE_SUCCESS;
//}
//
//static int32_t tscParseTelnetLines(char* lines[], int numLines, SArray* points, SArray* failedLines, SSmlLinesInfo* info) {
// for (int32_t i = 0; i < numLines; ++i) {
// TAOS_SML_DATA_POINT point = {0};
// int32_t code = tscParseTelnetLine(lines[i], &point, info);
// if (code != TSDB_CODE_SUCCESS) {
// tscError("OTD:0x%"PRIx64" data point line parse failed. line %d : %s", info->id, i, lines[i]);
// destroySmlDataPoint(&point);
// return code;
// } else {
// tscDebug("OTD:0x%"PRIx64" data point line parse success. line %d", info->id, i);
// }
//
// taosArrayPush(points, &point);
// }
// return TSDB_CODE_SUCCESS;
//}
//
//int taos_insert_telnet_lines(TAOS* taos, char* lines[], int numLines, SMLProtocolType protocol, SMLTimeStampType tsType, int* affectedRows) {
// int32_t code = 0;
//
// SSmlLinesInfo* info = tcalloc(1, sizeof(SSmlLinesInfo));
// info->id = genUID();
// info->tsType = tsType;
// info->protocol = protocol;
//
// if (numLines <= 0 || numLines > 65536) {
// tscError("OTD:0x%"PRIx64" taos_insert_telnet_lines numLines should be between 1 and 65536. numLines: %d", info->id, numLines);
// tfree(info);
// code = TSDB_CODE_TSC_APP_ERROR;
// return code;
// }
//
// for (int i = 0; i < numLines; ++i) {
// if (lines[i] == NULL) {
// tscError("OTD:0x%"PRIx64" taos_insert_telnet_lines line %d is NULL", info->id, i);
// tfree(info);
// code = TSDB_CODE_TSC_APP_ERROR;
// return code;
// }
// }
//
// SArray* lpPoints = taosArrayInit(numLines, sizeof(TAOS_SML_DATA_POINT));
// if (lpPoints == NULL) {
// tscError("OTD:0x%"PRIx64" taos_insert_telnet_lines failed to allocate memory", info->id);
// tfree(info);
// return TSDB_CODE_TSC_OUT_OF_MEMORY;
// }
//
// tscDebug("OTD:0x%"PRIx64" taos_insert_telnet_lines begin inserting %d lines, first line: %s", info->id, numLines, lines[0]);
// code = tscParseTelnetLines(lines, numLines, lpPoints, NULL, info);
// size_t numPoints = taosArrayGetSize(lpPoints);
//
// if (code != 0) {
// goto cleanup;
// }
//
// TAOS_SML_DATA_POINT* points = TARRAY_GET_START(lpPoints);
// code = tscSmlInsert(taos, points, (int)numPoints, info);
// if (code != 0) {
// tscError("OTD:0x%"PRIx64" taos_insert_telnet_lines error: %s", info->id, tstrerror((code)));
// }
// if (affectedRows != NULL) {
// *affectedRows = info->affectedRows;
// }
//
//cleanup:
// tscDebug("OTD:0x%"PRIx64" taos_insert_telnet_lines finish inserting %d lines. code: %d", info->id, numLines, code);
// points = TARRAY_GET_START(lpPoints);
// numPoints = taosArrayGetSize(lpPoints);
// for (int i = 0; i < numPoints; ++i) {
// destroySmlDataPoint(points+i);
// }
//
// taosArrayDestroy(&lpPoints);
//
// tfree(info);
// return code;
//}
//
//int taos_telnet_insert(TAOS* taos, TAOS_SML_DATA_POINT* points, int numPoint) {
// SSmlLinesInfo* info = tcalloc(1, sizeof(SSmlLinesInfo));
// info->id = genUID();
// int code = tscSmlInsert(taos, points, numPoint, info);
// tfree(info);
// return code;
//}
//
//
///* telnet style API parser */
//static int32_t parseMetricFromJSON(cJSON *root, TAOS_SML_DATA_POINT* pSml, SSmlLinesInfo* info) {
// cJSON *metric = cJSON_GetObjectItem(root, "metric");
// if (!cJSON_IsString(metric)) {
// return TSDB_CODE_TSC_INVALID_JSON;
// }
//
// size_t stableLen = strlen(metric->valuestring);
// if (stableLen > TSDB_TABLE_NAME_LEN - 1) {
// tscError("OTD:0x%"PRIx64" Metric cannot exceeds %d characters in JSON", info->id, TSDB_TABLE_NAME_LEN - 1);
// return TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH;
// }
//
// pSml->stableName = tcalloc(stableLen + TS_BACKQUOTE_CHAR_SIZE + 1, sizeof(char));
// if (pSml->stableName == NULL){
// return TSDB_CODE_TSC_OUT_OF_MEMORY;
// }
//
// /*
// if (isdigit(metric->valuestring[0])) {
// tscError("OTD:0x%"PRIx64" Metric cannot start with digit in JSON", info->id);
// tfree(pSml->stableName);
// return TSDB_CODE_TSC_INVALID_JSON;
// }
// */
//
// tstrncpy(pSml->stableName, metric->valuestring, stableLen + 1);
// addEscapeCharToString(pSml->stableName, (int32_t)stableLen);
//
// return TSDB_CODE_SUCCESS;
//
//}
//
//static int32_t parseTimestampFromJSONObj(cJSON *root, int64_t *tsVal, SSmlLinesInfo* info) {
// int32_t size = cJSON_GetArraySize(root);
// if (size != OTD_JSON_SUB_FIELDS_NUM) {
// return TSDB_CODE_TSC_INVALID_JSON;
// }
//
// cJSON *value = cJSON_GetObjectItem(root, "value");
// if (!cJSON_IsNumber(value)) {
// return TSDB_CODE_TSC_INVALID_JSON;
// }
//
// cJSON *type = cJSON_GetObjectItem(root, "type");
// if (!cJSON_IsString(type)) {
// return TSDB_CODE_TSC_INVALID_JSON;
// }
//
// *tsVal = strtoll(value->numberstring, NULL, 10);
// //if timestamp value is 0 use current system time
// if (*tsVal == 0) {
// *tsVal = taosGetTimestampNs();
// return TSDB_CODE_SUCCESS;
// }
//
// size_t typeLen = strlen(type->valuestring);
// if (typeLen == 1 && type->valuestring[0] == 's') {
// //seconds
// *tsVal = (int64_t)(*tsVal * 1e9);
// } else if (typeLen == 2 && type->valuestring[1] == 's') {
// switch (type->valuestring[0]) {
// case 'm':
// //milliseconds
// *tsVal = convertTimePrecision(*tsVal, TSDB_TIME_PRECISION_MILLI, TSDB_TIME_PRECISION_NANO);
// break;
// case 'u':
// //microseconds
// *tsVal = convertTimePrecision(*tsVal, TSDB_TIME_PRECISION_MICRO, TSDB_TIME_PRECISION_NANO);
// break;
// case 'n':
// //nanoseconds
// *tsVal = *tsVal * 1;
// break;
// default:
// return TSDB_CODE_TSC_INVALID_JSON;
// }
// } else {
// return TSDB_CODE_TSC_INVALID_JSON;
// }
//
// return TSDB_CODE_SUCCESS;
//}
//
//static int32_t parseTimestampFromJSON(cJSON *root, TAOS_SML_KV **pTS, int *num_kvs, SSmlLinesInfo* info) {
// //Timestamp must be the first KV to parse
// assert(*num_kvs == 0);
// int64_t tsVal;
// char key[] = OTD_TIMESTAMP_COLUMN_NAME;
//
// cJSON *timestamp = cJSON_GetObjectItem(root, "timestamp");
// if (cJSON_IsNumber(timestamp)) {
// //timestamp value 0 indicates current system time
// if (timestamp->valueint == 0) {
// tsVal = taosGetTimestampNs();
// } else {
// tsVal = strtoll(timestamp->numberstring, NULL, 10);
// size_t tsLen = strlen(timestamp->numberstring);
// if (tsLen == SML_TIMESTAMP_SECOND_DIGITS) {
// tsVal = (int64_t)(tsVal * 1e9);
// } else if (tsLen == SML_TIMESTAMP_MILLI_SECOND_DIGITS) {
// tsVal = convertTimePrecision(tsVal, TSDB_TIME_PRECISION_MILLI, TSDB_TIME_PRECISION_NANO);
// } else {
// return TSDB_CODE_TSC_INVALID_TIME_STAMP;
// }
// }
// } else if (cJSON_IsObject(timestamp)) {
// int32_t ret = parseTimestampFromJSONObj(timestamp, &tsVal, info);
// if (ret != TSDB_CODE_SUCCESS) {
// tscError("OTD:0x%"PRIx64" Failed to parse timestamp from JSON Obj", info->id);
// return ret;
// }
// } else {
// return TSDB_CODE_TSC_INVALID_JSON;
// }
//
// //allocate fields for timestamp and value
// *pTS = tcalloc(OTD_MAX_FIELDS_NUM, sizeof(TAOS_SML_KV));
//
//
// (*pTS)->key = tcalloc(sizeof(key), 1);
// memcpy((*pTS)->key, key, sizeof(key));
//
// (*pTS)->type = TSDB_DATA_TYPE_TIMESTAMP;
// (*pTS)->length = (int16_t)tDataTypes[(*pTS)->type].bytes;
// (*pTS)->value = tcalloc((*pTS)->length, 1);
// memcpy((*pTS)->value, &tsVal, (*pTS)->length);
//
// *num_kvs += 1;
// return TSDB_CODE_SUCCESS;
//
//}
//
//static int32_t convertJSONBool(TAOS_SML_KV *pVal, char* typeStr, int64_t valueInt, SSmlLinesInfo* info) {
// if (strcasecmp(typeStr, "bool") != 0) {
// tscError("OTD:0x%"PRIx64" invalid type(%s) for JSON Bool", info->id, typeStr);
// return TSDB_CODE_TSC_INVALID_JSON_TYPE;
// }
// pVal->type = TSDB_DATA_TYPE_BOOL;
// pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
// pVal->value = tcalloc(pVal->length, 1);
// *(bool *)(pVal->value) = valueInt ? true : false;
//
// return TSDB_CODE_SUCCESS;
//}
//
//static int32_t convertJSONNumber(TAOS_SML_KV *pVal, char* typeStr, cJSON *value, SSmlLinesInfo* info) {
// //tinyint
// if (strcasecmp(typeStr, "i8") == 0 ||
// strcasecmp(typeStr, "tinyint") == 0) {
// if (!IS_VALID_TINYINT(value->valueint)) {
// tscError("OTD:0x%"PRIx64" JSON value(%"PRId64") cannot fit in type(tinyint)", info->id, value->valueint);
// return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE;
// }
// pVal->type = TSDB_DATA_TYPE_TINYINT;
// pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
// pVal->value = tcalloc(pVal->length, 1);
// *(int8_t *)(pVal->value) = (int8_t)(value->valueint);
// return TSDB_CODE_SUCCESS;
// }
// //smallint
// if (strcasecmp(typeStr, "i16") == 0 ||
// strcasecmp(typeStr, "smallint") == 0) {
// if (!IS_VALID_SMALLINT(value->valueint)) {
// tscError("OTD:0x%"PRIx64" JSON value(%"PRId64") cannot fit in type(smallint)", info->id, value->valueint);
// return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE;
// }
// pVal->type = TSDB_DATA_TYPE_SMALLINT;
// pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
// pVal->value = tcalloc(pVal->length, 1);
// *(int16_t *)(pVal->value) = (int16_t)(value->valueint);
// return TSDB_CODE_SUCCESS;
// }
// //int
// if (strcasecmp(typeStr, "i32") == 0 ||
// strcasecmp(typeStr, "int") == 0) {
// if (!IS_VALID_INT(value->valueint)) {
// tscError("OTD:0x%"PRIx64" JSON value(%"PRId64") cannot fit in type(int)", info->id, value->valueint);
// return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE;
// }
// pVal->type = TSDB_DATA_TYPE_INT;
// pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
// pVal->value = tcalloc(pVal->length, 1);
// *(int32_t *)(pVal->value) = (int32_t)(value->valueint);
// return TSDB_CODE_SUCCESS;
// }
// //bigint
// if (strcasecmp(typeStr, "i64") == 0 ||
// strcasecmp(typeStr, "bigint") == 0) {
// pVal->type = TSDB_DATA_TYPE_BIGINT;
// pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
// pVal->value = tcalloc(pVal->length, 1);
// /* cJSON conversion of legit BIGINT may overflow,
// * use original string to do the conversion.
// */
// errno = 0;
// int64_t val = (int64_t)strtoll(value->numberstring, NULL, 10);
// if (errno == ERANGE || !IS_VALID_BIGINT(val)) {
// tscError("OTD:0x%"PRIx64" JSON value(%s) cannot fit in type(bigint)", info->id, value->numberstring);
// return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE;
// }
// *(int64_t *)(pVal->value) = val;
// return TSDB_CODE_SUCCESS;
// }
// //float
// if (strcasecmp(typeStr, "f32") == 0 ||
// strcasecmp(typeStr, "float") == 0) {
// if (!IS_VALID_FLOAT(value->valuedouble)) {
// tscError("OTD:0x%"PRIx64" JSON value(%f) cannot fit in type(float)", info->id, value->valuedouble);
// return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE;
// }
// pVal->type = TSDB_DATA_TYPE_FLOAT;
// pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
// pVal->value = tcalloc(pVal->length, 1);
// *(float *)(pVal->value) = (float)(value->valuedouble);
// return TSDB_CODE_SUCCESS;
// }
// //double
// if (strcasecmp(typeStr, "f64") == 0 ||
// strcasecmp(typeStr, "double") == 0) {
// if (!IS_VALID_DOUBLE(value->valuedouble)) {
// tscError("OTD:0x%"PRIx64" JSON value(%f) cannot fit in type(double)", info->id, value->valuedouble);
// return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE;
// }
// pVal->type = TSDB_DATA_TYPE_DOUBLE;
// pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
// pVal->value = tcalloc(pVal->length, 1);
// *(double *)(pVal->value) = (double)(value->valuedouble);
// return TSDB_CODE_SUCCESS;
// }
//
// //if reach here means type is unsupported
// tscError("OTD:0x%"PRIx64" invalid type(%s) for JSON Number", info->id, typeStr);
// return TSDB_CODE_TSC_INVALID_JSON_TYPE;
//}
//
//static int32_t convertJSONString(TAOS_SML_KV *pVal, char* typeStr, cJSON *value, SSmlLinesInfo* info) {
// if (strcasecmp(typeStr, "binary") == 0) {
// pVal->type = TSDB_DATA_TYPE_BINARY;
// } else if (strcasecmp(typeStr, "nchar") == 0) {
// pVal->type = TSDB_DATA_TYPE_NCHAR;
// } else {
// tscError("OTD:0x%"PRIx64" invalid type(%s) for JSON String", info->id, typeStr);
// return TSDB_CODE_TSC_INVALID_JSON_TYPE;
// }
// pVal->length = (int16_t)strlen(value->valuestring);
// pVal->value = tcalloc(pVal->length + 1, 1);
// memcpy(pVal->value, value->valuestring, pVal->length);
// return TSDB_CODE_SUCCESS;
//}
//
//static int32_t parseValueFromJSONObj(cJSON *root, TAOS_SML_KV *pVal, SSmlLinesInfo* info) {
// int32_t ret = TSDB_CODE_SUCCESS;
// int32_t size = cJSON_GetArraySize(root);
//
// if (size != OTD_JSON_SUB_FIELDS_NUM) {
// return TSDB_CODE_TSC_INVALID_JSON;
// }
//
// cJSON *value = cJSON_GetObjectItem(root, "value");
// if (value == NULL) {
// return TSDB_CODE_TSC_INVALID_JSON;
// }
//
// cJSON *type = cJSON_GetObjectItem(root, "type");
// if (!cJSON_IsString(type)) {
// return TSDB_CODE_TSC_INVALID_JSON;
// }
//
// switch (value->type) {
// case cJSON_True:
// case cJSON_False: {
// ret = convertJSONBool(pVal, type->valuestring, value->valueint, info);
// if (ret != TSDB_CODE_SUCCESS) {
// return ret;
// }
// break;
// }
// case cJSON_Number: {
// ret = convertJSONNumber(pVal, type->valuestring, value, info);
// if (ret != TSDB_CODE_SUCCESS) {
// return ret;
// }
// break;
// }
// case cJSON_String: {
// ret = convertJSONString(pVal, type->valuestring, value, info);
// if (ret != TSDB_CODE_SUCCESS) {
// return ret;
// }
// break;
// }
// default:
// return TSDB_CODE_TSC_INVALID_JSON_TYPE;
// }
//
// return TSDB_CODE_SUCCESS;
//}
//
//static int32_t parseValueFromJSON(cJSON *root, TAOS_SML_KV *pVal, SSmlLinesInfo* info) {
// int type = root->type;
//
// switch (type) {
// case cJSON_True:
// case cJSON_False: {
// pVal->type = TSDB_DATA_TYPE_BOOL;
// pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
// pVal->value = tcalloc(pVal->length, 1);
// *(bool *)(pVal->value) = root->valueint ? true : false;
// break;
// }
// case cJSON_Number: {
// //convert default JSON Number type to BIGINT/DOUBLE
// //if (isValidInteger(root->numberstring)) {
// // pVal->type = TSDB_DATA_TYPE_BIGINT;
// // pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
// // pVal->value = tcalloc(pVal->length, 1);
// // /* cJSON conversion of legit BIGINT may overflow,
// // * use original string to do the conversion.
// // */
// // errno = 0;
// // int64_t val = (int64_t)strtoll(root->numberstring, NULL, 10);
// // if (errno == ERANGE || !IS_VALID_BIGINT(val)) {
// // tscError("OTD:0x%"PRIx64" JSON value(%s) cannot fit in type(bigint)", info->id, root->numberstring);
// // return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE;
// // }
// // *(int64_t *)(pVal->value) = val;
// //} else if (isValidFloat(root->numberstring)) {
// // pVal->type = TSDB_DATA_TYPE_DOUBLE;
// // pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
// // pVal->value = tcalloc(pVal->length, 1);
// // *(double *)(pVal->value) = (double)(root->valuedouble);
// //} else {
// // return TSDB_CODE_TSC_INVALID_JSON_TYPE;
// //}
// if (isValidInteger(root->numberstring) || isValidFloat(root->numberstring)) {
// pVal->type = TSDB_DATA_TYPE_DOUBLE;
// pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
// pVal->value = tcalloc(pVal->length, 1);
// *(double *)(pVal->value) = (double)(root->valuedouble);
// }
//
// break;
// }
// case cJSON_String: {
// /* set default JSON type to binary/nchar according to
// * user configured parameter tsDefaultJSONStrType
// */
// if (strcasecmp(tsDefaultJSONStrType, "binary") == 0) {
// pVal->type = TSDB_DATA_TYPE_BINARY;
// } else if (strcasecmp(tsDefaultJSONStrType, "nchar") == 0) {
// pVal->type = TSDB_DATA_TYPE_NCHAR;
// } else {
// tscError("OTD:0x%"PRIx64" Invalid default JSON string type set from config %s", info->id, tsDefaultJSONStrType);
// return TSDB_CODE_TSC_INVALID_JSON_CONFIG;
// }
// //pVal->length = wcslen((wchar_t *)root->valuestring) * TSDB_NCHAR_SIZE;
// pVal->length = (int16_t)strlen(root->valuestring);
// pVal->value = tcalloc(pVal->length + 1, 1);
// memcpy(pVal->value, root->valuestring, pVal->length);
// break;
// }
// case cJSON_Object: {
// int32_t ret = parseValueFromJSONObj(root, pVal, info);
// if (ret != TSDB_CODE_SUCCESS) {
// tscError("OTD:0x%"PRIx64" Failed to parse timestamp from JSON Obj", info->id);
// return ret;
// }
// break;
// }
// default:
// return TSDB_CODE_TSC_INVALID_JSON;
// }
//
// return TSDB_CODE_SUCCESS;
//}
//
//static int32_t parseMetricValueFromJSON(cJSON *root, TAOS_SML_KV **pKVs, int *num_kvs, SSmlLinesInfo* info) {
// //skip timestamp
// TAOS_SML_KV *pVal = *pKVs + 1;
// char key[] = OTD_METRIC_VALUE_COLUMN_NAME;
//
// cJSON *metricVal = cJSON_GetObjectItem(root, "value");
// if (metricVal == NULL) {
// return TSDB_CODE_TSC_INVALID_JSON;
// }
//
// int32_t ret = parseValueFromJSON(metricVal, pVal, info);
// if (ret != TSDB_CODE_SUCCESS) {
// return ret;
// }
//
// pVal->key = tcalloc(sizeof(key) + TS_BACKQUOTE_CHAR_SIZE, 1);
// memcpy(pVal->key, key, sizeof(key));
// addEscapeCharToString(pVal->key, (int32_t)strlen(pVal->key));
//
// *num_kvs += 1;
// return TSDB_CODE_SUCCESS;
//
//}
//
//
//static int32_t parseTagsFromJSON(cJSON *root, TAOS_SML_KV **pKVs, int *num_kvs, char **childTableName,
// SHashObj *pHash, SSmlLinesInfo* info) {
// int32_t ret = TSDB_CODE_SUCCESS;
//
// cJSON *tags = cJSON_GetObjectItem(root, "tags");
// if (tags == NULL || tags->type != cJSON_Object) {
// return TSDB_CODE_TSC_INVALID_JSON;
// }
//
// //handle child table name
// size_t childTableNameLen = strlen(tsSmlChildTableName);
// char childTbName[TSDB_TABLE_NAME_LEN] = {0};
// if (childTableNameLen != 0) {
// memcpy(childTbName, tsSmlChildTableName, childTableNameLen);
// cJSON *id = cJSON_GetObjectItem(tags, childTbName);
// if (id != NULL) {
// if (!cJSON_IsString(id)) {
// tscError("OTD:0x%"PRIx64" ID must be JSON string", info->id);
// return TSDB_CODE_TSC_INVALID_JSON;
// }
// size_t idLen = strlen(id->valuestring);
// *childTableName = tcalloc(idLen + TS_BACKQUOTE_CHAR_SIZE + 1, sizeof(char));
// memcpy(*childTableName, id->valuestring, idLen);
// addEscapeCharToString(*childTableName, (int32_t)idLen);
//
// //check duplicate IDs
// cJSON_DeleteItemFromObject(tags, childTbName);
// id = cJSON_GetObjectItem(tags, childTbName);
// if (id != NULL) {
// return TSDB_CODE_TSC_DUP_TAG_NAMES;
// }
// }
// }
//
// int32_t tagNum = cJSON_GetArraySize(tags);
// //at least one tag pair required
// if (tagNum <= 0) {
// return TSDB_CODE_TSC_INVALID_JSON;
// }
//
// //allocate memory for tags
// *pKVs = tcalloc(tagNum, sizeof(TAOS_SML_KV));
// TAOS_SML_KV *pkv = *pKVs;
//
// for (int32_t i = 0; i < tagNum; ++i) {
// cJSON *tag = cJSON_GetArrayItem(tags, i);
// if (tag == NULL) {
// return TSDB_CODE_TSC_INVALID_JSON;
// }
// //check duplicate keys
// if (checkDuplicateKey(tag->string, pHash, info)) {
// return TSDB_CODE_TSC_DUP_TAG_NAMES;
// }
// //key
// size_t keyLen = strlen(tag->string);
// if (keyLen > TSDB_COL_NAME_LEN - 1) {
// tscError("OTD:0x%"PRIx64" Tag key cannot exceeds %d characters in JSON", info->id, TSDB_COL_NAME_LEN - 1);
// return TSDB_CODE_TSC_INVALID_COLUMN_LENGTH;
// }
// pkv->key = tcalloc(keyLen + TS_BACKQUOTE_CHAR_SIZE + 1, sizeof(char));
// strncpy(pkv->key, tag->string, keyLen);
// addEscapeCharToString(pkv->key, (int32_t)keyLen);
// //value
// ret = parseValueFromJSON(tag, pkv, info);
// if (ret != TSDB_CODE_SUCCESS) {
// return ret;
// }
// *num_kvs += 1;
// pkv++;
//
// }
//
// return ret;
//
//}
//
//static int32_t tscParseJSONPayload(cJSON *root, TAOS_SML_DATA_POINT* pSml, SSmlLinesInfo* info) {
// int32_t ret = TSDB_CODE_SUCCESS;
//
// if (!cJSON_IsObject(root)) {
// tscError("OTD:0x%"PRIx64" data point needs to be JSON object", info->id);
// return TSDB_CODE_TSC_INVALID_JSON;
// }
//
// int32_t size = cJSON_GetArraySize(root);
// //outmost json fields has to be exactly 4
// if (size != OTD_JSON_FIELDS_NUM) {
// tscError("OTD:0x%"PRIx64" Invalid number of JSON fields in data point %d", info->id, size);
// return TSDB_CODE_TSC_INVALID_JSON;
// }
//
// //Parse metric
// ret = parseMetricFromJSON(root, pSml, info);
// if (ret != TSDB_CODE_SUCCESS) {
// tscError("OTD:0x%"PRIx64" Unable to parse metric from JSON payload", info->id);
// return ret;
// }
// tscDebug("OTD:0x%"PRIx64" Parse metric from JSON payload finished", info->id);
//
// //Parse timestamp
// ret = parseTimestampFromJSON(root, &pSml->fields, &pSml->fieldNum, info);
// if (ret) {
// tscError("OTD:0x%"PRIx64" Unable to parse timestamp from JSON payload", info->id);
// return ret;
// }
// tscDebug("OTD:0x%"PRIx64" Parse timestamp from JSON payload finished", info->id);
//
// //Parse metric value
// ret = parseMetricValueFromJSON(root, &pSml->fields, &pSml->fieldNum, info);
// if (ret) {
// tscError("OTD:0x%"PRIx64" Unable to parse metric value from JSON payload", info->id);
// return ret;
// }
// tscDebug("OTD:0x%"PRIx64" Parse metric value from JSON payload finished", info->id);
//
// //Parse tags
// SHashObj *keyHashTable = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false);
// ret = parseTagsFromJSON(root, &pSml->tags, &pSml->tagNum, &pSml->childTableName, keyHashTable, info);
// if (ret) {
// tscError("OTD:0x%"PRIx64" Unable to parse tags from JSON payload", info->id);
// taosHashCleanup(keyHashTable);
// return ret;
// }
// tscDebug("OTD:0x%"PRIx64" Parse tags from JSON payload finished", info->id);
// taosHashCleanup(keyHashTable);
//
// return TSDB_CODE_SUCCESS;
//}
//
//static int32_t tscParseMultiJSONPayload(char* payload, SArray* points, SSmlLinesInfo* info) {
// int32_t payloadNum, ret;
// ret = TSDB_CODE_SUCCESS;
//
// if (payload == NULL) {
// tscError("OTD:0x%"PRIx64" empty JSON Payload", info->id);
// return TSDB_CODE_TSC_INVALID_JSON;
// }
//
// cJSON *root = cJSON_Parse(payload);
// //multiple data points must be sent in JSON array
// if (cJSON_IsObject(root)) {
// payloadNum = 1;
// } else if (cJSON_IsArray(root)) {
// payloadNum = cJSON_GetArraySize(root);
// } else {
// tscError("OTD:0x%"PRIx64" Invalid JSON Payload", info->id);
// ret = TSDB_CODE_TSC_INVALID_JSON;
// goto PARSE_JSON_OVER;
// }
//
// for (int32_t i = 0; i < payloadNum; ++i) {
// TAOS_SML_DATA_POINT point = {0};
// cJSON *dataPoint = (payloadNum == 1 && cJSON_IsObject(root)) ? root : cJSON_GetArrayItem(root, i);
//
// ret = tscParseJSONPayload(dataPoint, &point, info);
// if (ret != TSDB_CODE_SUCCESS) {
// tscError("OTD:0x%"PRIx64" JSON data point parse failed", info->id);
// destroySmlDataPoint(&point);
// goto PARSE_JSON_OVER;
// } else {
// tscDebug("OTD:0x%"PRIx64" JSON data point parse success", info->id);
// }
// taosArrayPush(points, &point);
// }
//
//PARSE_JSON_OVER:
// cJSON_Delete(root);
// return ret;
//}
//
//int taos_insert_json_payload(TAOS* taos, char* payload, SMLProtocolType protocol, SMLTimeStampType tsType, int* affectedRows) {
// int32_t code = 0;
//
// SSmlLinesInfo* info = tcalloc(1, sizeof(SSmlLinesInfo));
// info->id = genUID();
// info->tsType = tsType;
// info->protocol = protocol;
//
// if (payload == NULL) {
// tscError("OTD:0x%"PRIx64" taos_insert_json_payload payload is NULL", info->id);
// tfree(info);
// code = TSDB_CODE_TSC_APP_ERROR;
// return code;
// }
//
// SArray* lpPoints = taosArrayInit(1, sizeof(TAOS_SML_DATA_POINT));
// if (lpPoints == NULL) {
// tscError("OTD:0x%"PRIx64" taos_insert_json_payload failed to allocate memory", info->id);
// tfree(info);
// return TSDB_CODE_TSC_OUT_OF_MEMORY;
// }
//
// tscDebug("OTD:0x%"PRIx64" taos_insert_telnet_lines begin inserting %d points", info->id, 1);
// code = tscParseMultiJSONPayload(payload, lpPoints, info);
// size_t numPoints = taosArrayGetSize(lpPoints);
//
// if (code != 0) {
// goto cleanup;
// }
//
// TAOS_SML_DATA_POINT* points = TARRAY_GET_START(lpPoints);
// code = tscSmlInsert(taos, points, (int)numPoints, info);
// if (code != 0) {
// tscError("OTD:0x%"PRIx64" taos_insert_json_payload error: %s", info->id, tstrerror((code)));
// }
// if (affectedRows != NULL) {
// *affectedRows = info->affectedRows;
// }
//
//cleanup:
// tscDebug("OTD:0x%"PRIx64" taos_insert_json_payload finish inserting 1 Point. code: %d", info->id, code);
// points = TARRAY_GET_START(lpPoints);
// numPoints = taosArrayGetSize(lpPoints);
// for (int i = 0; i < numPoints; ++i) {
// destroySmlDataPoint(points+i);
// }
//
// taosArrayDestroy(&lpPoints);
//
// tfree(info);
// return code;
//}
......@@ -3,7 +3,7 @@
#include <stdlib.h>
#include <string.h>
#include "tscParseLine.h"
#include "clientSml.h"
#include "tdef.h"
#include "ttypes.h"
......@@ -29,6 +29,7 @@ typedef struct {
#define EQUAL '='
#define QUOTE '"'
#define SLASH '\\'
#define tsMaxSQLStringLen (1024*1024)
//=================================================================================================
......@@ -71,8 +72,8 @@ typedef enum {
typedef struct {
char sTableName[TSDB_TABLE_NAME_LEN];
SArray* tags; //SArray<SSchema>
SArray* fields; //SArray<SSchema>
SHashObj *tags;
SHashObj *fields;
} SCreateSTableActionInfo;
typedef struct {
......@@ -339,21 +340,22 @@ static int32_t generateSchemaAction(SSchema* pointColField, SHashObj* dbAttrHash
return 0;
}
static int32_t buildColumnDescription(SSchema* field,
static int32_t buildColumnDescription(TAOS_SML_KV* field,
char* buf, int32_t bufSize, int32_t* outBytes) {
uint8_t type = field->type;
char tname[TSDB_TABLE_NAME_LEN] = {0};
memcpy(tname, field->key, field->keyLen);
if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) {
int32_t bytes = field->bytes - VARSTR_HEADER_SIZE;
int32_t bytes = field->length - VARSTR_HEADER_SIZE;
if (type == TSDB_DATA_TYPE_NCHAR) {
bytes = bytes/TSDB_NCHAR_SIZE;
}
int out = snprintf(buf, bufSize,"%s %s(%d)",
field->name,tDataTypes[field->type].name, bytes);
tname,tDataTypes[field->type].name, bytes);
*outBytes = out;
} else {
int out = snprintf(buf, bufSize, "%s %s",
field->name, tDataTypes[type].name);
tname, tDataTypes[type].name);
*outBytes = out;
}
......@@ -364,7 +366,7 @@ static int32_t buildColumnDescription(SSchema* field,
static int32_t applySchemaAction(TAOS* taos, SSchemaAction* action, SSmlLinesInfo* info) {
int32_t code = 0;
int32_t outBytes = 0;
char *result = (char *)calloc(1, tsMaxSQLStringLen+1);
char *result = (char *)taosMemoryCalloc(1, tsMaxSQLStringLen+1);
int32_t capacity = tsMaxSQLStringLen + 1;
uDebug("SML:0x%"PRIx64" apply schema action. action: %d", info->id, action->action);
......@@ -374,7 +376,7 @@ static int32_t applySchemaAction(TAOS* taos, SSchemaAction* action, SSmlLinesInf
buildColumnDescription(action->alterSTable.field, result+n, capacity-n, &outBytes);
TAOS_RES* res = taos_query(taos, result); //TODO async doAsyncQuery
code = taos_errno(res);
char* errStr = taos_errstr(res);
const char* errStr = taos_errstr(res);
char* begin = strstr(errStr, "duplicated column names");
bool tscDupColNames = (begin != NULL);
if (code != TSDB_CODE_SUCCESS) {
......@@ -382,7 +384,8 @@ static int32_t applySchemaAction(TAOS* taos, SSchemaAction* action, SSmlLinesInf
}
taos_free_result(res);
if (code == TSDB_CODE_MND_FIELD_ALREAY_EXIST || code == TSDB_CODE_MND_TAG_ALREAY_EXIST || tscDupColNames) {
// if (code == TSDB_CODE_MND_FIELD_ALREADY_EXIST || code == TSDB_CODE_MND_TAG_ALREADY_EXIST || tscDupColNames) {
if (code == TSDB_CODE_MND_TAG_ALREADY_EXIST || tscDupColNames) {
TAOS_RES* res2 = taos_query(taos, "RESET QUERY CACHE");
code = taos_errno(res2);
if (code != TSDB_CODE_SUCCESS) {
......@@ -399,7 +402,7 @@ static int32_t applySchemaAction(TAOS* taos, SSchemaAction* action, SSmlLinesInf
result+n, capacity-n, &outBytes);
TAOS_RES* res = taos_query(taos, result); //TODO async doAsyncQuery
code = taos_errno(res);
char* errStr = taos_errstr(res);
const char* errStr = taos_errstr(res);
char* begin = strstr(errStr, "duplicated column names");
bool tscDupColNames = (begin != NULL);
if (code != TSDB_CODE_SUCCESS) {
......@@ -407,7 +410,8 @@ static int32_t applySchemaAction(TAOS* taos, SSchemaAction* action, SSmlLinesInf
}
taos_free_result(res);
if (code == TSDB_CODE_MND_TAG_ALREAY_EXIST || code == TSDB_CODE_MND_FIELD_ALREAY_EXIST || tscDupColNames) {
// if (code ==TSDB_CODE_MND_TAG_ALREADY_EXIST || code == TSDB_CODE_MND_FIELD_ALREAY_EXIST || tscDupColNames) {
if (code ==TSDB_CODE_MND_TAG_ALREADY_EXIST || tscDupColNames) {
TAOS_RES* res2 = taos_query(taos, "RESET QUERY CACHE");
code = taos_errno(res2);
if (code != TSDB_CODE_SUCCESS) {
......@@ -429,7 +433,8 @@ static int32_t applySchemaAction(TAOS* taos, SSchemaAction* action, SSmlLinesInf
}
taos_free_result(res);
if (code == TSDB_CODE_MND_INVALID_COLUMN_LENGTH || code == TSDB_CODE_TSC_INVALID_COLUMN_LENGTH) {
// if (code == TSDB_CODE_MND_INVALID_COLUMN_LENGTH || code == TSDB_CODE_TSC_INVALID_COLUMN_LENGTH) {
if (code == TSDB_CODE_TSC_INVALID_COLUMN_LENGTH) {
TAOS_RES* res2 = taos_query(taos, "RESET QUERY CACHE");
code = taos_errno(res2);
if (code != TSDB_CODE_SUCCESS) {
......@@ -451,7 +456,8 @@ static int32_t applySchemaAction(TAOS* taos, SSchemaAction* action, SSmlLinesInf
}
taos_free_result(res);
if (code == TSDB_CODE_MND_INVALID_TAG_LENGTH || code == TSDB_CODE_TSC_INVALID_TAG_LENGTH) {
// if (code == TSDB_CODE_MND_INVALID_TAG_LENGTH || code == TSDB_CODE_TSC_INVALID_TAG_LENGTH) {
if (code == TSDB_CODE_TSC_INVALID_TAG_LENGTH) {
TAOS_RES* res2 = taos_query(taos, "RESET QUERY CACHE");
code = taos_errno(res2);
if (code != TSDB_CODE_SUCCESS) {
......@@ -465,24 +471,25 @@ static int32_t applySchemaAction(TAOS* taos, SSchemaAction* action, SSmlLinesInf
case SCHEMA_ACTION_CREATE_STABLE: {
int n = sprintf(result, "create stable %s (", action->createSTable.sTableName);
char* pos = result + n; int freeBytes = capacity - n;
size_t numCols = taosArrayGetSize(action->createSTable.fields);
for (int32_t i = 0; i < numCols; ++i) {
SSchema* field = taosArrayGet(action->createSTable.fields, i);
buildColumnDescription(field, pos, freeBytes, &outBytes);
TAOS_SML_KV **kv = taosHashIterate(action->createSTable.fields, NULL);
while(kv){
buildColumnDescription(*kv, pos, freeBytes, &outBytes);
pos += outBytes; freeBytes -= outBytes;
*pos = ','; ++pos; --freeBytes;
kv = taosHashIterate(action->createSTable.fields, kv);
}
--pos; ++freeBytes;
outBytes = snprintf(pos, freeBytes, ") tags (");
pos += outBytes; freeBytes -= outBytes;
size_t numTags = taosArrayGetSize(action->createSTable.tags);
for (int32_t i = 0; i < numTags; ++i) {
SSchema* field = taosArrayGet(action->createSTable.tags, i);
buildColumnDescription(field, pos, freeBytes, &outBytes);
kv = taosHashIterate(action->createSTable.tags, NULL);
while(kv){
buildColumnDescription(*kv, pos, freeBytes, &outBytes);
pos += outBytes; freeBytes -= outBytes;
*pos = ','; ++pos; --freeBytes;
kv = taosHashIterate(action->createSTable.tags, kv);
}
pos--; ++freeBytes;
outBytes = snprintf(pos, freeBytes, ")");
......@@ -493,7 +500,7 @@ static int32_t applySchemaAction(TAOS* taos, SSchemaAction* action, SSmlLinesInf
}
taos_free_result(res);
if (code == TSDB_CODE_MND_TABLE_ALREADY_EXIST) {
if (code == TSDB_CODE_MND_STB_ALREADY_EXIST) {
TAOS_RES* res2 = taos_query(taos, "RESET QUERY CACHE");
code = taos_errno(res2);
if (code != TSDB_CODE_SUCCESS) {
......@@ -680,643 +687,79 @@ static int32_t loadTableSchemaFromDB(TAOS* taos, char* tableName, SSmlSTableSche
static int32_t modifyDBSchemas(TAOS* taos, SSmlLinesInfo* info) {
int32_t code = 0;
size_t numStable = taosHashGetSize(info->superTables);
SSmlSTableMeta** tableMetaSml = taosHashIterate(info->superTables, NULL);
while (tableMetaSml) {
SSmlSTableMeta* cTablePoints = *tableMetaSml;
if (NULL == pStmt->pCatalog) {
STMT_ERR_RET(catalogGetHandle(pStmt->taos->pAppInfo->clusterId, &pStmt->pCatalog));
}
STableMeta *pTableMeta = NULL;
SEpSet ep = getEpSet_s(&pStmt->taos->pAppInfo->mgmtEp);
STMT_ERR_RET(catalogGetTableMeta(pStmt->pCatalog, pStmt->taos->pAppInfo->pTransporter, &ep, &pStmt->bInfo.sname, &pTableMeta));
SEpSet ep = getEpSet_s(&info->taos->pAppInfo->mgmtEp);
if (pTableMeta->uid == pStmt->bInfo.tbUid) {
pStmt->bInfo.needParse = false;
return TSDB_CODE_SUCCESS;
}
size_t superTableLen = 0;
void *superTable = taosHashGetKey(tableMetaSml, &superTableLen);
SName pName = {TSDB_TABLE_NAME_T, info->taos->acctId, {0}, {0}};
strcpy(pName.dbname, info->pRequest->pDb);
memcpy(pName.tname, superTable, superTableLen);
// for (int i = 0; i < numStable; ++i) {
SSmlSTableSchema* pointSchema = taosArrayGet(stableSchemas, i);
SSmlSTableSchema dbSchema;
memset(&dbSchema, 0, sizeof(SSmlSTableSchema));
code = catalogGetSTableMeta(info->pCatalog, info->taos->pAppInfo->pTransporter, &ep, &pName, &pTableMeta);
code = loadTableSchemaFromDB(taos, pointSchema->sTableName, &dbSchema, info);
if (code == TSDB_CODE_MND_INVALID_TABLE_NAME) {
if (code == TSDB_CODE_TDB_INVALID_TABLE_ID) {
SSchemaAction schemaAction = {0};
schemaAction.action = SCHEMA_ACTION_CREATE_STABLE;
memset(&schemaAction.createSTable, 0, sizeof(SCreateSTableActionInfo));
memcpy(schemaAction.createSTable.sTableName, pointSchema->sTableName, TSDB_TABLE_NAME_LEN);
schemaAction.createSTable.tags = pointSchema->tags;
schemaAction.createSTable.fields = pointSchema->fields;
memcpy(schemaAction.createSTable.sTableName, superTable, superTableLen);
schemaAction.createSTable.tags = cTablePoints->tagHash;
schemaAction.createSTable.fields = cTablePoints->fieldHash;
applySchemaAction(taos, &schemaAction, info);
code = loadTableSchemaFromDB(taos, pointSchema->sTableName, &dbSchema, info);
if (code != 0) {
uError("SML:0x%"PRIx64" reconcile point schema failed. can not create %s", info->id, pointSchema->sTableName);
return code;
}
}
if (code == TSDB_CODE_SUCCESS) {
pointSchema->precision = dbSchema.precision;
size_t pointTagSize = taosArrayGetSize(pointSchema->tags);
size_t pointFieldSize = taosArrayGetSize(pointSchema->fields);
SHashObj* dbTagHash = dbSchema.tagHash;
SHashObj* dbFieldHash = dbSchema.fieldHash;
for (int j = 0; j < pointTagSize; ++j) {
SSchema* pointTag = taosArrayGet(pointSchema->tags, j);
SSchemaAction schemaAction = {0};
bool actionNeeded = false;
generateSchemaAction(pointTag, dbTagHash, dbSchema.tags, true, pointSchema->sTableName,
&schemaAction, &actionNeeded, info);
if (actionNeeded) {
code = applySchemaAction(taos, &schemaAction, info);
if (code != 0) {
destroySmlSTableSchema(&dbSchema);
return code;
}
}
}
SSchema* pointColTs = taosArrayGet(pointSchema->fields, 0);
SSchema* dbColTs = taosArrayGet(dbSchema.fields, 0);
memcpy(pointColTs->name, dbColTs->name, TSDB_COL_NAME_LEN);
for (int j = 1; j < pointFieldSize; ++j) {
SSchema* pointCol = taosArrayGet(pointSchema->fields, j);
SSchemaAction schemaAction = {0};
bool actionNeeded = false;
generateSchemaAction(pointCol, dbFieldHash, dbSchema.fields,false, pointSchema->sTableName,
&schemaAction, &actionNeeded, info);
if (actionNeeded) {
code = applySchemaAction(taos, &schemaAction, info);
code = catalogGetSTableMeta(info->pCatalog, info->taos->pAppInfo->pTransporter, &ep, &pName, &pTableMeta);
if (code != 0) {
destroySmlSTableSchema(&dbSchema);
uError("SML:0x%"PRIx64" reconcile point schema failed. can not create %s", info->id, schemaAction.createSTable.sTableName);
return code;
}
}
}
pointSchema->precision = dbSchema.precision;
destroySmlSTableSchema(&dbSchema);
}else if (code == TSDB_CODE_SUCCESS) {
} else {
uError("SML:0x%"PRIx64" load table meta error: %s", info->id, tstrerror(code));
return code;
}
tableMetaSml = taosHashIterate(info->superTables, tableMetaSml);
}
// }
return 0;
}
static int32_t arrangePointsByChildTableName(TAOS_SML_DATA_POINT* points, int numPoints,
SHashObj* cname2points, SArray* stableSchemas, SSmlLinesInfo* info) {
for (int32_t i = 0; i < numPoints; ++i) {
TAOS_SML_DATA_POINT * point = points + i;
SSmlSTableSchema* stableSchema = taosArrayGet(stableSchemas, point->schemaIdx);
for (int j = 0; j < point->tagNum; ++j) {
TAOS_SML_KV* kv = point->tags + j;
if (kv->type == TSDB_DATA_TYPE_TIMESTAMP) {
int64_t ts = *(int64_t*)(kv->value);
ts = convertTimePrecision(ts, TSDB_TIME_PRECISION_NANO, stableSchema->precision);
*(int64_t*)(kv->value) = ts;
}
}
for (int j = 0; j < point->fieldNum; ++j) {
TAOS_SML_KV* kv = point->fields + j;
if (kv->type == TSDB_DATA_TYPE_TIMESTAMP) {
int64_t ts = *(int64_t*)(kv->value);
ts = convertTimePrecision(ts, TSDB_TIME_PRECISION_NANO, stableSchema->precision);
*(int64_t*)(kv->value) = ts;
}
}
SArray* cTablePoints = NULL;
SArray** pCTablePoints = taosHashGet(cname2points, point->childTableName, strlen(point->childTableName));
if (pCTablePoints) {
cTablePoints = *pCTablePoints;
} else {
cTablePoints = taosArrayInit(64, sizeof(point));
taosHashPut(cname2points, point->childTableName, strlen(point->childTableName), &cTablePoints, POINTER_BYTES);
}
taosArrayPush(cTablePoints, &point);
}
return 0;
}
static int32_t applyChildTableDataPointsWithInsertSQL(TAOS* taos, char* cTableName, char* sTableName, SSmlSTableSchema* sTableSchema,
SArray* cTablePoints, size_t rowSize, SSmlLinesInfo* info) {
int32_t code = TSDB_CODE_SUCCESS;
size_t numTags = taosArrayGetSize(sTableSchema->tags);
size_t numCols = taosArrayGetSize(sTableSchema->fields);
size_t rows = taosArrayGetSize(cTablePoints);
SArray* tagsSchema = sTableSchema->tags;
SArray* colsSchema = sTableSchema->fields;
TAOS_SML_KV* tagKVs[TSDB_MAX_TAGS] = {0};
for (int i = 0; i < rows; ++i) {
TAOS_SML_DATA_POINT* pDataPoint = taosArrayGetP(cTablePoints, i);
for (int j = 0; j < pDataPoint->tagNum; ++j) {
TAOS_SML_KV* kv = pDataPoint->tags + j;
tagKVs[kv->fieldSchemaIdx] = kv;
}
}
char* sql = taosMemoryMalloc(tsMaxSQLStringLen + 1);
if (sql == NULL) {
uError("taosMemoryMalloc sql memory error");
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
int32_t freeBytes = tsMaxSQLStringLen + 1;
int32_t totalLen = 0;
totalLen += sprintf(sql, "insert into %s using %s (", cTableName, sTableName);
for (int i = 0; i < numTags; ++i) {
SSchema* tagSchema = taosArrayGet(tagsSchema, i);
totalLen += snprintf(sql + totalLen, freeBytes - totalLen, "%s,", tagSchema->name);
}
--totalLen;
totalLen += snprintf(sql + totalLen, freeBytes - totalLen, ")");
totalLen += snprintf(sql + totalLen, freeBytes - totalLen, " tags (");
// for (int i = 0; i < numTags; ++i) {
// snprintf(sql+strlen(sql), freeBytes-strlen(sql), "?,");
// }
for (int i = 0; i < numTags; ++i) {
if (tagKVs[i] == NULL) {
totalLen += snprintf(sql + totalLen, freeBytes - totalLen, "NULL,");
} else {
TAOS_SML_KV* kv = tagKVs[i];
size_t beforeLen = totalLen;
int32_t len = 0;
converToStr(sql + beforeLen, kv->type, kv->value, kv->length, &len);
totalLen += len;
totalLen += snprintf(sql + totalLen, freeBytes - totalLen, ",");
}
}
--totalLen;
totalLen += snprintf(sql + totalLen, freeBytes - totalLen, ") (");
for (int i = 0; i < numCols; ++i) {
SSchema* colSchema = taosArrayGet(colsSchema, i);
totalLen += snprintf(sql + totalLen, freeBytes - totalLen, "%s,", colSchema->name);
}
--totalLen;
totalLen += snprintf(sql + totalLen, freeBytes - totalLen, ") values ");
TAOS_SML_KV** colKVs = taosMemoryMalloc(numCols * sizeof(TAOS_SML_KV*));
for (int r = 0; r < rows; ++r) {
totalLen += snprintf(sql + totalLen, freeBytes - totalLen, "(");
memset(colKVs, 0, numCols * sizeof(TAOS_SML_KV*));
TAOS_SML_DATA_POINT* point = taosArrayGetP(cTablePoints, r);
for (int i = 0; i < point->fieldNum; ++i) {
TAOS_SML_KV* kv = point->fields + i;
colKVs[kv->fieldSchemaIdx] = kv;
}
for (int i = 0; i < numCols; ++i) {
if (colKVs[i] == NULL) {
totalLen += snprintf(sql + totalLen, freeBytes - totalLen, "NULL,");
} else {
TAOS_SML_KV* kv = colKVs[i];
size_t beforeLen = totalLen;
int32_t len = 0;
converToStr(sql + beforeLen, kv->type, kv->value, kv->length, &len);
totalLen += len;
totalLen += snprintf(sql + totalLen, freeBytes - totalLen, ",");
}
}
--totalLen;
totalLen += snprintf(sql + totalLen, freeBytes - totalLen, ")");
}
taosMemoryFree(colKVs);
sql[totalLen] = '\0';
uDebug("SML:0x%" PRIx64 " insert child table table %s of super table %s sql: %s", info->id, cTableName, sTableName,
sql);
bool tryAgain = false;
int32_t try = 0;
do {
TAOS_RES* res = taos_query(taos, sql);
code = taos_errno(res);
if (code != 0) {
uError("SML:0x%"PRIx64 " taos_query return %d:%s", info->id, code, taos_errstr(res));
}
uDebug("SML:0x%"PRIx64 " taos_query inserted %d rows", info->id, taos_affected_rows(res));
info->affectedRows += taos_affected_rows(res);
taos_free_result(res);
tryAgain = false;
if ((code == TSDB_CODE_TDB_INVALID_TABLE_ID
|| code == TSDB_CODE_VND_INVALID_VGROUP_ID
|| code == TSDB_CODE_TDB_TABLE_RECONFIGURE
|| code == TSDB_CODE_APP_NOT_READY
|| code == TSDB_CODE_RPC_NETWORK_UNAVAIL) && try++ < TSDB_MAX_REPLICA) {
tryAgain = true;
}
if (code == TSDB_CODE_TDB_INVALID_TABLE_ID || code == TSDB_CODE_VND_INVALID_VGROUP_ID) {
TAOS_RES* res2 = taos_query(taos, "RESET QUERY CACHE");
int32_t code2 = taos_errno(res2);
if (code2 != TSDB_CODE_SUCCESS) {
uError("SML:0x%" PRIx64 " insert child table by sql. reset query cache. error: %s", info->id, taos_errstr(res2));
}
taos_free_result(res2);
if (tryAgain) {
taosMsleep(100 * (2 << try));
}
}
if (code == TSDB_CODE_APP_NOT_READY || code == TSDB_CODE_RPC_NETWORK_UNAVAIL) {
if (tryAgain) {
taosMsleep( 100 * (2 << try));
}
}
} while (tryAgain);
taosMemoryFree(sql);
return code;
}
static int32_t applyChildTableDataPointsWithStmt(TAOS* taos, char* cTableName, char* sTableName, SSmlSTableSchema* sTableSchema,
SArray* cTablePoints, size_t rowSize, SSmlLinesInfo* info) {
size_t numTags = taosArrayGetSize(sTableSchema->tags);
size_t numCols = taosArrayGetSize(sTableSchema->fields);
size_t rows = taosArrayGetSize(cTablePoints);
TAOS_SML_KV* tagKVs[TSDB_MAX_TAGS] = {0};
for (int i= 0; i < rows; ++i) {
TAOS_SML_DATA_POINT * pDataPoint = taosArrayGetP(cTablePoints, i);
for (int j = 0; j < pDataPoint->tagNum; ++j) {
TAOS_SML_KV* kv = pDataPoint->tags + j;
tagKVs[kv->fieldSchemaIdx] = kv;
}
}
//tag bind
SArray* tagBinds = taosArrayInit(numTags, sizeof(TAOS_BIND));
taosArraySetSize(tagBinds, numTags);
int isNullColBind = TSDB_TRUE;
for (int j = 0; j < numTags; ++j) {
TAOS_BIND* bind = taosArrayGet(tagBinds, j);
bind->is_null = &isNullColBind;
}
for (int j = 0; j < numTags; ++j) {
if (tagKVs[j] == NULL) continue;
TAOS_SML_KV* kv = tagKVs[j];
TAOS_BIND* bind = taosArrayGet(tagBinds, kv->fieldSchemaIdx);
bind->buffer_type = kv->type;
bind->length = taosMemoryMalloc(sizeof(uintptr_t*));
*bind->length = kv->length;
bind->buffer = kv->value;
bind->is_null = NULL;
}
//rows bind
SArray* rowsBind = taosArrayInit(rows, POINTER_BYTES);
for (int i = 0; i < rows; ++i) {
TAOS_SML_DATA_POINT* point = taosArrayGetP(cTablePoints, i);
TAOS_BIND* colBinds = calloc(numCols, sizeof(TAOS_BIND));
if (colBinds == NULL) {
uError("SML:0x%"PRIx64" taos_sml_insert insert points, failed to allocated memory for TAOS_BIND, "
"num of rows: %zu, num of cols: %zu", info->id, rows, numCols);
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
for (int j = 0; j < numCols; ++j) {
TAOS_BIND* bind = colBinds + j;
bind->is_null = &isNullColBind;
}
for (int j = 0; j < point->fieldNum; ++j) {
TAOS_SML_KV* kv = point->fields + j;
TAOS_BIND* bind = colBinds + kv->fieldSchemaIdx;
bind->buffer_type = kv->type;
bind->length = taosMemoryMalloc(sizeof(uintptr_t*));
*bind->length = kv->length;
bind->buffer = kv->value;
bind->is_null = NULL;
}
taosArrayPush(rowsBind, &colBinds);
}
int32_t code = 0;
code = insertChildTablePointsBatch(taos, cTableName, sTableName, sTableSchema->tags, tagBinds, sTableSchema->fields, rowsBind, rowSize, info);
if (code != 0) {
uError("SML:0x%"PRIx64" insert into child table %s failed. error %s", info->id, cTableName, tstrerror(code));
}
//taosMemoryFree rows bind
for (int i = 0; i < rows; ++i) {
TAOS_BIND* colBinds = taosArrayGetP(rowsBind, i);
for (int j = 0; j < numCols; ++j) {
TAOS_BIND* bind = colBinds + j;
taosMemoryFree(bind->length);
}
taosMemoryFree(colBinds);
}
taosArrayDestroy(&rowsBind);
//taosMemoryFree tag bind
for (int i = 0; i < taosArrayGetSize(tagBinds); ++i) {
TAOS_BIND* bind = taosArrayGet(tagBinds, i);
taosMemoryFree(bind->length);
}
taosArrayDestroy(&tagBinds);
return code;
}
static int32_t insertChildTablePointsBatch(TAOS* taos, char* cTableName, char* sTableName,
SArray* tagsSchema, SArray* tagsBind,
SArray* colsSchema, SArray* rowsBind,
size_t rowSize, SSmlLinesInfo* info) {
size_t numTags = taosArrayGetSize(tagsSchema);
size_t numCols = taosArrayGetSize(colsSchema);
char* sql = taosMemoryMalloc(tsMaxSQLStringLen+1);
if (sql == NULL) {
uError("taosMemoryMalloc sql memory error");
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
int32_t freeBytes = tsMaxSQLStringLen + 1 ;
sprintf(sql, "insert into ? using %s (", sTableName);
for (int i = 0; i < numTags; ++i) {
SSchema* tagSchema = taosArrayGet(tagsSchema, i);
snprintf(sql+strlen(sql), freeBytes-strlen(sql), "%s,", tagSchema->name);
}
snprintf(sql + strlen(sql) - 1, freeBytes-strlen(sql)+1, ")");
snprintf(sql + strlen(sql), freeBytes-strlen(sql), " tags (");
for (int i = 0; i < numTags; ++i) {
snprintf(sql+strlen(sql), freeBytes-strlen(sql), "?,");
}
snprintf(sql + strlen(sql) - 1, freeBytes-strlen(sql)+1, ") (");
for (int i = 0; i < numCols; ++i) {
SSchema* colSchema = taosArrayGet(colsSchema, i);
snprintf(sql+strlen(sql), freeBytes-strlen(sql), "%s,", colSchema->name);
}
snprintf(sql + strlen(sql)-1, freeBytes-strlen(sql)+1, ") values (");
for (int i = 0; i < numCols; ++i) {
snprintf(sql+strlen(sql), freeBytes-strlen(sql), "?,");
}
snprintf(sql + strlen(sql)-1, freeBytes-strlen(sql)+1, ")");
sql[strlen(sql)] = '\0';
uDebug("SML:0x%"PRIx64" insert child table table %s of super table %s : %s", info->id, cTableName, sTableName, sql);
size_t maxBatchSize = TSDB_MAX_WAL_SIZE/rowSize * 2 / 3;
size_t rows = taosArrayGetSize(rowsBind);
size_t batchSize = MIN(maxBatchSize, rows);
uDebug("SML:0x%"PRIx64" insert rows into child table %s. num of rows: %zu, batch size: %zu",
info->id, cTableName, rows, batchSize);
SArray* batchBind = taosArrayInit(batchSize, POINTER_BYTES);
int32_t code = TSDB_CODE_SUCCESS;
for (int i = 0; i < rows;) {
int j = i;
for (; j < i + batchSize && j<rows; ++j) {
taosArrayPush(batchBind, taosArrayGet(rowsBind, j));
}
if (j > i) {
uDebug("SML:0x%"PRIx64" insert child table batch from line %d to line %d.", info->id, i, j - 1);
code = doInsertChildTablePoints(taos, sql, cTableName, tagsBind, batchBind, info);
if (code != 0) {
taosArrayDestroy(&batchBind);
tfree(sql);
return code;
}
taosArrayClear(batchBind);
}
i = j;
}
taosArrayDestroy(&batchBind);
tfree(sql);
return code;
}
static int32_t doInsertChildTablePoints(TAOS* taos, char* sql, char* cTableName, SArray* tagsBind, SArray* batchBind,
SSmlLinesInfo* info) {
int32_t code = 0;
TAOS_STMT* stmt = taos_stmt_init(taos);
if (stmt == NULL) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
code = taos_stmt_prepare(stmt, sql, (unsigned long)strlen(sql));
if (code != 0) {
uError("SML:0x%"PRIx64" taos_stmt_prepare return %d:%s", info->id, code, taos_stmt_errstr(stmt));
taos_stmt_close(stmt);
return code;
}
bool tryAgain = false;
int32_t try = 0;
do {
code = taos_stmt_set_tbname_tags(stmt, cTableName, TARRAY_GET_START(tagsBind));
if (code != 0) {
uError("SML:0x%"PRIx64" taos_stmt_set_tbname return %d:%s", info->id, code, taos_stmt_errstr(stmt));
int affectedRows = taos_stmt_affected_rows(stmt);
info->affectedRows += affectedRows;
taos_stmt_close(stmt);
return code;
}
taosHashPut(info->metaHashObj, superTable, superTableLen, &pTableMeta, POINTER_BYTES);
size_t rows = taosArrayGetSize(batchBind);
for (int32_t i = 0; i < rows; ++i) {
TAOS_BIND* colsBinds = taosArrayGetP(batchBind, i);
code = taos_stmt_bind_param(stmt, colsBinds);
if (code != 0) {
uError("SML:0x%"PRIx64" taos_stmt_bind_param return %d:%s", info->id, code, taos_stmt_errstr(stmt));
int affectedRows = taos_stmt_affected_rows(stmt);
info->affectedRows += affectedRows;
taos_stmt_close(stmt);
return code;
}
code = taos_stmt_add_batch(stmt);
if (code != 0) {
uError("SML:0x%"PRIx64" taos_stmt_add_batch return %d:%s", info->id, code, taos_stmt_errstr(stmt));
int affectedRows = taos_stmt_affected_rows(stmt);
info->affectedRows += affectedRows;
taos_stmt_close(stmt);
return code;
}
}
code = taos_stmt_execute(stmt);
if (code != 0) {
uError("SML:0x%"PRIx64" taos_stmt_execute return %d:%s, try:%d", info->id, code, taos_stmt_errstr(stmt), try);
}
uDebug("SML:0x%"PRIx64" taos_stmt_execute inserted %d rows", info->id, taos_stmt_affected_rows(stmt));
tryAgain = false;
if ((code == TSDB_CODE_TDB_INVALID_TABLE_ID
|| code == TSDB_CODE_VND_INVALID_VGROUP_ID
|| code == TSDB_CODE_TDB_TABLE_RECONFIGURE
|| code == TSDB_CODE_APP_NOT_READY
|| code == TSDB_CODE_RPC_NETWORK_UNAVAIL) && try++ < TSDB_MAX_REPLICA) {
tryAgain = true;
}
if (code == TSDB_CODE_TDB_INVALID_TABLE_ID || code == TSDB_CODE_VND_INVALID_VGROUP_ID) {
TAOS_RES* res2 = taos_query(taos, "RESET QUERY CACHE");
int32_t code2 = taos_errno(res2);
if (code2 != TSDB_CODE_SUCCESS) {
uError("SML:0x%" PRIx64 " insert child table. reset query cache. error: %s", info->id, taos_errstr(res2));
}
taos_free_result(res2);
if (tryAgain) {
taosMsleep(100 * (2 << try));
}
}
if (code == TSDB_CODE_APP_NOT_READY || code == TSDB_CODE_RPC_NETWORK_UNAVAIL) {
if (tryAgain) {
taosMsleep( 100 * (2 << try));
}
tableMetaSml = taosHashIterate(info->superTables, tableMetaSml);
}
} while (tryAgain);
int affectedRows = taos_stmt_affected_rows(stmt);
info->affectedRows += affectedRows;
taos_stmt_close(stmt);
return code;
return 0;
}
static int32_t applyChildTableDataPoints(TAOS* taos, char* cTableName, char* sTableName, SSmlSTableSchema* sTableSchema,
SArray* cTablePoints, size_t rowSize, SSmlLinesInfo* info) {
int32_t code = TSDB_CODE_SUCCESS;
size_t childTableDataPoints = taosArrayGetSize(cTablePoints);
if (childTableDataPoints < 10) {
code = applyChildTableDataPointsWithInsertSQL(taos, cTableName, sTableName, sTableSchema, cTablePoints, rowSize, info);
} else {
code = applyChildTableDataPointsWithStmt(taos, cTableName, sTableName, sTableSchema, cTablePoints, rowSize, info);
}
return code;
}
static int32_t applyDataPoints(TAOS* taos, TAOS_SML_DATA_POINT* points, int32_t numPoints, SArray* stableSchemas, SSmlLinesInfo* info) {
static int32_t applyDataPoints(TAOS* taos, SSmlLinesInfo* info) {
int32_t code = TSDB_CODE_SUCCESS;
SHashObj* cname2points = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false);
arrangePointsByChildTableName(points, numPoints, cname2points, stableSchemas, info);
SArray** pCTablePoints = taosHashIterate(cname2points, NULL);
while (pCTablePoints) {
SArray* cTablePoints = *pCTablePoints;
TAOS_SML_DATA_POINT* point = taosArrayGetP(cTablePoints, 0);
SSmlSTableSchema* sTableSchema = taosArrayGet(stableSchemas, point->schemaIdx);
TAOS_SML_DATA_POINT_TAGS** oneTable = taosHashIterate(info->childTables, NULL);
while (oneTable) {
TAOS_SML_DATA_POINT_TAGS* tableData = *oneTable;
size_t rowSize = 0;
size_t numCols = taosArrayGetSize(sTableSchema->fields);
for (int i = 0; i < numCols; ++i) {
SSchema* colSchema = taosArrayGet(sTableSchema->fields, i);
rowSize += colSchema->bytes;
}
uDebug("SML:0x%"PRIx64" apply child table points. child table: %s of super table %s, row size: %zu",
info->id, point->childTableName, point->stableName, rowSize);
code = applyChildTableDataPoints(taos, point->childTableName, point->stableName, sTableSchema, cTablePoints, rowSize, info);
if (code != 0) {
uError("SML:0x%"PRIx64" Apply child table points failed. child table %s, error %s", info->id, point->childTableName, tstrerror(code));
goto cleanup;
}
SName pName = {TSDB_TABLE_NAME_T, info->taos->acctId, {0}, {0}};
strcpy(pName.dbname, info->pRequest->pDb);
memcpy(pName.tname, tableData->childTableName, strlen(tableData->childTableName));
SEpSet ep = getEpSet_s(&info->taos->pAppInfo->mgmtEp);
SVgroupInfo vg;
catalogGetTableHashVgroup(info->pCatalog, info->taos->pAppInfo->pTransporter, &ep, &pName, &vg);
taosHashPut(info->pVgHash, (const char*)&vg.vgId, sizeof(vg.vgId), (char*)&vg, sizeof(vg));
uDebug("SML:0x%"PRIx64" successfully applied data points of child table %s", info->id, point->childTableName);
STableMeta** pMeta = taosHashGet(info->metaHashObj, tableData->sTableName, tableData->sTableNameLen);
ASSERT (NULL != pMeta && NULL != *pMeta);
(*pMeta)->vgId = vg.vgId;
(*pMeta)->uid = tableData->uid;
pCTablePoints = taosHashIterate(cname2points, pCTablePoints);
}
smlBind(info->exec, tableData->tags, tableData->cols, *pMeta, info->msgBuf, info->msgLen);
cleanup:
pCTablePoints = taosHashIterate(cname2points, NULL);
while (pCTablePoints) {
SArray* pPoints = *pCTablePoints;
taosArrayDestroy(&pPoints);
pCTablePoints = taosHashIterate(cname2points, pCTablePoints);
oneTable = taosHashIterate(info->childTables, oneTable);
}
taosHashCleanup(cname2points);
return code;
}
static int doSmlInsertOneDataPoint(TAOS* taos, TAOS_SML_DATA_POINT* point, SSmlLinesInfo* info) {
int32_t code = TSDB_CODE_SUCCESS;
if (!point->childTableName) {
int tableNameLen = TSDB_TABLE_NAME_LEN;
point->childTableName = calloc(1, tableNameLen + 1);
getSmlMd5ChildTableName(point, point->childTableName, &tableNameLen, info);
point->childTableName[tableNameLen] = '\0';
}
smlBuildOutput(info->exec, info->pVgHash);
launchQueryImpl(info->pRequest, info->pQuery, TSDB_CODE_SUCCESS, true);
if(info->pRequest->code != TSDB_CODE_SUCCESS){
STableMeta* tableMeta = NULL;
int32_t ret = getSuperTableMetaFromLocalCache(taos, point->stableName, &tableMeta, info);
if (ret != TSDB_CODE_SUCCESS) {
return ret;
}
uint8_t precision = tableMeta->tableInfo.precision;
taosMemoryFree(tableMeta);
char* sql = taosMemoryMalloc(TSDB_MAX_SQL_LEN + 1);
int freeBytes = TSDB_MAX_SQL_LEN;
int sqlLen = 0;
sqlLen += snprintf(sql + sqlLen, freeBytes - sqlLen, "insert into %s(", point->childTableName);
for (int col = 0; col < point->fieldNum; ++col) {
TAOS_SML_KV* kv = point->fields + col;
sqlLen += snprintf(sql + sqlLen, freeBytes - sqlLen, "%s,", kv->key);
}
--sqlLen;
sqlLen += snprintf(sql + sqlLen, freeBytes - sqlLen, ") values (");
TAOS_SML_KV* tsField = point->fields + 0;
int64_t ts = *(int64_t*)(tsField->value);
ts = convertTimePrecision(ts, TSDB_TIME_PRECISION_NANO, precision);
sqlLen += snprintf(sql + sqlLen, freeBytes - sqlLen, "%" PRId64 ",", ts);
for (int col = 1; col < point->fieldNum; ++col) {
TAOS_SML_KV* kv = point->fields + col;
int32_t len = 0;
converToStr(sql + sqlLen, kv->type, kv->value, kv->length, &len);
sqlLen += len;
sqlLen += snprintf(sql + sqlLen, freeBytes - sqlLen, ",");
}
--sqlLen;
sqlLen += snprintf(sql + sqlLen, freeBytes - sqlLen, ")");
sql[sqlLen] = 0;
uDebug("SML:0x%" PRIx64 " insert child table table %s of super table %s sql: %s", info->id,
point->childTableName, point->stableName, sql);
TAOS_RES* res = taos_query(taos, sql);
taosMemoryFree(sql);
code = taos_errno(res);
info->affectedRows = taos_affected_rows(res);
taos_free_result(res);
info->affectedRows = taos_affected_rows(info->pRequest);
return code;
}
......@@ -1333,18 +776,12 @@ int tscSmlInsert(TAOS* taos, SSmlLinesInfo* info) {
}
uDebug("SML:0x%"PRIx64" apply data points", info->id);
code = applyDataPoints(taos, points, numPoint, stableSchemas, info);
code = applyDataPoints(taos, info);
if (code != 0) {
uError("SML:0x%"PRIx64" error apply data points : %s", info->id, tstrerror(code));
}
clean_up:
for (int i = 0; i < taosArrayGetSize(stableSchemas); ++i) {
SSmlSTableSchema* schema = taosArrayGet(stableSchemas, i);
taosArrayDestroy(&schema->fields);
taosArrayDestroy(&schema->tags);
}
taosArrayDestroy(&stableSchemas);
return code;
}
......@@ -1398,16 +835,6 @@ static void escapeSpecialCharacter(uint8_t field, const char **pos) {
*pos = cur;
}
char* addEscapeCharToString(char *str, int32_t len) {
if (str == NULL) {
return NULL;
}
memmove(str + 1, str, len);
str[0] = str[len + 1] = TS_BACKQUOTE_CHAR;
str[len + 2] = '\0';
return str;
}
bool isValidInteger(char *str) {
char *c = str;
if (*c != '+' && *c != '-' && !isdigit(*c)) {
......@@ -1668,67 +1095,6 @@ static bool isNchar(char *pVal, uint16_t len) {
return false;
}
static int32_t isTimeStamp(char *pVal, uint16_t len, SMLTimeStampType *tsType, SSmlLinesInfo* info) {
if (len == 0) {
return TSDB_CODE_SUCCESS;
}
if ((len == 1) && pVal[0] == '0') {
*tsType = SML_TIME_STAMP_NOW;
return TSDB_CODE_SUCCESS;
}
for (int i = 0; i < len; ++i) {
if(!isdigit(pVal[i])) {
return TSDB_CODE_TSC_INVALID_TIME_STAMP;
}
}
/* For InfluxDB line protocol use user passed timestamp precision
* For OpenTSDB protocols only 10 digit(seconds) or 13 digits(milliseconds)
* precision allowed
*/
if (info->protocol == TSDB_SML_LINE_PROTOCOL) {
if (info->tsType != SML_TIME_STAMP_NOT_CONFIGURED) {
*tsType = info->tsType;
} else {
*tsType = SML_TIME_STAMP_NANO_SECONDS;
}
} else if (info->protocol == TSDB_SML_TELNET_PROTOCOL) {
if (len == SML_TIMESTAMP_SECOND_DIGITS) {
*tsType = SML_TIME_STAMP_SECONDS;
} else if (len == SML_TIMESTAMP_MILLI_SECOND_DIGITS) {
*tsType = SML_TIME_STAMP_MILLI_SECONDS;
} else {
return TSDB_CODE_TSC_INVALID_TIME_STAMP;
}
}
return TSDB_CODE_SUCCESS;
//if (pVal[len - 1] == 's') {
// switch (pVal[len - 2]) {
// case 'm':
// *tsType = SML_TIME_STAMP_MILLI_SECONDS;
// break;
// case 'u':
// *tsType = SML_TIME_STAMP_MICRO_SECONDS;
// break;
// case 'n':
// *tsType = SML_TIME_STAMP_NANO_SECONDS;
// break;
// default:
// if (isdigit(pVal[len - 2])) {
// *tsType = SML_TIME_STAMP_SECONDS;
// break;
// } else {
// return false;
// }
// }
// //printf("Type is timestamp(%s)\n", pVal);
// return true;
//}
//return false;
}
static bool convertStrToNumber(TAOS_SML_KV *pVal, char *str, SSmlLinesInfo* info) {
errno = 0;
uint8_t type = pVal->type;
......@@ -1987,114 +1353,14 @@ bool convertSmlValueType(TAOS_SML_KV *pVal, char *value,
return false;
}
static int32_t getTimeStampValue(char *value, uint16_t len,
SMLTimeStampType type, int64_t *ts, SSmlLinesInfo* info) {
//No appendix or no timestamp given (len = 0)
if (len != 0 && type != SML_TIME_STAMP_NOW) {
*ts = (int64_t)strtoll(value, NULL, 10);
} else {
type = SML_TIME_STAMP_NOW;
}
switch (type) {
case SML_TIME_STAMP_NOW: {
*ts = taosGetTimestampNs();
break;
}
case SML_TIME_STAMP_HOURS: {
*ts = (int64_t)(*ts * 3600 * 1e9);
break;
}
case SML_TIME_STAMP_MINUTES: {
*ts = (int64_t)(*ts * 60 * 1e9);
break;
}
case SML_TIME_STAMP_SECONDS: {
*ts = (int64_t)(*ts * 1e9);
break;
}
case SML_TIME_STAMP_MILLI_SECONDS: {
*ts = convertTimePrecision(*ts, TSDB_TIME_PRECISION_MILLI, TSDB_TIME_PRECISION_NANO);
break;
}
case SML_TIME_STAMP_MICRO_SECONDS: {
*ts = convertTimePrecision(*ts, TSDB_TIME_PRECISION_MICRO, TSDB_TIME_PRECISION_NANO);
break;
}
case SML_TIME_STAMP_NANO_SECONDS: {
*ts = *ts * 1;
break;
}
default: {
return TSDB_CODE_TSC_INVALID_TIME_STAMP;
}
}
return TSDB_CODE_SUCCESS;
}
int32_t convertSmlTimeStamp(TAOS_SML_KV *pVal, char *value,
uint16_t len, SSmlLinesInfo* info) {
int32_t ret;
SMLTimeStampType type = SML_TIME_STAMP_NOW;
int64_t tsVal;
ret = isTimeStamp(value, len, &type, info);
if (ret != TSDB_CODE_SUCCESS) {
return ret;
}
ret = getTimeStampValue(value, len, type, &tsVal, info);
if (ret != TSDB_CODE_SUCCESS) {
return ret;
}
uDebug("SML:0x%"PRIx64"Timestamp after conversion:%"PRId64, info->id, tsVal);
pVal->type = TSDB_DATA_TYPE_TIMESTAMP;
pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
pVal->value = calloc(pVal->length, 1);
memcpy(pVal->value, &tsVal, pVal->length);
return TSDB_CODE_SUCCESS;
}
static int32_t parseSmlTimeStamp(TAOS_SML_KV **pTS, const char **index, SSmlLinesInfo* info) {
const char *start, *cur;
int32_t ret = TSDB_CODE_SUCCESS;
int len = 0;
char key[] = "ts";
char *value = NULL;
start = cur = *index;
*pTS = calloc(1, sizeof(TAOS_SML_KV));
while(*cur != '\0') {
cur++;
len++;
}
if (len > 0) {
value = calloc(len + 1, 1);
memcpy(value, start, len);
}
ret = convertSmlTimeStamp(*pTS, value, len, info);
if (ret) {
taosMemoryFree(value);
taosMemoryFree(*pTS);
return ret;
}
taosMemoryFree(value);
(*pTS)->key = calloc(sizeof(key), 1);
memcpy((*pTS)->key, key, sizeof(key));
return ret;
}
bool checkDuplicateKey(char *key, SHashObj *pHash, SSmlLinesInfo* info) {
char *val = NULL;
val = taosHashGet(pHash, key, strlen(key));
if (val) {
uError("SML:0x%"PRIx64" Duplicate key detected:%s", info->id, key);
return true;
bool checkDuplicateKey(char *key, SHashObj *pHash, SSmlLinesInfo* info) {
char *val = NULL;
val = taosHashGet(pHash, key, strlen(key));
if (val) {
uError("SML:0x%"PRIx64" Duplicate key detected:%s", info->id, key);
return true;
}
uint8_t dummy_val = 0;
......@@ -2103,458 +1369,6 @@ bool checkDuplicateKey(char *key, SHashObj *pHash, SSmlLinesInfo* info) {
return false;
}
static int32_t parseSmlKey(TAOS_SML_KV *pKV, const char **index, SHashObj *pHash, SSmlLinesInfo* info) {
const char *cur = *index;
char key[TSDB_COL_NAME_LEN + 1]; // +1 to avoid key[len] over write
int16_t len = 0;
while (*cur != '\0') {
if (len > TSDB_COL_NAME_LEN - 1) {
uError("SML:0x%"PRIx64" Key field cannot exceeds %d characters", info->id, TSDB_COL_NAME_LEN - 1);
return TSDB_CODE_TSC_INVALID_COLUMN_LENGTH;
}
//unescaped '=' identifies a tag key
if (*cur == '=' && *(cur - 1) != '\\') {
break;
}
//Escape special character
if (*cur == '\\') {
escapeSpecialCharacter(2, &cur);
}
key[len] = *cur;
cur++;
len++;
}
if (len == 0) {
return TSDB_CODE_TSC_LINE_SYNTAX_ERROR;
}
key[len] = '\0';
if (checkDuplicateKey(key, pHash, info)) {
return TSDB_CODE_TSC_LINE_SYNTAX_ERROR;
}
pKV->key = calloc(len + TS_BACKQUOTE_CHAR_SIZE + 1, 1);
memcpy(pKV->key, key, len + 1);
addEscapeCharToString(pKV->key, len);
uDebug("SML:0x%"PRIx64" Key:%s|len:%d", info->id, pKV->key, len);
*index = cur + 1;
return TSDB_CODE_SUCCESS;
}
static int32_t parseSmlValue(TAOS_SML_KV *pKV, const char **index,
bool *is_last_kv, SSmlLinesInfo* info, bool isTag) {
const char *start, *cur;
int32_t ret = TSDB_CODE_SUCCESS;
char *value = NULL;
int16_t len = 0;
bool kv_done = false;
bool back_slash = false;
bool double_quote = false;
size_t line_len = 0;
enum {
tag_common,
tag_lqoute,
tag_rqoute
} tag_state;
enum {
val_common,
val_lqoute,
val_rqoute
} val_state;
start = cur = *index;
tag_state = tag_common;
val_state = val_common;
while (1) {
if (isTag) {
/* ',', '=' and spaces MUST be escaped */
switch (tag_state) {
case tag_common:
if (back_slash == true) {
if (*cur != ',' && *cur != '=' && *cur != ' ') {
uError("SML:0x%"PRIx64" tag value: state(%d), incorrect character(%c) escaped", info->id, tag_state, *cur);
ret = TSDB_CODE_TSC_LINE_SYNTAX_ERROR;
goto error;
}
back_slash = false;
cur++;
len++;
break;
}
if (*cur == '"') {
if (cur == *index) {
tag_state = tag_lqoute;
}
cur += 1;
len += 1;
break;
} else if (*cur == 'L') {
line_len = strlen(*index);
/* common character at the end */
if (cur + 1 >= *index + line_len) {
*is_last_kv = true;
kv_done = true;
break;
}
if (*(cur + 1) == '"') {
/* string starts here */
if (cur + 1 == *index + 1) {
tag_state = tag_lqoute;
}
cur += 2;
len += 2;
break;
}
}
switch (*cur) {
case '\\':
back_slash = true;
cur++;
len++;
break;
case ',':
kv_done = true;
break;
case ' ':
/* fall through */
case '\0':
*is_last_kv = true;
kv_done = true;
break;
default:
cur++;
len++;
}
break;
case tag_lqoute:
if (back_slash == true) {
if (*cur != ',' && *cur != '=' && *cur != ' ') {
uError("SML:0x%"PRIx64" tag value: state(%d), incorrect character(%c) escaped", info->id, tag_state, *cur);
ret = TSDB_CODE_TSC_LINE_SYNTAX_ERROR;
goto error;
}
back_slash = false;
cur++;
len++;
break;
} else if (double_quote == true) {
if (*cur != ' ' && *cur != ',' && *cur != '\0') {
uError("SML:0x%"PRIx64" tag value: state(%d), incorrect character(%c) behind closing \"", info->id, tag_state, *cur);
ret = TSDB_CODE_TSC_LINE_SYNTAX_ERROR;
goto error;
}
if (*cur == ' ' || *cur == '\0') {
*is_last_kv = true;
}
double_quote = false;
tag_state = tag_rqoute;
break;
}
switch (*cur) {
case '\\':
back_slash = true;
cur++;
len++;
break;
case '"':
double_quote = true;
cur++;
len++;
break;
case ',':
/* fall through */
case '=':
/* fall through */
case ' ':
if (*(cur - 1) != '\\') {
uError("SML:0x%"PRIx64" tag value: state(%d), character(%c) not escaped", info->id, tag_state, *cur);
ret = TSDB_CODE_TSC_LINE_SYNTAX_ERROR;
kv_done = true;
}
break;
case '\0':
uError("SML:0x%"PRIx64" tag value: state(%d), closing \" not found", info->id, tag_state);
ret = TSDB_CODE_TSC_LINE_SYNTAX_ERROR;
kv_done = true;
break;
default:
cur++;
len++;
}
break;
default:
kv_done = true;
}
} else {
switch (val_state) {
case val_common:
if (back_slash == true) {
if (*cur != '\\' && *cur != '"') {
uError("SML:0x%"PRIx64" field value: state(%d), incorrect character(%c) escaped", info->id, val_state, *cur);
ret = TSDB_CODE_TSC_LINE_SYNTAX_ERROR;
goto error;
}
back_slash = false;
cur++;
len++;
break;
}
if (*cur == '"') {
if (cur == *index) {
val_state = val_lqoute;
} else {
if (*(cur - 1) != '\\') {
uError("SML:0x%"PRIx64" field value: state(%d), \" not escaped", info->id, val_state);
ret = TSDB_CODE_TSC_LINE_SYNTAX_ERROR;
goto error;
}
}
cur += 1;
len += 1;
break;
} else if (*cur == 'L') {
line_len = strlen(*index);
/* common character at the end */
if (cur + 1 >= *index + line_len) {
*is_last_kv = true;
kv_done = true;
break;
}
if (*(cur + 1) == '"') {
/* string starts here */
if (cur + 1 == *index + 1) {
val_state = val_lqoute;
cur += 2;
len += 2;
} else {
/* MUST at the end of string */
if (cur + 2 >= *index + line_len) {
cur += 2;
len += 2;
*is_last_kv = true;
kv_done = true;
} else {
if (*(cur + 2) != ' ' && *(cur + 2) != ',') {
uError("SML:0x%"PRIx64" field value: state(%d), not closing character(L\")", info->id, val_state);
ret = TSDB_CODE_TSC_LINE_SYNTAX_ERROR;
goto error;
} else {
if (*(cur + 2) == ' ') {
*is_last_kv = true;
}
cur += 2;
len += 2;
kv_done = true;
}
}
}
break;
}
}
switch (*cur) {
case '\\':
back_slash = true;
cur++;
len++;
break;
case ',':
kv_done = true;
break;
case ' ':
/* fall through */
case '\0':
*is_last_kv = true;
kv_done = true;
break;
default:
cur++;
len++;
}
break;
case val_lqoute:
if (back_slash == true) {
if (*cur != '\\' && *cur != '"') {
uError("SML:0x%"PRIx64" field value: state(%d), incorrect character(%c) escaped", info->id, val_state, *cur);
ret = TSDB_CODE_TSC_LINE_SYNTAX_ERROR;
goto error;
}
back_slash = false;
cur++;
len++;
break;
} else if (double_quote == true) {
if (*cur != ' ' && *cur != ',' && *cur != '\0') {
uError("SML:0x%"PRIx64" field value: state(%d), incorrect character(%c) behind closing \"", info->id, val_state, *cur);
ret = TSDB_CODE_TSC_LINE_SYNTAX_ERROR;
goto error;
}
if (*cur == ' ' || *cur == '\0') {
*is_last_kv = true;
}
double_quote = false;
val_state = val_rqoute;
break;
}
switch (*cur) {
case '\\':
back_slash = true;
cur++;
len++;
break;
case '"':
double_quote = true;
cur++;
len++;
break;
case '\0':
uError("SML:0x%"PRIx64" field value: state(%d), closing \" not found", info->id, val_state);
ret = TSDB_CODE_TSC_LINE_SYNTAX_ERROR;
kv_done = true;
break;
default:
cur++;
len++;
}
break;
default:
kv_done = true;
}
}
if (kv_done == true) {
break;
}
}
if (len == 0 || ret != TSDB_CODE_SUCCESS) {
taosMemoryFree(pKV->key);
pKV->key = NULL;
return TSDB_CODE_TSC_LINE_SYNTAX_ERROR;
}
value = calloc(len + 1, 1);
memcpy(value, start, len);
value[len] = '\0';
if (!convertSmlValueType(pKV, value, len, info, isTag)) {
uError("SML:0x%"PRIx64" Failed to convert sml value string(%s) to any type",
info->id, value);
taosMemoryFree(value);
ret = TSDB_CODE_TSC_INVALID_VALUE;
goto error;
}
taosMemoryFree(value);
*index = (*cur == '\0') ? cur : cur + 1;
return ret;
error:
//taosMemoryFree previous alocated key field
taosMemoryFree(pKV->key);
pKV->key = NULL;
return ret;
}
/* Field Escape charaters
1: measurement Comma,Space
2: tag_key, tag_value, field_key Comma,Equal Sign,Space
3: field_value Double quote,Backslash
*/
static int32_t parseSmlMeasurement(TAOS_SML_DATA_POINT *pSml, const char **index,
uint8_t *has_tags, SSmlLinesInfo* info) {
const char *cur = *index;
int16_t len = 0;
pSml->stableName = calloc(TSDB_TABLE_NAME_LEN + TS_BACKQUOTE_CHAR_SIZE, 1);
if (pSml->stableName == NULL){
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
while (*cur != '\0') {
if (len > TSDB_TABLE_NAME_LEN - 1) {
uError("SML:0x%"PRIx64" Measurement field cannot exceeds %d characters", info->id, TSDB_TABLE_NAME_LEN - 1);
taosMemoryFree(pSml->stableName);
pSml->stableName = NULL;
return TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH;
}
//first unescaped comma or space identifies measurement
//if space detected first, meaning no tag in the input
if (*cur == ',' && *(cur - 1) != '\\') {
*has_tags = 1;
break;
}
if (*cur == ' ' && *(cur - 1) != '\\') {
if (*(cur + 1) != ' ') {
break;
}
else {
cur++;
continue;
}
}
//Comma, Space, Backslash needs to be escaped if any
if (*cur == '\\') {
escapeSpecialCharacter(1, &cur);
}
pSml->stableName[len] = *cur;
cur++;
len++;
}
if (len == 0) {
taosMemoryFree(pSml->stableName);
pSml->stableName = NULL;
return TSDB_CODE_TSC_LINE_SYNTAX_ERROR;
}
addEscapeCharToString(pSml->stableName, len);
*index = cur + 1;
uDebug("SML:0x%"PRIx64" Stable name in measurement:%s|len:%d", info->id, pSml->stableName, len);
return TSDB_CODE_SUCCESS;
}
//Table name can only contain digits(0-9),alphebet(a-z),underscore(_)
int32_t isValidChildTableName(const char *pTbName, int16_t len, SSmlLinesInfo* info) {
if (len > TSDB_TABLE_NAME_LEN - 1) {
......@@ -2570,147 +1384,6 @@ int32_t isValidChildTableName(const char *pTbName, int16_t len, SSmlLinesInfo* i
return TSDB_CODE_SUCCESS;
}
static int32_t parseSmlKvPairs(TAOS_SML_KV **pKVs, int *num_kvs,
const char **index, bool isField,
TAOS_SML_DATA_POINT* smlData, SHashObj *pHash,
SSmlLinesInfo* info) {
const char *cur = *index;
int32_t ret = TSDB_CODE_SUCCESS;
TAOS_SML_KV *pkv;
bool is_last_kv = false;
int32_t capacity = 0;
if (isField) {
capacity = 64;
*pKVs = calloc(capacity, sizeof(TAOS_SML_KV));
// leave space for timestamp;
pkv = *pKVs;
pkv++;
} else {
capacity = 8;
*pKVs = calloc(capacity, sizeof(TAOS_SML_KV));
pkv = *pKVs;
}
size_t childTableNameLen = strlen(tsSmlChildTableName);
char childTableName[TSDB_TABLE_NAME_LEN + TS_BACKQUOTE_CHAR_SIZE] = {0};
if (childTableNameLen != 0) {
memcpy(childTableName, tsSmlChildTableName, childTableNameLen);
addEscapeCharToString(childTableName, (int32_t)(childTableNameLen));
}
while (*cur != '\0') {
ret = parseSmlKey(pkv, &cur, pHash, info);
if (ret) {
uError("SML:0x%"PRIx64" Unable to parse key", info->id);
goto error;
}
ret = parseSmlValue(pkv, &cur, &is_last_kv, info, !isField);
if (ret) {
uError("SML:0x%"PRIx64" Unable to parse value", info->id);
goto error;
}
if (!isField && childTableNameLen != 0 && strcasecmp(pkv->key, childTableName) == 0) {
smlData->childTableName = taosMemoryMalloc(pkv->length + TS_BACKQUOTE_CHAR_SIZE + 1);
memcpy(smlData->childTableName, pkv->value, pkv->length);
addEscapeCharToString(smlData->childTableName, (int32_t)pkv->length);
taosMemoryFree(pkv->key);
taosMemoryFree(pkv->value);
} else {
*num_kvs += 1;
}
if (is_last_kv) {
goto done;
}
//reallocate addtional memory for more kvs
TAOS_SML_KV *more_kvs = NULL;
if (isField) {
if ((*num_kvs + 2) > capacity) {
capacity *= 3; capacity /= 2;
more_kvs = realloc(*pKVs, capacity * sizeof(TAOS_SML_KV));
} else {
more_kvs = *pKVs;
}
} else {
if ((*num_kvs + 1) > capacity) {
capacity *= 3; capacity /= 2;
more_kvs = realloc(*pKVs, capacity * sizeof(TAOS_SML_KV));
} else {
more_kvs = *pKVs;
}
}
if (!more_kvs) {
goto error;
}
*pKVs = more_kvs;
//move pKV points to next TAOS_SML_KV block
if (isField) {
pkv = *pKVs + *num_kvs + 1;
} else {
pkv = *pKVs + *num_kvs;
}
}
goto done;
error:
return ret;
done:
*index = cur;
return ret;
}
static void moveTimeStampToFirstKv(TAOS_SML_DATA_POINT** smlData, TAOS_SML_KV *ts) {
TAOS_SML_KV* tsField = (*smlData)->fields;
tsField->length = ts->length;
tsField->type = ts->type;
tsField->value = taosMemoryMalloc(ts->length);
tsField->key = taosMemoryMalloc(strlen(ts->key) + 1);
memcpy(tsField->key, ts->key, strlen(ts->key) + 1);
memcpy(tsField->value, ts->value, ts->length);
(*smlData)->fieldNum = (*smlData)->fieldNum + 1;
taosMemoryFree(ts->key);
taosMemoryFree(ts->value);
taosMemoryFree(ts);
}
/* Field Escape charaters
1: measurement Comma,Space
2: tag_key, tag_value, field_key Comma,Equal Sign,Space
3: field_value Double quote,Backslash
*/
//void findSpace(const char** sql, const char **tags, int32_t *tagLen){
// const char *cur = *sql;
// *tagLen = 0;
// *tags = NULL;
// if(!cur) return;
// while (*cur != '\0') { // jump the space at the begining
// if(*cur != SPACE) {
// *tags = cur;
// break;
// }
// cur++;
// }
//
// while (*cur != '\0') { // find the first space
// if (*cur == SPACE && *(cur - 1) != SLASH) {
// *tagLen = cur - *tags;
// break;
// }
//
// cur++;
// }
// *sql = cur;
// return;
//}
int32_t parseSml(const char* sql, TAOS_PARSE_ELEMENTS *elements){
if(!sql) return TSDB_CODE_SML_INVALID_DATA;
while (*sql != '\0') { // jump the space at the begining
......@@ -2757,14 +1430,13 @@ int32_t parseSml(const char* sql, TAOS_PARSE_ELEMENTS *elements){
while (*sql != '\0') {
if(*sql == SPACE && *(sql - 1) != SLASH) {
elements->colsLen = sql - elements->cols;
break;
}
sql++;
}
if(elements->colsLen == 0) return TSDB_CODE_SML_INVALID_DATA;
elements->colsLen = sql - elements->cols;
// parse ts
// parse ts,ts can be empty
while (*sql != '\0') {
if(*sql != SPACE) {
elements->timestamp = sql;
......@@ -2772,12 +1444,11 @@ int32_t parseSml(const char* sql, TAOS_PARSE_ELEMENTS *elements){
}
sql++;
}
if(!elements->timestamp) return TSDB_CODE_SML_INVALID_DATA;
return TSDB_CODE_SUCCESS;
}
int32_t parseSmlKV(const char* data, int32_t len, SArray *tags){
int32_t parseSmlKV(const char* data, int32_t len, SArray *cols, bool isTag){
for(int i = 0; i < len; i++){
const char *key = data + i;
int32_t keyLen = 0;
......@@ -2805,24 +1476,112 @@ int32_t parseSmlKV(const char* data, int32_t len, SArray *tags){
if(valueLen == 0){
return TSDB_CODE_SML_INVALID_DATA;
}
TAOS_SML_KV *kv = taosMemoryCalloc(sizeof(TAOS_SML_KV), 1);
SSmlKv *kv = taosMemoryCalloc(sizeof(SSmlKv), 1);
kv->key = key;
kv->keyLen = keyLen;
kv->value = value;
kv->valueLen = valueLen;
if(isTag){
kv->type = TSDB_DATA_TYPE_NCHAR;
if(tags) taosArrayPush(tags, &kv);
}
if(cols) taosArrayPush(cols, &kv);
}
return TSDB_CODE_SUCCESS;
}
static int64_t getTimeStampValue(const char *value, int32_t type) {
double ts = (double)strtoll(value, NULL, 10);
switch (type) {
case TSDB_TIME_PRECISION_HOURS:
ts *= (3600 * 1e9);
case TSDB_TIME_PRECISION_MINUTES:
ts *= (60 * 1e9);
case TSDB_TIME_PRECISION_SECONDS:
ts *= (1e9);
case TSDB_TIME_PRECISION_MICRO:
ts *= (1e6);
case TSDB_TIME_PRECISION_MILLI:
ts *= (1e3);
default:
break;
}
if(ts > (double)INT64_MAX || ts < 0){
return -1;
}else{
return (int64_t)ts;
}
}
static int64_t getTimeStampNow(int32_t precision) {
switch (precision) {
case TSDB_TIME_PRECISION_HOURS:
return taosGetTimestampMs()/1000/3600;
case TSDB_TIME_PRECISION_MINUTES:
return taosGetTimestampMs()/1000/60;
case TSDB_TIME_PRECISION_SECONDS:
return taosGetTimestampMs()/1000;
default:
return taosGetTimestamp(precision);
}
}
static int32_t isValidateTimeStamp(const char *pVal, int32_t len) {
for (int i = 0; i < len; ++i) {
if (!isdigit(pVal[i])) {
return TSDB_CODE_TSC_INVALID_TIME_STAMP;
}
}
return TSDB_CODE_SUCCESS;
}
int32_t parseSmlTS(const char* data, int32_t len, SArray *tags){
TAOS_SML_KV *kv = taosMemoryCalloc(sizeof(TAOS_SML_KV), 1);
kv->value = data;
kv->valueLen = len;
static int32_t getTsType(int32_t len) {
if (len == TSDB_TIME_PRECISION_SEC_DIGITS) {
return TSDB_TIME_PRECISION_SECONDS;
} else if (len == TSDB_TIME_PRECISION_MILLI_DIGITS) {
return TSDB_TIME_PRECISION_MILLI_DIGITS;
} else {
return TSDB_CODE_TSC_INVALID_TIME_STAMP;
}
}
static int32_t parseSmlTS(const char* data, SArray *tags, int8_t tsType, SMLProtocolType protocolType){
int64_t *ts = taosMemoryCalloc(1, sizeof(int64_t));
if(data == NULL){
if(protocolType == TSDB_SML_LINE_PROTOCOL){
*ts = getTimeStampNow(tsType);
}else{
goto cleanup;
}
}else{
int32_t len = strlen(data);
int ret = isValidateTimeStamp(data, len);
if(!ret){
goto cleanup;
}
if(protocolType != TSDB_SML_LINE_PROTOCOL){
tsType = getTsType(len);
if (tsType == TSDB_CODE_TSC_INVALID_TIME_STAMP) {
goto cleanup;
}
}
*ts = getTimeStampValue(data, tsType);
if(*ts == -1){
goto cleanup;
}
}
SSmlKv *kv = taosMemoryCalloc(sizeof(SSmlKv), 1);
kv->value = (const char*)ts;
kv->valueLen = sizeof(int64_t);
kv->type = TSDB_DATA_TYPE_TIMESTAMP;
kv->length = (int16_t)tDataTypes[kv->type].bytes;
if(tags) taosArrayPush(tags, &kv);
return TSDB_CODE_SUCCESS;
cleanup:
taosMemoryFree(ts);
return TSDB_CODE_TSC_INVALID_TIME_STAMP;
}
//int32_t parseSmlCols(const char* data, SArray *cols){
......@@ -2871,8 +1630,8 @@ int32_t parseSmlTS(const char* data, int32_t len, SArray *tags){
void updateMeta(SSmlSTableMeta* tableMeta, SArray *tags, SArray *cols){
if(tags){
for (int i = 0; i < taosArrayGetSize(tags); ++i) {
TAOS_SML_KV *kv = taosArrayGetP(tags, i);
TAOS_SML_KV **value = taosHashGet(tableMeta->tagHash, kv->key, kv->keyLen);
SSmlKv *kv = taosArrayGetP(tags, i);
SSmlKv **value = taosHashGet(tableMeta->tagHash, kv->key, kv->keyLen);
if(value){
if(kv->type != (*value)->type){
// todo
......@@ -2886,8 +1645,8 @@ void updateMeta(SSmlSTableMeta* tableMeta, SArray *tags, SArray *cols){
if(cols){
for (int i = 1; i < taosArrayGetSize(cols); ++i) { //jump timestamp
TAOS_SML_KV *kv = taosArrayGetP(cols, i);
TAOS_SML_KV **value = taosHashGet(tableMeta->fieldHash, kv->key, kv->keyLen);
SSmlKv *kv = taosArrayGetP(cols, i);
SSmlKv **value = taosHashGet(tableMeta->fieldHash, kv->key, kv->keyLen);
if(value){
if(kv->type != (*value)->type){
// todo
......@@ -2902,20 +1661,20 @@ void updateMeta(SSmlSTableMeta* tableMeta, SArray *tags, SArray *cols){
void insertMeta(SSmlSTableMeta* tableMeta, SArray *tags, SArray *cols){
if(tags){
for (int i = 0; i < taosArrayGetSize(tags); ++i) {
TAOS_SML_KV *kv = taosArrayGetP(tags, i);
SSmlKv *kv = taosArrayGetP(tags, i);
taosHashPut(tableMeta->tagHash, kv->key, kv->keyLen, &kv, POINTER_BYTES);
}
}
if(cols){
for (int i = 0; i < taosArrayGetSize(cols); ++i) {
TAOS_SML_KV *kv = taosArrayGetP(cols, i);
SSmlKv *kv = taosArrayGetP(cols, i);
taosHashPut(tableMeta->fieldHash, kv->key, kv->keyLen, &kv, POINTER_BYTES);
}
}
}
int32_t tscParseLine(const char* sql, SSmlLinesInfo* info) {
static int32_t smlParseLine(const char* sql, SSmlLinesInfo* info) {
TAOS_PARSE_ELEMENTS elements = {0};
int ret = parseSml(sql, &elements);
if(ret != TSDB_CODE_SUCCESS){
......@@ -2927,8 +1686,9 @@ int32_t tscParseLine(const char* sql, SSmlLinesInfo* info) {
uError("SML:0x%"PRIx64" taos_insert_lines failed to allocate memory", info->id);
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
parseSmlTS(elements.timestamp, strlen(elements.timestamp), cols);
ret = parseSmlKV(elements.cols, elements.colsLen, cols);
parseSmlTS(elements.timestamp, cols, info->tsType);
ret = parseSmlCols(elements.cols, elements.colsLen, cols, false);
if(ret != TSDB_CODE_SUCCESS){
return ret;
}
......@@ -2954,7 +1714,7 @@ int32_t tscParseLine(const char* sql, SSmlLinesInfo* info) {
uError("SML:0x%"PRIx64" taos_insert_lines failed to allocate memory", info->id);
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
ret = parseSmlKV(elements.tags, elements.tagsLen, tag->tags);
ret = parseSmlTags(elements.tags, elements.tagsLen, tag->tags);
if(ret != TSDB_CODE_SUCCESS){
return ret;
}
......@@ -2973,112 +1733,115 @@ int32_t tscParseLine(const char* sql, SSmlLinesInfo* info) {
return TSDB_CODE_SUCCESS;
}
int32_t tscParseLines(char* lines[], int numLines, SSmlLinesInfo* info) {
for (int32_t i = 0; i < numLines; ++i) {
int32_t code = tscParseLine(lines[i], info);
if (code != TSDB_CODE_SUCCESS) {
uError("SML:0x%"PRIx64" data point line parse failed. line %d : %s", info->id, i, lines[i]);
return code;
}
}
uDebug("SML:0x%"PRIx64" data point line parse success. tables %d", info->id, taosHashGetSize(info->childTables));
return TSDB_CODE_SUCCESS;
static void smlDestroyInfo(SSmlLinesInfo* info){
if(!info) return;
qDestroyQuery(info->pQuery);
tscSmlDestroyHandle(info->exec);
taosHashCleanup(info->childTables);
taosHashCleanup(info->superTables);
taosHashCleanup(info->metaHashObj);
taosHashCleanup(info->pVgHash);
taosMemoryFree(info);
}
int taos_insert_lines(TAOS* taos, char* lines[], int numLines, SMLProtocolType protocol, SMLTimeStampType tsType, int *affectedRows) {
int32_t code = 0;
static SSmlLinesInfo* smlBuildInfo(TAOS* taos, SRequestObj* request, SMLProtocolType protocol, int32_t tsType){
SSmlLinesInfo* info = taosMemoryMalloc(sizeof(SSmlLinesInfo));
if (NULL == info) {
return NULL;
}
info->id = genLinesSmlId();
info->tsType = tsType;
info->taos = (STscObj*)taos;
info->taos = taos;
info->protocol = protocol;
if (numLines <= 0 || numLines > 65536) {
uError("SML:0x%"PRIx64" taos_insert_lines numLines should be between 1 and 65536. numLines: %d", info->id, numLines);
code = TSDB_CODE_TSC_APP_ERROR;
info->pQuery = taosMemoryCalloc(1, sizeof(SQuery));
if (NULL == info->pQuery) {
goto cleanup;
}
info->pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE;
info->pQuery->haveResultSet = false;
info->pQuery->msgType = TDMT_VND_SUBMIT;
info->pQuery->pRoot = (SNode*)nodesMakeNode(QUERY_NODE_VNODE_MODIF_STMT);
((SVnodeModifOpStmt*)(info->pQuery->pRoot))->payloadType = PAYLOAD_TYPE_KV;
info->exec = tscSmlInitHandle(info->pQuery);
int32_t code = catalogGetHandle(info->taos->pAppInfo->clusterId, &info->pCatalog);
if(code != TSDB_CODE_SUCCESS){
uError("SML:0x%"PRIx64" get catalog error %d", info->id, code);
goto cleanup;
}
info->pRequest = request;
info->msgBuf = info->pRequest->msgBuf;
info->msgLen = ERROR_MSG_BUF_DEFAULT_SIZE;
info->childTables = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, false);
info->superTables = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, false);
info->metaHashObj = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), true, false);
info->pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, false);
uDebug("SML:0x%"PRIx64" taos_insert_lines begin inserting %d lines, first line: %s", info->id, numLines, lines[0]);
code = tscParseLines(lines, numLines, info);
return info;
if (code != 0) {
cleanup:
smlDestroyInfo(info);
return NULL;
}
int sml_insert_lines(TAOS* taos, SRequestObj* request, char* lines[], int numLines, SMLProtocolType protocol, int32_t tsType) {
int32_t code = TSDB_CODE_SUCCESS;
SSmlLinesInfo* info = smlBuildInfo(taos, request, protocol, tsType);
if(!info){
code = TSDB_CODE_OUT_OF_MEMORY;
goto cleanup;
}
if (numLines <= 0 || numLines > 65536) {
uError("SML:0x%"PRIx64" taos_insert_lines numLines should be between 1 and 65536. numLines: %d", info->id, numLines);
code = TSDB_CODE_TSC_APP_ERROR;
goto cleanup;
}
for (int32_t i = 0; i < numLines; ++i) {
code = smlParseLine(lines[i], info);
if (code != TSDB_CODE_SUCCESS) {
uError("SML:0x%"PRIx64" data point line parse failed. line %d : %s", info->id, i, lines[i]);
goto cleanup;
}
}
uDebug("SML:0x%"PRIx64" data point line parse success. tables %d", info->id, taosHashGetSize(info->childTables));
code = tscSmlInsert(taos, info);
if (code != 0) {
code = smlInsert(taos, info);
if (code != TSDB_CODE_SUCCESS) {
uError("SML:0x%"PRIx64" taos_sml_insert error: %s", info->id, tstrerror((code)));
goto cleanup;
}
if (affectedRows != NULL) {
*affectedRows = info->affectedRows;
}
uDebug("SML:0x%"PRIx64" taos_insert_lines finish inserting %d lines. code: %d", info->id, numLines, code);
cleanup:
taosMemoryFree(info);
smlDestroyInfo(info);
return code;
}
static int32_t convertPrecisionType(int precision, SMLTimeStampType *tsType) {
static int32_t convertPrecisionType(int precision) {
switch (precision) {
case TSDB_SML_TIMESTAMP_NOT_CONFIGURED:
*tsType = SML_TIME_STAMP_NOT_CONFIGURED;
break;
case TSDB_SML_TIMESTAMP_HOURS:
*tsType = SML_TIME_STAMP_HOURS;
break;
return TSDB_TIME_PRECISION_HOURS;
case TSDB_SML_TIMESTAMP_MILLI_SECONDS:
*tsType = SML_TIME_STAMP_MILLI_SECONDS;
break;
return TSDB_TIME_PRECISION_MILLI;
case TSDB_SML_TIMESTAMP_NANO_SECONDS:
*tsType = SML_TIME_STAMP_NANO_SECONDS;
break;
case TSDB_SML_TIMESTAMP_NOT_CONFIGURED:
return TSDB_TIME_PRECISION_NANO;
case TSDB_SML_TIMESTAMP_MICRO_SECONDS:
*tsType = SML_TIME_STAMP_MICRO_SECONDS;
break;
return TSDB_TIME_PRECISION_MICRO;
case TSDB_SML_TIMESTAMP_SECONDS:
*tsType = SML_TIME_STAMP_SECONDS;
break;
return TSDB_TIME_PRECISION_SECONDS;
case TSDB_SML_TIMESTAMP_MINUTES:
*tsType = SML_TIME_STAMP_MINUTES;
break;
return TSDB_TIME_PRECISION_MINUTES;
default:
return TSDB_CODE_SML_INVALID_PRECISION_TYPE;
return -1;
}
return TSDB_CODE_SUCCESS;
}
//make a dummy SSqlObj
static SSqlObj* createSmlQueryObj(TAOS* taos, int32_t affected_rows, int32_t code) {
SSqlObj *pNew = (SSqlObj*)calloc(1, sizeof(SSqlObj));
if (pNew == NULL) {
return NULL;
}
pNew->signature = pNew;
pNew->pTscObj = taos;
pNew->fp = NULL;
tsem_init(&pNew->rspSem, 0, 0);
registerSqlObj(pNew);
pNew->res.numOfRows = affected_rows;
pNew->res.code = code;
return pNew;
}
/**
* taos_schemaless_insert() parse and insert data points into database according to
* different protocol.
......@@ -3102,19 +1865,17 @@ static SSqlObj* createSmlQueryObj(TAOS* taos, int32_t affected_rows, int32_t cod
TAOS_RES* taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int protocol, int precision) {
int code = TSDB_CODE_SUCCESS;
int affected_rows = 0;
SMLTimeStampType tsType = SML_TIME_STAMP_NOW;
if (protocol == TSDB_SML_LINE_PROTOCOL) {
code = convertPrecisionType(precision, &tsType);
if (code != TSDB_CODE_SUCCESS) {
return NULL;
}
}
SRequestObj* request = createRequest(taos, NULL, NULL, TSDB_SQL_INSERT);
switch (protocol) {
case TSDB_SML_LINE_PROTOCOL:
code = taos_insert_lines(taos, lines, numLines, protocol, tsType, &affected_rows);
int32_t tsType = convertPrecisionType(precision);
if(tsType == -1){
request->code = TSDB_CODE_SML_INVALID_PRECISION_TYPE;
goto end;
}
code = sml_insert_lines(taos, request, lines, numLines, protocol, tsType);
break;
case TSDB_SML_TELNET_PROTOCOL:
//code = taos_insert_telnet_lines(taos, lines, numLines, protocol, tsType, &affected_rows);
......@@ -3127,8 +1888,6 @@ TAOS_RES* taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int pr
break;
}
SSqlObj *pSql = createSmlQueryObj(taos, affected_rows, code);
return (TAOS_RES*)pSql;
end:
return (TAOS_RES*)request;
}
......@@ -364,7 +364,7 @@ int stmtSetTbTags(TAOS_STMT *stmt, TAOS_BIND_v2 *tags) {
STMT_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
}
STMT_ERR_RET(qBindStmtTagsValue(*pDataBlock, pStmt->bInfo.boundTags, pStmt->bInfo.tbSuid, &pStmt->bInfo.sname, tags, pStmt->exec.pRequest->msgBuf, pStmt->exec.pRequest->msgBufLen));
STMT_ERR_RET(qBindStmtTagsValue(*pDataBlock, pStmt->bInfo.boundTags, pStmt->bInfo.tbSuid, pStmt->bInfo.sname.tname, tags, pStmt->exec.pRequest->msgBuf, pStmt->exec.pRequest->msgBufLen));
return TSDB_CODE_SUCCESS;
}
......
......@@ -136,7 +136,7 @@ void destroyBlockHashmap(SHashObj* pDataBlockHash);
int initRowBuilder(SRowBuilder *pBuilder, int16_t schemaVer, SParsedDataColInfo *pColInfo);
int32_t allocateMemIfNeed(STableDataBlocks *pDataBlock, int32_t rowSize, int32_t * numOfRows);
int32_t getDataBlockFromList(SHashObj* pHashList, int64_t id, int32_t size, int32_t startOffset, int32_t rowSize,
const STableMeta* pTableMeta, STableDataBlocks** dataBlocks, SArray* pBlockList, SVCreateTbReq* pCreateTbReq);
STableMeta* pTableMeta, STableDataBlocks** dataBlocks, SArray* pBlockList, SVCreateTbReq* pCreateTbReq);
int32_t mergeTableDataBlocks(SHashObj* pHashObj, uint8_t payloadType, SArray** pVgDataBlocks);
int32_t buildCreateTbMsg(STableDataBlocks* pBlocks, SVCreateTbReq* pCreateTbReq);
int32_t allocateMemForSize(STableDataBlocks *pDataBlock, int32_t allSize);
......
......@@ -41,7 +41,6 @@ int32_t generateSyntaxErrMsg(SMsgBuf* pBuf, int32_t errCode, ...);
int32_t buildInvalidOperationMsg(SMsgBuf* pMsgBuf, const char* msg);
int32_t buildSyntaxErrMsg(SMsgBuf* pBuf, const char* additionalInfo, const char* sourceStr);
STableMeta* tableMetaDup(const STableMeta* pTableMeta);
SSchema *getTableColumnSchema(const STableMeta *pTableMeta);
SSchema *getTableTagSchema(const STableMeta* pTableMeta);
int32_t getNumOfColumns(const STableMeta* pTableMeta);
......
......@@ -763,11 +763,9 @@ static int32_t KvRowAppend(SMsgBuf* pMsgBuf, const void *value, int32_t len, voi
return TSDB_CODE_SUCCESS;
}
static int32_t buildCreateTbReq(SVCreateTbReq *pTbReq, const SName* pName, SKVRow row, int64_t suid) {
char dbFName[TSDB_DB_FNAME_LEN] = {0};
tNameGetFullDbName(pName, dbFName);
static int32_t buildCreateTbReq(SVCreateTbReq *pTbReq, const char* tname, SKVRow row, int64_t suid) {
pTbReq->type = TD_CHILD_TABLE;
pTbReq->name = strdup(pName->tname);
pTbReq->name = strdup(tname);
pTbReq->ctbCfg.suid = suid;
pTbReq->ctbCfg.pTag = row;
......@@ -775,7 +773,7 @@ static int32_t buildCreateTbReq(SVCreateTbReq *pTbReq, const SName* pName, SKVRo
}
// pSql -> tag1_value, ...)
static int32_t parseTagsClause(SInsertParseContext* pCxt, SSchema* pSchema, uint8_t precision, const SName* pName) {
static int32_t parseTagsClause(SInsertParseContext* pCxt, SSchema* pSchema, uint8_t precision, const char* tName) {
if (tdInitKVRowBuilder(&pCxt->tagsBuilder) < 0) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
......@@ -816,7 +814,7 @@ static int32_t parseTagsClause(SInsertParseContext* pCxt, SSchema* pSchema, uint
}
tdSortKVRowByColIdx(row);
return buildCreateTbReq(&pCxt->createTblReq, pName, row, pCxt->pTableMeta->suid);
return buildCreateTbReq(&pCxt->createTblReq, tName, row, pCxt->pTableMeta->suid);
}
static int32_t cloneTableMeta(STableMeta* pSrc, STableMeta** pDst) {
......@@ -876,7 +874,7 @@ static int32_t parseUsingClause(SInsertParseContext* pCxt, SToken* pTbnameToken)
if (TK_NK_LP != sToken.type) {
return buildSyntaxErrMsg(&pCxt->msg, "( is expected", sToken.z);
}
CHECK_CODE(parseTagsClause(pCxt, pCxt->pTableMeta->schema, getTableInfo(pCxt->pTableMeta).precision, &name));
CHECK_CODE(parseTagsClause(pCxt, pCxt->pTableMeta->schema, getTableInfo(pCxt->pTableMeta).precision, name.tname));
NEXT_TOKEN(pCxt->pSql, sToken);
if (TK_NK_RP != sToken.type) {
return buildSyntaxErrMsg(&pCxt->msg, ") is expected", sToken.z);
......@@ -1024,11 +1022,6 @@ static void destroyDataBlock(STableDataBlocks* pDataBlock) {
taosMemoryFreeClear(pDataBlock->pData);
if (!pDataBlock->cloned) {
// free the refcount for metermeta
if (pDataBlock->pTableMeta != NULL) {
taosMemoryFreeClear(pDataBlock->pTableMeta);
}
destroyBoundColumnInfo(&pDataBlock->boundColumnInfo);
}
taosMemoryFreeClear(pDataBlock);
......@@ -1256,7 +1249,7 @@ int32_t qBuildStmtOutput(SQuery* pQuery, SHashObj* pVgHash, SHashObj* pBlockHash
return TSDB_CODE_SUCCESS;
}
int32_t qBindStmtTagsValue(void *pBlock, void *boundTags, int64_t suid, SName *pName, TAOS_BIND_v2 *bind, char *msgBuf, int32_t msgBufLen){
int32_t qBindStmtTagsValue(void *pBlock, void *boundTags, int64_t suid, char *tName, TAOS_BIND_v2 *bind, char *msgBuf, int32_t msgBufLen){
STableDataBlocks *pDataBlock = (STableDataBlocks *)pBlock;
SMsgBuf pBuf = {.buf = msgBuf, .len = msgBufLen};
SParsedDataColInfo* tags = (SParsedDataColInfo*)boundTags;
......@@ -1297,7 +1290,7 @@ int32_t qBindStmtTagsValue(void *pBlock, void *boundTags, int64_t suid, SName *p
tdSortKVRowByColIdx(row);
SVCreateTbReq tbReq = {0};
CHECK_CODE(buildCreateTbReq(&tbReq, pName, row, suid));
CHECK_CODE(buildCreateTbReq(&tbReq, tName, row, suid));
CHECK_CODE(buildCreateTbMsg(pDataBlock, &tbReq));
destroyCreateSubTbReq(&tbReq);
......@@ -1522,4 +1515,211 @@ int32_t qBuildStmtColFields(void *pBlock, int32_t *fieldNum, TAOS_FIELD** fields
}
typedef struct SmlExecHandle {
SHashObj* pBlockHash;
SParsedDataColInfo tags; // each table
SKVRowBuilder tagsBuilder; // each table
SVCreateTbReq createTblReq; // each table
SQuery* pQuery;
} SmlExecHandle;
static int32_t smlBoundColumns(SArray *cols, SParsedDataColInfo* pColList, SSchema* pSchema) {
col_id_t nCols = pColList->numOfCols;
pColList->numOfBound = 0;
pColList->boundNullLen = 0;
memset(pColList->boundColumns, 0, sizeof(col_id_t) * nCols);
for (col_id_t i = 0; i < nCols; ++i) {
pColList->cols[i].valStat = VAL_STAT_NONE;
}
bool isOrdered = true;
col_id_t lastColIdx = -1; // last column found
for (int i = 0; i < taosArrayGetSize(cols); ++i) {
SSmlKv *kv = taosArrayGetP(cols, i);
SToken sToken = {.n=kv->keyLen, .z=kv->key};
col_id_t t = lastColIdx + 1;
col_id_t index = findCol(&sToken, t, nCols, pSchema);
if (index < 0 && t > 0) {
index = findCol(&sToken, 0, t, pSchema);
isOrdered = false;
}
if (index < 0) {
return TSDB_CODE_SML_INVALID_DATA;
}
if (pColList->cols[index].valStat == VAL_STAT_HAS) {
return TSDB_CODE_SML_INVALID_DATA;
}
lastColIdx = index;
pColList->cols[index].valStat = VAL_STAT_HAS;
pColList->boundColumns[pColList->numOfBound] = index + PRIMARYKEY_TIMESTAMP_COL_ID;
++pColList->numOfBound;
switch (pSchema[t].type) {
case TSDB_DATA_TYPE_BINARY:
pColList->boundNullLen += (sizeof(VarDataOffsetT) + VARSTR_HEADER_SIZE + CHAR_BYTES);
break;
case TSDB_DATA_TYPE_NCHAR:
pColList->boundNullLen += (sizeof(VarDataOffsetT) + VARSTR_HEADER_SIZE + TSDB_NCHAR_SIZE);
break;
default:
pColList->boundNullLen += TYPE_BYTES[pSchema[t].type];
break;
}
}
pColList->orderStatus = isOrdered ? ORDER_STATUS_ORDERED : ORDER_STATUS_DISORDERED;
if (!isOrdered) {
pColList->colIdxInfo = taosMemoryCalloc(pColList->numOfBound, sizeof(SBoundIdxInfo));
if (NULL == pColList->colIdxInfo) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
SBoundIdxInfo* pColIdx = pColList->colIdxInfo;
for (col_id_t i = 0; i < pColList->numOfBound; ++i) {
pColIdx[i].schemaColIdx = pColList->boundColumns[i];
pColIdx[i].boundIdx = i;
}
qsort(pColIdx, pColList->numOfBound, sizeof(SBoundIdxInfo), schemaIdxCompar);
for (col_id_t i = 0; i < pColList->numOfBound; ++i) {
pColIdx[i].finalIdx = i;
}
qsort(pColIdx, pColList->numOfBound, sizeof(SBoundIdxInfo), boundIdxCompar);
}
if(pColList->numOfCols > pColList->numOfBound){
memset(&pColList->boundColumns[pColList->numOfBound], 0,
sizeof(col_id_t) * (pColList->numOfCols - pColList->numOfBound));
}
return TSDB_CODE_SUCCESS;
}
static int32_t smlParseTags(SArray *cols, SKVRowBuilder *tagsBuilder, SParsedDataColInfo* tags, SSchema* pSchema, SVCreateTbReq *createTblReq) {
if (tdInitKVRowBuilder(tagsBuilder) < 0) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
SKvParam param = {.builder = tagsBuilder};
for (int i = 0; i < tags->numOfBound; ++i) {
SSchema* pTagSchema = &pSchema[tags->boundColumns[i] - 1]; // colId starts with 1
param.schema = pTagSchema;
SSmlKv *kv = taosArrayGetP(cols, i);
KvRowAppend(NULL, kv->value, kv->valueLen, &param) ;
}
SKVRow row = tdGetKVRowFromBuilder(tagsBuilder);
if(row == NULL){
return TSDB_CODE_SML_INVALID_DATA;
}
tdSortKVRowByColIdx(row);
createTblReq->type = TD_CHILD_TABLE;
createTblReq->ctbCfg.pTag = row;
return TSDB_CODE_SUCCESS;
}
int32_t smlBind(void *handle, SArray *tags, SArray *cols, STableMeta *pTableMeta, char *msgBuf, int16_t msgBufLen) {
SMsgBuf pBuf = {.buf = msgBuf, .len = msgBufLen};
int32_t rowNum = taosArrayGetSize(cols);
if(rowNum <= 0) {
return buildInvalidOperationMsg(&pBuf, "cols size <= 0");
}
SmlExecHandle *smlHandle = (SmlExecHandle *)handle;
SSchema* pTagsSchema = getTableTagSchema(pTableMeta);
smlBoundColumns(tags, &smlHandle->tags, pTagsSchema);
smlParseTags(tags, &smlHandle->tagsBuilder, &smlHandle->tags, pTagsSchema, &smlHandle->createTblReq);
STableDataBlocks* pDataBlock = NULL;
getDataBlockFromList(smlHandle->pBlockHash, pTableMeta->uid, TSDB_DEFAULT_PAYLOAD_SIZE,
sizeof(SSubmitBlk), getTableInfo(pTableMeta).rowSize, pTableMeta,
&pDataBlock, NULL, &smlHandle->createTblReq);
SSchema* pSchema = getTableColumnSchema(pTableMeta);
smlBoundColumns(taosArrayGetP(cols, 0), &pDataBlock->boundColumnInfo, pSchema);
int32_t extendedRowSize = getExtendedRowSize(pDataBlock);
SParsedDataColInfo* spd = &pDataBlock->boundColumnInfo;
SRowBuilder* pBuilder = &pDataBlock->rowBuilder;
SMemParam param = {.rb = pBuilder};
initRowBuilder(&pDataBlock->rowBuilder, pDataBlock->pTableMeta->sversion, &pDataBlock->boundColumnInfo);
allocateMemForSize(pDataBlock, extendedRowSize * rowNum);
for (int32_t r = 0; r < rowNum; ++r) {
STSRow* row = (STSRow*)(pDataBlock->pData + pDataBlock->size); // skip the SSubmitBlk header
tdSRowResetBuf(pBuilder, row);
SArray *rowData = taosArrayGetP(cols, r);
// 1. set the parsed value from sql string
for (int c = 0; c < spd->numOfBound; ++c) {
SSchema* pColSchema = &pSchema[spd->boundColumns[c] - 1];
param.schema = pColSchema;
getSTSRowAppendInfo(pBuilder->rowType, spd, c, &param.toffset, &param.colIdx);
SSmlKv *kv = taosArrayGetP(rowData, c);
if (kv->valueLen == 0) {
MemRowAppend(&pBuf, NULL, 0, &param);
} else {
int32_t colLen = pColSchema->bytes;
if (IS_VAR_DATA_TYPE(pColSchema->type)) {
colLen = kv->valueLen;
}
MemRowAppend(&pBuf, kv->value, colLen, &param);
}
if (PRIMARYKEY_TIMESTAMP_COL_ID == pColSchema->colId) {
TSKEY tsKey = TD_ROW_KEY(row);
checkTimestamp(pDataBlock, (const char *)&tsKey);
}
}
// set the null value for the columns that do not assign values
if ((spd->numOfBound < spd->numOfCols) && TD_IS_TP_ROW(row)) {
for (int32_t i = 0; i < spd->numOfCols; ++i) {
if (spd->cols[i].valStat == VAL_STAT_NONE) { // the primary TS key is not VAL_STAT_NONE
tdAppendColValToTpRow(pBuilder, TD_VTYPE_NONE, getNullValue(pSchema[i].type), true, pSchema[i].type, i,
spd->cols[i].toffset);
}
}
}
pDataBlock->size += extendedRowSize;
}
SSubmitBlk *pBlocks = (SSubmitBlk *)(pDataBlock->pData);
if (TSDB_CODE_SUCCESS != setBlockInfo(pBlocks, pDataBlock, rowNum)) {
return buildInvalidOperationMsg(&pBuf, "too many rows in sql, total number of rows should be less than 32767");
}
return TSDB_CODE_SUCCESS;
}
void* tscSmlInitHandle(SQuery *pQuery){
SmlExecHandle *handle = taosMemoryCalloc(sizeof(SmlExecHandle));
handle->pBlockHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false);
handle->pQuery = pQuery;
return handle;
}
void tscSmlDestroyHandle(void *pHandle){
if(!pHandle) return;
SmlExecHandle *handle = (SmlExecHandle *)pHandle;
taosHashCleanup(handle->pBlockHash);
taosMemoryFree(handle);
}
int32_t smlBuildOutput(void* handle, SHashObj* pVgHash) {
SmlExecHandle *smlHandle = (SmlExecHandle *)handle;
return qBuildStmtOutput(smlHandle->pQuery, pVgHash, smlHandle->pBlockHash);
}
......@@ -116,7 +116,7 @@ void destroyBoundColumnInfo(void* pBoundInfo) {
}
static int32_t createDataBlock(size_t defaultSize, int32_t rowSize, int32_t startOffset,
const STableMeta* pTableMeta, STableDataBlocks** dataBlocks) {
STableMeta* pTableMeta, STableDataBlocks** dataBlocks) {
STableDataBlocks* dataBuf = (STableDataBlocks*)taosMemoryCalloc(1, sizeof(STableDataBlocks));
if (dataBuf == NULL) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
......@@ -137,8 +137,7 @@ static int32_t createDataBlock(size_t defaultSize, int32_t rowSize, int32_t star
}
memset(dataBuf->pData, 0, sizeof(SSubmitBlk));
//Here we keep the tableMeta to avoid it to be remove by other threads.
dataBuf->pTableMeta = tableMetaDup(pTableMeta);
dataBuf->pTableMeta = pTableMeta;
SParsedDataColInfo* pColInfo = &dataBuf->boundColumnInfo;
SSchema* pSchema = getTableColumnSchema(dataBuf->pTableMeta);
......@@ -176,8 +175,7 @@ int32_t buildCreateTbMsg(STableDataBlocks* pBlocks, SVCreateTbReq* pCreateTbReq)
return TSDB_CODE_SUCCESS;
}
int32_t getDataBlockFromList(SHashObj* pHashList, int64_t id, int32_t size, int32_t startOffset, int32_t rowSize,
const STableMeta* pTableMeta, STableDataBlocks** dataBlocks, SArray* pBlockList, SVCreateTbReq* pCreateTbReq) {
int32_t getDataBlockFromList(SHashObj* pHashList, int64_t id, int32_t size, int32_t startOffset, int32_t rowSize, STableMeta* pTableMeta, STableDataBlocks** dataBlocks, SArray* pBlockList, SVCreateTbReq* pCreateTbReq) {
*dataBlocks = NULL;
STableDataBlocks** t1 = (STableDataBlocks**)taosHashGet(pHashList, (const char*)&id, sizeof(id));
if (t1 != NULL) {
......@@ -227,9 +225,9 @@ static void destroyDataBlock(STableDataBlocks* pDataBlock) {
taosMemoryFreeClear(pDataBlock->pData);
if (!pDataBlock->cloned) {
// free the refcount for metermeta
if (pDataBlock->pTableMeta != NULL) {
taosMemoryFreeClear(pDataBlock->pTableMeta);
}
// if (pDataBlock->pTableMeta != NULL) {
// taosMemoryFreeClear(pDataBlock->pTableMeta);
// }
destroyBoundColumnInfo(&pDataBlock->boundColumnInfo);
}
......
......@@ -166,26 +166,6 @@ int32_t buildSyntaxErrMsg(SMsgBuf* pBuf, const char* additionalInfo, const char*
return TSDB_CODE_TSC_SQL_SYNTAX_ERROR;
}
static uint32_t getTableMetaSize(const STableMeta* pTableMeta) {
assert(pTableMeta != NULL);
int32_t totalCols = 0;
if (pTableMeta->tableInfo.numOfColumns >= 0) {
totalCols = pTableMeta->tableInfo.numOfColumns + pTableMeta->tableInfo.numOfTags;
}
return sizeof(STableMeta) + totalCols * sizeof(SSchema);
}
STableMeta* tableMetaDup(const STableMeta* pTableMeta) {
assert(pTableMeta != NULL);
size_t size = getTableMetaSize(pTableMeta);
STableMeta* p = taosMemoryMalloc(size);
memcpy(p, pTableMeta, size);
return p;
}
SSchema *getTableColumnSchema(const STableMeta *pTableMeta) {
assert(pTableMeta != NULL);
return (SSchema*) pTableMeta->schema;
......
......@@ -51,8 +51,8 @@ void getPrevCharSize(const char *str, int pos, int *size, int *width) {
if (str[pos] > 0 || countPrefixOnes((unsigned char)str[pos]) > 1) break;
}
int rc = taosMbToWchar(&wc, str + pos, MB_CUR_MAX);
assert(rc == *size);
taosMbToWchar(&wc, str + pos, MB_CUR_MAX);
// assert(rc == *size); // it will be core, if str is encode by utf8 and taos charset is gbk
*width = taosWcharWidth(wc);
}
......
......@@ -224,63 +224,27 @@ int32_t shellRunCommand(TAOS *con, char *command) {
}
}
bool esc = false;
char quote = 0, *cmd = command, *p = command;
char quote = 0, *cmd = command;
for (char c = *command++; c != 0; c = *command++) {
if (esc) {
switch (c) {
case 'n':
c = '\n';
break;
case 'r':
c = '\r';
break;
case 't':
c = '\t';
break;
case 'G':
*p++ = '\\';
break;
case '\'':
case '"':
if (quote) {
*p++ = '\\';
}
break;
}
*p++ = c;
esc = false;
if (c == '\\' && (*command == '\'' || *command == '"' || *command == '`')) {
command ++;
continue;
}
if (c == '\\') {
if (quote != 0 && (*command == '_' || *command == '\\')) {
// DO nothing
} else {
esc = true;
continue;
}
}
if (quote == c) {
quote = 0;
} else if (quote == 0 && (c == '\'' || c == '"')) {
} else if (quote == 0 && (c == '\'' || c == '"' || c == '`')) {
quote = c;
}
*p++ = c;
if (c == ';' && quote == 0) {
c = *p;
*p = 0;
} else if (c == ';' && quote == 0) {
c = *command;
*command = 0;
if (shellRunSingleCommand(con, cmd) < 0) {
return -1;
}
*p = c;
p = cmd;
*command = c;
cmd = command;
}
}
*p = 0;
return shellRunSingleCommand(con, cmd);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册