提交 a4d03b43 编写于 作者: wmmhello's avatar wmmhello

opti: async->sync for schemaless

上级 443aa2ee
...@@ -58,7 +58,6 @@ typedef struct SParseContext { ...@@ -58,7 +58,6 @@ typedef struct SParseContext {
bool isSuperUser; bool isSuperUser;
bool enableSysInfo; bool enableSysInfo;
bool async; bool async;
int8_t schemalessType;
const char* svrVer; const char* svrVer;
bool nodeOffline; bool nodeOffline;
SArray* pTableMetaPos; // sql table pos => catalog data pos SArray* pTableMetaPos; // sql table pos => catalog data pos
......
...@@ -149,7 +149,7 @@ typedef struct STscObj { ...@@ -149,7 +149,7 @@ typedef struct STscObj {
int32_t numOfReqs; // number of sqlObj bound to this connection int32_t numOfReqs; // number of sqlObj bound to this connection
SAppInstInfo* pAppInfo; SAppInstInfo* pAppInfo;
SHashObj* pRequests; SHashObj* pRequests;
int8_t schemalessType; // todo remove it, this attribute should be move to request void* smlHandle;
} STscObj; } STscObj;
typedef struct SResultColumn { typedef struct SResultColumn {
...@@ -323,6 +323,7 @@ void destroyTscObj(void* pObj); ...@@ -323,6 +323,7 @@ void destroyTscObj(void* pObj);
STscObj* acquireTscObj(int64_t rid); STscObj* acquireTscObj(int64_t rid);
int32_t releaseTscObj(int64_t rid); int32_t releaseTscObj(int64_t rid);
void destroyAppInst(SAppInstInfo* pAppInfo); void destroyAppInst(SAppInstInfo* pAppInfo);
void smlDestroyInfo(void *data);
uint64_t generateRequestId(); uint64_t generateRequestId();
......
...@@ -30,7 +30,7 @@ extern "C" { ...@@ -30,7 +30,7 @@ extern "C" {
#define tscDebug(...) do { if (cDebugFlag & DEBUG_DEBUG) { taosPrintLog("TSC ", DEBUG_DEBUG, cDebugFlag, __VA_ARGS__); }} while(0) #define tscDebug(...) do { if (cDebugFlag & DEBUG_DEBUG) { taosPrintLog("TSC ", DEBUG_DEBUG, cDebugFlag, __VA_ARGS__); }} while(0)
#define tscTrace(...) do { if (cDebugFlag & DEBUG_TRACE) { taosPrintLog("TSC ", DEBUG_TRACE, cDebugFlag, __VA_ARGS__); }} while(0) #define tscTrace(...) do { if (cDebugFlag & DEBUG_TRACE) { taosPrintLog("TSC ", DEBUG_TRACE, cDebugFlag, __VA_ARGS__); }} while(0)
#define tscDebugL(...) do { if (cDebugFlag & DEBUG_DEBUG) { taosPrintLongString("TSC ", DEBUG_DEBUG, cDebugFlag, __VA_ARGS__); }} while(0) #define tscDebugL(...) do { if (cDebugFlag & DEBUG_DEBUG) { taosPrintLongString("TSC ", DEBUG_DEBUG, cDebugFlag, __VA_ARGS__); }} while(0)
//#define tscPerf(...) do { if (cDebugFlag & DEBUG_INFO) { taosPrintLog("TSC ", DEBUG_INFO, cDebugFlag, __VA_ARGS__); }} while(0) #define tscPerf(...) do { if (cDebugFlag & DEBUG_INFO) { taosPrintLog("TSC ", 0, cDebugFlag, __VA_ARGS__); }} while(0)
// clang-format on // clang-format on
#ifdef __cplusplus #ifdef __cplusplus
......
...@@ -77,11 +77,11 @@ static void deregisterRequest(SRequestObj *pRequest) { ...@@ -77,11 +77,11 @@ static void deregisterRequest(SRequestObj *pRequest) {
pRequest->self, pTscObj->id, pRequest->requestId, duration / 1000.0, num, currentInst); pRequest->self, pTscObj->id, pRequest->requestId, duration / 1000.0, num, currentInst);
if (QUERY_NODE_VNODE_MODIF_STMT == pRequest->stmtType) { if (QUERY_NODE_VNODE_MODIF_STMT == pRequest->stmtType) {
// tscPerf("insert duration %" PRId64 "us: syntax:%" PRId64 "us, ctg:%" PRId64 "us, semantic:%" PRId64 tscPerf("insert duration %" PRId64 "us: syntax:%" PRId64 "us, ctg:%" PRId64 "us, semantic:%" PRId64
// "us, exec:%" PRId64 "us", "us, exec:%" PRId64 "us",
// duration, pRequest->metric.syntaxEnd - pRequest->metric.syntaxStart, duration, pRequest->metric.syntaxEnd - pRequest->metric.syntaxStart,
// pRequest->metric.ctgEnd - pRequest->metric.ctgStart, pRequest->metric.semanticEnd - pRequest->metric.ctgEnd - pRequest->metric.ctgStart, pRequest->metric.semanticEnd -
// pRequest->metric.ctgEnd, pRequest->metric.execEnd - pRequest->metric.semanticEnd); pRequest->metric.ctgEnd, pRequest->metric.execEnd - pRequest->metric.semanticEnd);
atomic_add_fetch_64((int64_t *)&pActivity->insertElapsedTime, duration); atomic_add_fetch_64((int64_t *)&pActivity->insertElapsedTime, duration);
} else if (QUERY_NODE_SELECT_STMT == pRequest->stmtType) { } else if (QUERY_NODE_SELECT_STMT == pRequest->stmtType) {
// tscPerf("select duration %" PRId64 "us: syntax:%" PRId64 "us, ctg:%" PRId64 "us, semantic:%" PRId64 // tscPerf("select duration %" PRId64 "us: syntax:%" PRId64 "us, ctg:%" PRId64 "us, semantic:%" PRId64
...@@ -237,6 +237,7 @@ void destroyTscObj(void *pObj) { ...@@ -237,6 +237,7 @@ void destroyTscObj(void *pObj) {
} }
taosThreadMutexDestroy(&pTscObj->mutex); taosThreadMutexDestroy(&pTscObj->mutex);
taosMemoryFree(pTscObj); taosMemoryFree(pTscObj);
smlDestroyInfo(pTscObj->smlHandle);
tscTrace("end to destroy tscObj %" PRIx64 " p:%p", tscId, pTscObj); tscTrace("end to destroy tscObj %" PRIx64 " p:%p", tscId, pTscObj);
} }
...@@ -266,7 +267,6 @@ void *createTscObj(const char *user, const char *auth, const char *db, int32_t c ...@@ -266,7 +267,6 @@ void *createTscObj(const char *user, const char *auth, const char *db, int32_t c
taosThreadMutexInit(&pObj->mutex, NULL); taosThreadMutexInit(&pObj->mutex, NULL);
pObj->id = taosAddRef(clientConnRefPool, pObj); pObj->id = taosAddRef(clientConnRefPool, pObj);
pObj->schemalessType = 1;
atomic_add_fetch_64(&pObj->pAppInfo->numOfConns, 1); atomic_add_fetch_64(&pObj->pAppInfo->numOfConns, 1);
......
...@@ -232,7 +232,6 @@ int32_t parseSql(SRequestObj* pRequest, bool topicQuery, SQuery** pQuery, SStmtC ...@@ -232,7 +232,6 @@ int32_t parseSql(SRequestObj* pRequest, bool topicQuery, SQuery** pQuery, SStmtC
.pTransporter = pTscObj->pAppInfo->pTransporter, .pTransporter = pTscObj->pAppInfo->pTransporter,
.pStmtCb = pStmtCb, .pStmtCb = pStmtCb,
.pUser = pTscObj->user, .pUser = pTscObj->user,
.schemalessType = pTscObj->schemalessType,
.isSuperUser = (0 == strcmp(pTscObj->user, TSDB_DEFAULT_USER)), .isSuperUser = (0 == strcmp(pTscObj->user, TSDB_DEFAULT_USER)),
.enableSysInfo = pTscObj->sysInfo, .enableSysInfo = pTscObj->sysInfo,
.svrVer = pTscObj->sVer, .svrVer = pTscObj->sVer,
......
...@@ -862,7 +862,6 @@ int32_t createParseContext(const SRequestObj *pRequest, SParseContext **pCxt) { ...@@ -862,7 +862,6 @@ int32_t createParseContext(const SRequestObj *pRequest, SParseContext **pCxt) {
.pTransporter = pTscObj->pAppInfo->pTransporter, .pTransporter = pTscObj->pAppInfo->pTransporter,
.pStmtCb = NULL, .pStmtCb = NULL,
.pUser = pTscObj->user, .pUser = pTscObj->user,
.schemalessType = pTscObj->schemalessType,
.isSuperUser = (0 == strcmp(pTscObj->user, TSDB_DEFAULT_USER)), .isSuperUser = (0 == strcmp(pTscObj->user, TSDB_DEFAULT_USER)),
.enableSysInfo = pTscObj->sysInfo, .enableSysInfo = pTscObj->sysInfo,
.async = true, .async = true,
......
...@@ -49,22 +49,23 @@ ...@@ -49,22 +49,23 @@
break; \ break; \
} }
// comma , // comma ,
#define IS_SLASH_COMMA(sql) (*(sql) == COMMA && *((sql)-1) == SLASH) //#define IS_SLASH_COMMA(sql) (*(sql) == COMMA && *((sql)-1) == SLASH)
#define IS_COMMA(sql) (*(sql) == COMMA && *((sql)-1) != SLASH) #define IS_COMMA(sql) (*(sql) == COMMA && *((sql)-1) != SLASH)
// space // space
#define IS_SLASH_SPACE(sql) (*(sql) == SPACE && *((sql)-1) == SLASH) //#define IS_SLASH_SPACE(sql) (*(sql) == SPACE && *((sql)-1) == SLASH)
#define IS_SPACE(sql) (*(sql) == SPACE && *((sql)-1) != SLASH) #define IS_SPACE(sql) (*(sql) == SPACE && *((sql)-1) != SLASH)
// equal = // equal =
#define IS_SLASH_EQUAL(sql) (*(sql) == EQUAL && *((sql)-1) == SLASH) //#define IS_SLASH_EQUAL(sql) (*(sql) == EQUAL && *((sql)-1) == SLASH)
#define IS_EQUAL(sql) (*(sql) == EQUAL && *((sql)-1) != SLASH) #define IS_EQUAL(sql) (*(sql) == EQUAL && *((sql)-1) != SLASH)
// quote " // quote "
#define IS_SLASH_QUOTE(sql) (*(sql) == QUOTE && *((sql)-1) == SLASH) //#define IS_SLASH_QUOTE(sql) (*(sql) == QUOTE && *((sql)-1) == SLASH)
#define IS_QUOTE(sql) (*(sql) == QUOTE && *((sql)-1) != SLASH) #define IS_QUOTE(sql) (*(sql) == QUOTE && *((sql)-1) != SLASH)
// SLASH // SLASH
#define IS_SLASH_SLASH(sql) (*(sql) == SLASH && *((sql)-1) == SLASH) //#define IS_SLASH_SLASH(sql) (*(sql) == SLASH && *((sql)-1) == SLASH)
#define IS_SLASH_LETTER(sql) \ #define IS_SLASH_LETTER(sql) \
(IS_SLASH_COMMA(sql) || IS_SLASH_SPACE(sql) || IS_SLASH_EQUAL(sql) || IS_SLASH_QUOTE(sql) || IS_SLASH_SLASH(sql)) (*((sql)-1) == SLASH && (*(sql) == COMMA || *(sql) == SPACE || *(sql) == EQUAL || *(sql) == QUOTE || *(sql) == SLASH)) \
// (IS_SLASH_COMMA(sql) || IS_SLASH_SPACE(sql) || IS_SLASH_EQUAL(sql) || IS_SLASH_QUOTE(sql) || IS_SLASH_SLASH(sql))
#define MOVE_FORWARD_ONE(sql, len) (memmove((void *)((sql)-1), (sql), len)) #define MOVE_FORWARD_ONE(sql, len) (memmove((void *)((sql)-1), (sql), len))
...@@ -104,6 +105,70 @@ typedef enum { ...@@ -104,6 +105,70 @@ typedef enum {
SCHEMA_ACTION_CHANGE_TAG_SIZE, SCHEMA_ACTION_CHANGE_TAG_SIZE,
} ESchemaAction; } ESchemaAction;
/*********************** list start *********************************/
typedef struct {
const void *key;
int32_t keyLen;
void *value;
bool used;
}Node;
typedef struct NodeList{
Node data;
struct NodeList* next;
}NodeList;
static void* nodeListGet(NodeList* list, const void *key, int32_t len){
NodeList *tmp = list;
while(tmp){
if(tmp->data.used && tmp->data.keyLen == len && memcmp(tmp->data.key, key, len) == 0) {
return tmp->data.value;
}
tmp = tmp->next;
}
return NULL;
}
static int nodeListSet(NodeList** list, const void *key, int32_t len, void* value){
NodeList *tmp = *list;
while (tmp){
if(!tmp->data.used) break;
if(tmp->data.keyLen == len && memcmp(tmp->data.key, key, len) == 0) {
return -1;
}
tmp = tmp->next;
}
if(tmp){
tmp->data.key = key;
tmp->data.keyLen = len;
tmp->data.value = value;
tmp->data.used = true;
}else{
NodeList *newNode = taosMemoryCalloc(1, sizeof(NodeList));
if(newNode == NULL){
return -1;
}
newNode->data.key = key;
newNode->data.keyLen = len;
newNode->data.value = value;
newNode->data.used = true;
newNode->next = *list;
*list = newNode;
}
return 0;
}
static int nodeListSize(NodeList* list){
int cnt = 0;
while(list){
if(list->data.used) cnt++;
else break;
list = list->next;
}
return cnt;
}
/*********************** list end *********************************/
typedef struct { typedef struct {
const char *measure; const char *measure;
const char *tags; const char *tags;
...@@ -164,17 +229,8 @@ typedef struct { ...@@ -164,17 +229,8 @@ typedef struct {
int64_t endTime; int64_t endTime;
} SSmlCostInfo; } SSmlCostInfo;
typedef struct {
SRequestObj *request;
tsem_t sem;
int32_t cnt;
int32_t total;
TdThreadSpinlock lock;
} Params;
typedef struct { typedef struct {
int64_t id; int64_t id;
Params *params;
SMLProtocolType protocol; SMLProtocolType protocol;
int8_t precision; int8_t precision;
...@@ -183,8 +239,8 @@ typedef struct { ...@@ -183,8 +239,8 @@ typedef struct {
bool isRawLine; bool isRawLine;
int32_t ttl; int32_t ttl;
SHashObj *childTables; NodeList *childTables;
SHashObj *superTables; NodeList *superTables;
SHashObj *pVgHash; SHashObj *pVgHash;
STscObj *taos; STscObj *taos;
...@@ -193,16 +249,15 @@ typedef struct { ...@@ -193,16 +249,15 @@ typedef struct {
SQuery *pQuery; SQuery *pQuery;
SSmlCostInfo cost; SSmlCostInfo cost;
int32_t affectedRows; int32_t lineNum;
SSmlMsgBuf msgBuf; SSmlMsgBuf msgBuf;
SHashObj *dumplicateKey; // for dumplicate key
cJSON *root; // for parse json cJSON *root; // for parse json
SArray *lines; // element is SSmlLineInfo SSmlLineInfo *lines; // element is SSmlLineInfo
// //
SHashObj *superTableTagKeyStr; NodeList *superTableTagKeyStr;
SHashObj *superTableColKeyStr; NodeList *superTableColKeyStr;
void *currentLineTagKeys; void *currentLineTagKeys;
void *preLineTagKeys; void *preLineTagKeys;
void *currentLineColKeys; void *currentLineColKeys;
...@@ -214,6 +269,7 @@ typedef struct { ...@@ -214,6 +269,7 @@ typedef struct {
SSmlLineInfo preLine; SSmlLineInfo preLine;
STableMeta *currSTableMeta; STableMeta *currSTableMeta;
STableDataCxt *currTableDataCtx; STableDataCxt *currTableDataCtx;
bool needModifySchema;
} SSmlHandle; } SSmlHandle;
//================================================================================================= //=================================================================================================
...@@ -461,6 +517,9 @@ static int32_t smlSendMetaMsg(SSmlHandle *info, SName *pName, SArray *pColumns, ...@@ -461,6 +517,9 @@ static int32_t smlSendMetaMsg(SSmlHandle *info, SName *pName, SArray *pColumns,
} }
static int32_t smlModifyDBSchemas(SSmlHandle *info) { static int32_t smlModifyDBSchemas(SSmlHandle *info) {
if(info->dataFormat && !info->needModifySchema){
return TSDB_CODE_SUCCESS;
}
int32_t code = 0; int32_t code = 0;
SHashObj *hashTmp = NULL; SHashObj *hashTmp = NULL;
STableMeta *pTableMeta = NULL; STableMeta *pTableMeta = NULL;
...@@ -474,13 +533,13 @@ static int32_t smlModifyDBSchemas(SSmlHandle *info) { ...@@ -474,13 +533,13 @@ static int32_t smlModifyDBSchemas(SSmlHandle *info) {
conn.requestObjRefId = info->pRequest->self; conn.requestObjRefId = info->pRequest->self;
conn.mgmtEps = getEpSet_s(&info->taos->pAppInfo->mgmtEp); conn.mgmtEps = getEpSet_s(&info->taos->pAppInfo->mgmtEp);
SSmlSTableMeta **tableMetaSml = (SSmlSTableMeta **)taosHashIterate(info->superTables, NULL); NodeList *tmp = info->superTables;
while (tableMetaSml) { while (tmp) {
SSmlSTableMeta *sTableData = *tableMetaSml; SSmlSTableMeta *sTableData = tmp->data.value;
bool needCheckMeta = false; // for multi thread bool needCheckMeta = false; // for multi thread
size_t superTableLen = 0; size_t superTableLen = (size_t)tmp->data.keyLen;
void *superTable = taosHashGetKey(tableMetaSml, &superTableLen); const void *superTable = tmp->data.key;
memset(pName.tname, 0, TSDB_TABLE_NAME_LEN); memset(pName.tname, 0, TSDB_TABLE_NAME_LEN);
memcpy(pName.tname, superTable, superTableLen); memcpy(pName.tname, superTable, superTableLen);
...@@ -629,7 +688,7 @@ static int32_t smlModifyDBSchemas(SSmlHandle *info) { ...@@ -629,7 +688,7 @@ static int32_t smlModifyDBSchemas(SSmlHandle *info) {
sTableData->tableMeta = pTableMeta; sTableData->tableMeta = pTableMeta;
tableMetaSml = (SSmlSTableMeta **)taosHashIterate(info->superTables, tableMetaSml); tmp = tmp->next;
} }
return 0; return 0;
...@@ -779,7 +838,7 @@ static bool smlIsNchar(const char *pVal, uint16_t len) { ...@@ -779,7 +838,7 @@ static bool smlIsNchar(const char *pVal, uint16_t len) {
if (len < 3) { if (len < 3) {
return false; return false;
} }
if ((pVal[0] == 'l' || pVal[0] == 'L') && pVal[1] == '"' && pVal[len - 1] == '"') { if (pVal[1] == '"' && pVal[len - 1] == '"' && (pVal[0] == 'l' || pVal[0] == 'L')) {
return true; return true;
} }
return false; return false;
...@@ -787,6 +846,10 @@ static bool smlIsNchar(const char *pVal, uint16_t len) { ...@@ -787,6 +846,10 @@ static bool smlIsNchar(const char *pVal, uint16_t len) {
/******************************* parse basic type function end **********************/ /******************************* parse basic type function end **********************/
/******************************* time function **********************/ /******************************* time function **********************/
static int8_t precisionConvert[7] = {TSDB_TIME_PRECISION_NANO, TSDB_TIME_PRECISION_HOURS, TSDB_TIME_PRECISION_MINUTES,
TSDB_TIME_PRECISION_SECONDS, TSDB_TIME_PRECISION_MILLI, TSDB_TIME_PRECISION_MICRO,
TSDB_TIME_PRECISION_NANO};
static int64_t smlGetTimeValue(const char *value, int32_t len, int8_t type) { static int64_t smlGetTimeValue(const char *value, int32_t len, int8_t type) {
char *endPtr = NULL; char *endPtr = NULL;
int64_t tsInt64 = taosStr2Int64(value, &endPtr, 10); int64_t tsInt64 = taosStr2Int64(value, &endPtr, 10);
...@@ -837,32 +900,12 @@ static int8_t smlGetTsTypeByLen(int32_t len) { ...@@ -837,32 +900,12 @@ static int8_t smlGetTsTypeByLen(int32_t len) {
} }
} }
static int8_t smlGetTsTypeByPrecision(int8_t precision) {
switch (precision) {
case TSDB_SML_TIMESTAMP_HOURS:
return TSDB_TIME_PRECISION_HOURS;
case TSDB_SML_TIMESTAMP_MILLI_SECONDS:
return TSDB_TIME_PRECISION_MILLI;
case TSDB_SML_TIMESTAMP_NANO_SECONDS:
case TSDB_SML_TIMESTAMP_NOT_CONFIGURED:
return TSDB_TIME_PRECISION_NANO;
case TSDB_SML_TIMESTAMP_MICRO_SECONDS:
return TSDB_TIME_PRECISION_MICRO;
case TSDB_SML_TIMESTAMP_SECONDS:
return TSDB_TIME_PRECISION_SECONDS;
case TSDB_SML_TIMESTAMP_MINUTES:
return TSDB_TIME_PRECISION_MINUTES;
default:
return -1;
}
}
static int64_t smlParseInfluxTime(SSmlHandle *info, const char *data, int32_t len) { static int64_t smlParseInfluxTime(SSmlHandle *info, const char *data, int32_t len) {
if (len == 0 || (len == 1 && data[0] == '0')) { if (len == 0 || (len == 1 && data[0] == '0')) {
return taosGetTimestampNs(); return taosGetTimestampNs();
} }
int8_t tsType = smlGetTsTypeByPrecision(info->precision); int8_t tsType = precisionConvert[info->precision];
if (tsType == -1) { if (tsType == -1) {
smlBuildInvalidDataMsg(&info->msgBuf, "invalid timestamp precision", NULL); smlBuildInvalidDataMsg(&info->msgBuf, "invalid timestamp precision", NULL);
return -1; return -1;
...@@ -1063,21 +1106,20 @@ static void smlInsertMeta(SHashObj *metaHash, SArray *metaArray, SArray *cols){ ...@@ -1063,21 +1106,20 @@ static void smlInsertMeta(SHashObj *metaHash, SArray *metaArray, SArray *cols){
} }
} }
bool smlFormatJudge(SHashObj* superTableKeyStr, void* preLineKeys, void* currentLineKeys, bool smlFormatJudge(NodeList **superTableKeyStr, void* preLineKeys, void* currentLineKeys,
SSmlLineInfo *currElements, SSmlLineInfo *preElements, int32_t len){ SSmlLineInfo *currElements, bool isSameMeasure, int32_t len){
// same measure // same measure
if(preElements->measureLen == currElements->measureLen if(isSameMeasure){
&& memcmp(preElements->measure, currElements->measure, currElements->measureLen) == 0){
if(varDataTLen(preLineKeys) != varDataTLen(currentLineKeys) if(varDataTLen(preLineKeys) != varDataTLen(currentLineKeys)
|| memcmp(preLineKeys, currentLineKeys, varDataTLen(preLineKeys)) != 0){ || memcmp(preLineKeys, currentLineKeys, varDataTLen(preLineKeys)) != 0){
return false; return false;
} }
}else{ // diff measure }else{ // diff measure
void *keyStr = taosHashGet(superTableKeyStr, currElements->measure, currElements->measureLen); void *keyStr = nodeListGet(*superTableKeyStr, currElements->measure, currElements->measureLen);
if(unlikely(keyStr == NULL)){ if(unlikely(keyStr == NULL)){
keyStr = taosMemoryMalloc(len); keyStr = taosMemoryMalloc(len);
varDataCopy(keyStr, currentLineKeys); varDataCopy(keyStr, currentLineKeys);
taosHashPut(superTableKeyStr, currElements->measure, currElements->measureLen, &keyStr, POINTER_BYTES); nodeListSet(superTableKeyStr, currElements->measure, currElements->measureLen, keyStr);
}else{ }else{
if(varDataTLen(keyStr) != varDataTLen(currentLineKeys) if(varDataTLen(keyStr) != varDataTLen(currentLineKeys)
&& memcmp(keyStr, currentLineKeys, varDataTLen(currentLineKeys)) != 0){ && memcmp(keyStr, currentLineKeys, varDataTLen(currentLineKeys)) != 0){
...@@ -1166,27 +1208,40 @@ static int32_t smlParseKv(SSmlHandle *info, const char **sql, const char *sqlEnd ...@@ -1166,27 +1208,40 @@ static int32_t smlParseKv(SSmlHandle *info, const char **sql, const char *sqlEnd
bool isSuperKVInit = false; bool isSuperKVInit = false;
SArray *superKV = NULL; SArray *superKV = NULL;
if(info->dataFormat){ if(info->dataFormat){
if(currElement->measureLen == info->preLine.measureLen if(currElement->measureTagsLen == info->preLine.measureTagsLen
&& memcmp(currElement->measure, info->preLine.measure, currElement->measureTagsLen) == 0){
isSameCTable = true;
if(isTag) return TSDB_CODE_SUCCESS;
}else if(!isTag){
SSmlTableInfo *oneTable = (SSmlTableInfo *)nodeListGet(info->childTables, currElement->measure, currElement->measureTagsLen);
if (unlikely(oneTable == NULL)) {
smlBuildInvalidDataMsg(&info->msgBuf, "child table should inside", currElement->measure);
return TSDB_CODE_SML_INVALID_DATA;
}
info->currTableDataCtx = oneTable->tableDataCtx;
}
if(isSameCTable){
isSameMeasure = true;
}else if(currElement->measureLen == info->preLine.measureLen
&& memcmp(currElement->measure, info->preLine.measure, currElement->measureLen) == 0){ && memcmp(currElement->measure, info->preLine.measure, currElement->measureLen) == 0){
isSameMeasure = true; isSameMeasure = true;
} }
if(!isSameMeasure){ if(!isSameMeasure){
SSmlSTableMeta *sMeta = NULL; SSmlSTableMeta *sMeta = (SSmlSTableMeta *)nodeListGet(info->superTables, currElement->measure, currElement->measureLen);
SSmlSTableMeta **tableMeta = (SSmlSTableMeta **)taosHashGet(info->superTables, currElement->measure, currElement->measureLen);
if(tableMeta == NULL){ if(sMeta == NULL){
SSmlSTableMeta *meta = smlBuildSTableMeta(info->dataFormat); sMeta = smlBuildSTableMeta(info->dataFormat);
STableMeta * pTableMeta = smlGetMeta(info, currElement->measure, currElement->measureLen); STableMeta * pTableMeta = smlGetMeta(info, currElement->measure, currElement->measureLen);
meta->tableMeta = pTableMeta; sMeta->tableMeta = pTableMeta;
if(pTableMeta == NULL){ if(pTableMeta == NULL){
info->dataFormat = false; info->dataFormat = false;
info->reRun = true; info->reRun = true;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
taosHashPut(info->superTables, currElement->measure, currElement->measureLen, &meta, POINTER_BYTES); nodeListSet(&info->superTables, currElement->measure, currElement->measureLen, sMeta);
sMeta = meta;
}else{
sMeta = *tableMeta;
} }
info->currSTableMeta = sMeta->tableMeta; info->currSTableMeta = sMeta->tableMeta;
...@@ -1200,19 +1255,6 @@ static int32_t smlParseKv(SSmlHandle *info, const char **sql, const char *sqlEnd ...@@ -1200,19 +1255,6 @@ static int32_t smlParseKv(SSmlHandle *info, const char **sql, const char *sqlEnd
} }
} }
if(currElement->measureTagsLen == info->preLine.measureTagsLen
&& memcmp(currElement->measure, info->preLine.measure, currElement->measureTagsLen) == 0){
isSameCTable = true;
if(isTag) return TSDB_CODE_SUCCESS;
}else if(!isTag){
SSmlTableInfo **oneTable = (SSmlTableInfo **)taosHashGet(info->childTables, currElement->measure, currElement->measureTagsLen);
if (unlikely(oneTable == NULL)) {
smlBuildInvalidDataMsg(&info->msgBuf, "child table should inside", currElement->measure);
return TSDB_CODE_SML_INVALID_DATA;
}
info->currTableDataCtx = (*oneTable)->tableDataCtx;
}
if(isTag){ if(isTag){
// prepare for judging if tag or col is the same for each line // prepare for judging if tag or col is the same for each line
if(unlikely(info->currentLineTagKeys == NULL)){ // sml todo size need remalloc if(unlikely(info->currentLineTagKeys == NULL)){ // sml todo size need remalloc
...@@ -1296,16 +1338,17 @@ static int32_t smlParseKv(SSmlHandle *info, const char **sql, const char *sqlEnd ...@@ -1296,16 +1338,17 @@ static int32_t smlParseKv(SSmlHandle *info, const char **sql, const char *sqlEnd
(*sql)++; (*sql)++;
continue; continue;
} }
if (!isInQuote && IS_SPACE(*sql)) { if (!isInQuote){
if (IS_SPACE(*sql)) {
break; break;
} }else if (IS_COMMA(*sql)) {
if (!isInQuote && IS_COMMA(*sql)) {
break; break;
} }else if (IS_EQUAL(*sql)) {
if (!isInQuote && IS_EQUAL(*sql)) {
smlBuildInvalidDataMsg(&info->msgBuf, "invalid data", *sql); smlBuildInvalidDataMsg(&info->msgBuf, "invalid data", *sql);
return TSDB_CODE_SML_INVALID_DATA; return TSDB_CODE_SML_INVALID_DATA;
} }
}
(*sql)++; (*sql)++;
} }
valueLen = *sql - value; valueLen = *sql - value;
...@@ -1368,19 +1411,20 @@ static int32_t smlParseKv(SSmlHandle *info, const char **sql, const char *sqlEnd ...@@ -1368,19 +1411,20 @@ static int32_t smlParseKv(SSmlHandle *info, const char **sql, const char *sqlEnd
if(IS_VAR_DATA_TYPE(kv.type) && kv.length > preKV->length){ if(IS_VAR_DATA_TYPE(kv.type) && kv.length > preKV->length){
preKV->length = kv.length; preKV->length = kv.length;
SSmlSTableMeta **tableMeta = (SSmlSTableMeta **)taosHashGet(info->superTables, currElement->measure, currElement->measureLen); SSmlSTableMeta *tableMeta = (SSmlSTableMeta *)nodeListGet(info->superTables, currElement->measure, currElement->measureLen);
if(tableMeta == NULL){ if(tableMeta == NULL){
smlBuildInvalidDataMsg(&info->msgBuf, "measure should has inside", value); smlBuildInvalidDataMsg(&info->msgBuf, "measure should has inside", value);
return TSDB_CODE_SML_INVALID_DATA; return TSDB_CODE_SML_INVALID_DATA;
} }
if(isTag){ if(isTag){
superKV = (*tableMeta)->tags; superKV = tableMeta->tags;
}else{ }else{
superKV = (*tableMeta)->cols; superKV = tableMeta->cols;
} }
SSmlKv *oldKV = taosArrayGet(superKV, cnt); SSmlKv *oldKV = taosArrayGet(superKV, cnt);
oldKV->length = kv.length; oldKV->length = kv.length;
info->needModifySchema = true;
} }
}else{ }else{
if(isSuperKVInit){ if(isSuperKVInit){
...@@ -1404,6 +1448,7 @@ static int32_t smlParseKv(SSmlHandle *info, const char **sql, const char *sqlEnd ...@@ -1404,6 +1448,7 @@ static int32_t smlParseKv(SSmlHandle *info, const char **sql, const char *sqlEnd
}else{ }else{
kv.length = preKV->length; kv.length = preKV->length;
} }
info->needModifySchema = true;
} }
} }
taosArrayPush(preLineKV, &kv); taosArrayPush(preLineKV, &kv);
...@@ -1435,8 +1480,8 @@ static int32_t smlParseKv(SSmlHandle *info, const char **sql, const char *sqlEnd ...@@ -1435,8 +1480,8 @@ static int32_t smlParseKv(SSmlHandle *info, const char **sql, const char *sqlEnd
if(info->dataFormat){ if(info->dataFormat){
if(isTag){ if(isTag){
info->dataFormat = smlFormatJudge(info->superTableTagKeyStr, info->preLineTagKeys, info->dataFormat = smlFormatJudge(&info->superTableTagKeyStr, info->preLineTagKeys,
info->currentLineTagKeys, currElement, &info->preLine, sqlEnd - currElement->tags); info->currentLineTagKeys, currElement, isSameMeasure, sqlEnd - currElement->tags);
if(!info->dataFormat) { if(!info->dataFormat) {
info->reRun = true; info->reRun = true;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -1447,7 +1492,7 @@ static int32_t smlParseKv(SSmlHandle *info, const char **sql, const char *sqlEnd ...@@ -1447,7 +1492,7 @@ static int32_t smlParseKv(SSmlHandle *info, const char **sql, const char *sqlEnd
return TSDB_CODE_PAR_INVALID_TAGS_NUM; return TSDB_CODE_PAR_INVALID_TAGS_NUM;
} }
void* oneTable = taosHashGet(info->childTables, currElement->measure, currElement->measureTagsLen); void* oneTable = nodeListGet(info->childTables, currElement->measure, currElement->measureTagsLen);
if (unlikely(oneTable == NULL)) { if (unlikely(oneTable == NULL)) {
SSmlTableInfo *tinfo = smlBuildTableInfo(1, currElement->measure, currElement->measureLen); SSmlTableInfo *tinfo = smlBuildTableInfo(1, currElement->measure, currElement->measureLen);
if (!tinfo) { if (!tinfo) {
...@@ -1463,21 +1508,21 @@ static int32_t smlParseKv(SSmlHandle *info, const char **sql, const char *sqlEnd ...@@ -1463,21 +1508,21 @@ static int32_t smlParseKv(SSmlHandle *info, const char **sql, const char *sqlEnd
smlBuildInvalidDataMsg(&info->msgBuf, "smlInitTableDataCtx error", NULL); smlBuildInvalidDataMsg(&info->msgBuf, "smlInitTableDataCtx error", NULL);
return TSDB_CODE_SML_INVALID_DATA; return TSDB_CODE_SML_INVALID_DATA;
} }
taosHashPut(info->childTables, currElement->measure, currElement->measureTagsLen, &tinfo, POINTER_BYTES); nodeListSet(&info->childTables, currElement->measure, currElement->measureTagsLen, tinfo);
} }
} }
}else{ }else{
info->dataFormat = smlFormatJudge(info->superTableColKeyStr, info->preLineColKeys, info->dataFormat = smlFormatJudge(&info->superTableColKeyStr, info->preLineColKeys,
info->currentLineColKeys, currElement, &info->preLine, sqlEnd - currElement->cols); info->currentLineColKeys, currElement, isSameMeasure, sqlEnd - currElement->cols);
if(!info->dataFormat) { if(!info->dataFormat) {
info->reRun = true; info->reRun = true;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
} }
}else{ }else{
void* oneTable = taosHashGet(info->childTables, currElement->measure, currElement->measureTagsLen); void* oneTable = nodeListGet(info->childTables, currElement->measure, currElement->measureTagsLen);
if (unlikely(oneTable == NULL)) { if (unlikely(oneTable == NULL)) {
SSmlTableInfo *tinfo = smlBuildTableInfo(info->affectedRows / 2, currElement->measure, currElement->measureLen); SSmlTableInfo *tinfo = smlBuildTableInfo(info->lineNum / 2, currElement->measure, currElement->measureLen);
if (!tinfo) { if (!tinfo) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
...@@ -1485,7 +1530,7 @@ static int32_t smlParseKv(SSmlHandle *info, const char **sql, const char *sqlEnd ...@@ -1485,7 +1530,7 @@ static int32_t smlParseKv(SSmlHandle *info, const char **sql, const char *sqlEnd
taosArrayPush(tinfo->tags, taosArrayGet(preLineKV, i)); taosArrayPush(tinfo->tags, taosArrayGet(preLineKV, i));
} }
smlSetCTableName(tinfo); smlSetCTableName(tinfo);
taosHashPut(info->childTables, currElement->measure, currElement->measureTagsLen, &tinfo, POINTER_BYTES); nodeListSet(&info->childTables, currElement->measure, currElement->measureTagsLen, tinfo);
} }
taosArrayDestroy(preLineKV); // smltodo taosArrayDestroy(preLineKV); // smltodo
} }
...@@ -1737,8 +1782,9 @@ static int32_t smlParseTelnetString(SSmlHandle *info, const char *sql, const cha ...@@ -1737,8 +1782,9 @@ static int32_t smlParseTelnetString(SSmlHandle *info, const char *sql, const cha
return ret; return ret;
} }
// parse tags // parse tags sml todo
ret = smlParseTelnetTags(sql, sqlEnd, tinfo->tags, tinfo->childTableName, info->dumplicateKey, &info->msgBuf); ret = smlParseTelnetTags(sql, sqlEnd, tinfo->tags, tinfo->childTableName, NULL, &info->msgBuf);
// ret = smlParseTelnetTags(sql, sqlEnd, tinfo->tags, tinfo->childTableName, info->dumplicateKey, &info->msgBuf);
if (ret != TSDB_CODE_SUCCESS) { if (ret != TSDB_CODE_SUCCESS) {
smlBuildInvalidDataMsg(&info->msgBuf, "invalid data", sql); smlBuildInvalidDataMsg(&info->msgBuf, "invalid data", sql);
return ret; return ret;
...@@ -1780,7 +1826,7 @@ static int32_t smlUpdateMeta(SHashObj *metaHash, SArray *metaArray, SArray *cols ...@@ -1780,7 +1826,7 @@ static int32_t smlUpdateMeta(SHashObj *metaHash, SArray *metaArray, SArray *cols
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static void smlDestroyTableInfo(SSmlHandle *info, SSmlTableInfo *tag) { static void smlDestroyTableInfo(SSmlTableInfo *tag) {
for (size_t i = 0; i < taosArrayGetSize(tag->cols); i++) { for (size_t i = 0; i < taosArrayGetSize(tag->cols); i++) {
SHashObj *kvHash = (SHashObj *)taosArrayGetP(tag->cols, i); SHashObj *kvHash = (SHashObj *)taosArrayGetP(tag->cols, i);
taosHashCleanup(kvHash); taosHashCleanup(kvHash);
...@@ -1806,43 +1852,56 @@ static int32_t smlPushCols(SArray *colsArray, SArray *cols) { ...@@ -1806,43 +1852,56 @@ static int32_t smlPushCols(SArray *colsArray, SArray *cols) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static void smlDestroyInfo(SSmlHandle *info) { void smlDestroyInfo(void *data) {
if (!info) return; if (!data) return;
SSmlHandle *info = (SSmlHandle *)data;
qDestroyQuery(info->pQuery); qDestroyQuery(info->pQuery);
// destroy info->childTables // destroy info->childTables
void **p1 = (void **)taosHashIterate(info->childTables, NULL); NodeList* tmp = info->childTables;
while (p1) { while (tmp) {
smlDestroyTableInfo(info, (SSmlTableInfo *)(*p1)); if(tmp->data.used) {
p1 = (void **)taosHashIterate(info->childTables, p1); smlDestroyTableInfo(tmp->data.value);
}
NodeList* t = tmp->next;
taosMemoryFree(tmp);
tmp = t;
} }
taosHashCleanup(info->childTables);
// destroy info->superTables // destroy info->superTables
p1 = (void **)taosHashIterate(info->superTables, NULL); tmp = info->superTables;
while (p1) { while (tmp) {
smlDestroySTableMeta((SSmlSTableMeta *)(*p1)); if(tmp->data.used) {
p1 = (void **)taosHashIterate(info->superTables, p1); smlDestroySTableMeta(tmp->data.value);
}
NodeList* t = tmp->next;
taosMemoryFree(tmp);
tmp = t;
} }
taosHashCleanup(info->superTables);
// destroy info->pVgHash // destroy info->pVgHash
taosHashCleanup(info->pVgHash); taosHashCleanup(info->pVgHash);
destroyRequest(info->pRequest); destroyRequest(info->pRequest);
p1 = (void **)taosHashIterate(info->superTableTagKeyStr, NULL); tmp = info->superTableTagKeyStr;
while (p1) { while (tmp) {
taosMemoryFree(*p1); if(tmp->data.used) {
p1 = (void **)taosHashIterate(info->superTableTagKeyStr, p1); taosMemoryFree(tmp->data.value);
}
NodeList* t = tmp->next;
taosMemoryFree(tmp);
tmp = tmp->next;
} }
taosHashCleanup(info->superTableTagKeyStr);
p1 = (void **)taosHashIterate(info->superTableColKeyStr, NULL); tmp = info->superTableColKeyStr;
while (p1) { while (tmp) {
taosMemoryFree(*p1); if(tmp->data.used) {
p1 = (void **)taosHashIterate(info->superTableColKeyStr, p1); taosMemoryFree(tmp->data.value);
}
NodeList* t = tmp->next;
taosMemoryFree(tmp);
tmp = tmp->next;
} }
taosHashCleanup(info->superTableColKeyStr);
taosMemoryFree(info->currentLineTagKeys); taosMemoryFree(info->currentLineTagKeys);
taosMemoryFree(info->preLineTagKeys); taosMemoryFree(info->preLineTagKeys);
...@@ -1851,61 +1910,103 @@ static void smlDestroyInfo(SSmlHandle *info) { ...@@ -1851,61 +1910,103 @@ static void smlDestroyInfo(SSmlHandle *info) {
taosArrayDestroy(info->preLineTagKV); taosArrayDestroy(info->preLineTagKV);
taosArrayDestroy(info->preLineColKV); taosArrayDestroy(info->preLineColKV);
for(int i = 0; i < taosArrayGetSize(info->lines); i++){ for(int i = 0; i < info->lineNum; i++){
taosArrayDestroy(((SSmlLineInfo*)taosArrayGet(info->lines, i))->colArray); taosArrayDestroy(info->lines[i].colArray);
} }
taosArrayDestroy(info->lines); taosMemoryFree(info->lines);
cJSON_Delete(info->root); cJSON_Delete(info->root);
taosMemoryFreeClear(info); taosMemoryFreeClear(info);
} }
static SSmlHandle *smlBuildSmlInfo(STscObj *pTscObj, SRequestObj *request, SMLProtocolType protocol, int8_t precision, int32_t perBatch) { int32_t smlInitInfo(void *data, SRequestObj *request, bool isRawLine, int32_t ttl,
SMLProtocolType protocol, int8_t precision, int32_t lineNum) {
if (!data) return TSDB_CODE_SML_INVALID_DATA;
SSmlHandle *info = (SSmlHandle *)data;
info->id = smlGenId();
info->pRequest = request;
info->isRawLine = isRawLine;
info->ttl = ttl;
info->precision = precision;
info->protocol = protocol;
info->msgBuf.buf = info->pRequest->msgBuf;
info->msgBuf.len = ERROR_MSG_BUF_DEFAULT_SIZE;
info->pQuery = smlInitHandle();
if(lineNum > info->lineNum && !info->dataFormat){
void *tmp = taosMemoryRealloc(info->lines, lineNum * sizeof(SSmlLineInfo));
if(tmp == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
info->lines = tmp;
}
info->lineNum = lineNum;
return TSDB_CODE_SUCCESS;
}
void smlClearInfo(void *data) {
if (!data) return;
SSmlHandle *info = (SSmlHandle *)data;
// clear info->childTables
NodeList *tmp = info->childTables;
while (tmp) {
if(tmp->data.used){
smlDestroySTableMeta((SSmlSTableMeta *)(tmp->data.value));
tmp->data.used = false;
}
tmp = tmp->next;
}
// tmp = info->superTableTagKeyStr;
// while (tmp) {
// taosMemoryFree(tmp->data.value);
// tmp->data.used = false;
// tmp = tmp->next;
// }
//
// tmp = info->superTableColKeyStr;
// while (tmp) {
// taosMemoryFree(tmp->data.value);
// tmp->data.used = false;
// tmp = tmp->next;
// }
if(!info->dataFormat){
for(int i = 0; i < info->lineNum; i++){
taosArrayDestroy(info->lines[i].colArray);
}
memset(info->lines, 0, info->lineNum * sizeof(SSmlLineInfo));
}
cJSON_Delete(info->root);
qDestroyQuery(info->pQuery);
info->pQuery = NULL;
}
static SSmlHandle *smlBuildSmlInfo(TAOS *taos) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
SSmlHandle *info = (SSmlHandle *)taosMemoryCalloc(1, sizeof(SSmlHandle)); SSmlHandle *info = (SSmlHandle *)taosMemoryCalloc(1, sizeof(SSmlHandle));
if (NULL == info) { if (NULL == info) {
return NULL; return NULL;
} }
info->id = smlGenId(); info->taos = acquireTscObj(*(int64_t *)taos);
info->pQuery = smlInitHandle();
if (pTscObj) {
info->taos = pTscObj;
code = catalogGetHandle(info->taos->pAppInfo->clusterId, &info->pCatalog); code = catalogGetHandle(info->taos->pAppInfo->clusterId, &info->pCatalog);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
uError("SML:0x%" PRIx64 " get catalog error %d", info->id, code); uError("SML:0x%" PRIx64 " get catalog error %d", info->id, code);
goto cleanup; goto cleanup;
} }
}
info->precision = precision;
info->protocol = protocol;
info->dataFormat = true; info->dataFormat = true;
info->affectedRows = perBatch;
if (request) {
info->pRequest = request;
info->msgBuf.buf = info->pRequest->msgBuf;
info->msgBuf.len = ERROR_MSG_BUF_DEFAULT_SIZE;
}
info->lines = taosArrayInit(perBatch, sizeof(SSmlLineInfo));
taosArraySetSize(info->lines, perBatch);
info->superTableTagKeyStr = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
info->superTableColKeyStr = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
info->childTables = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
info->superTables = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
info->pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK); info->pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
if (NULL == info->pQuery || NULL == info->childTables || NULL == info->superTables || NULL == info->superTableTagKeyStr if (NULL == info->pVgHash) {
|| NULL == info->superTableColKeyStr || NULL == info->pVgHash) { uError("create SSmlHandle failed");
uError("SML:0x%" PRIx64 " create info failed", info->id);
goto cleanup; goto cleanup;
} }
return info; return info;
cleanup:
cleanup:
smlDestroyInfo(info); smlDestroyInfo(info);
return NULL; return NULL;
} }
...@@ -2369,8 +2470,9 @@ static int32_t smlParseJSONString(SSmlHandle *info, cJSON *root, SSmlTableInfo * ...@@ -2369,8 +2470,9 @@ static int32_t smlParseJSONString(SSmlHandle *info, cJSON *root, SSmlTableInfo *
} }
uDebug("OTD:0x%" PRIx64 " Parse metric value from JSON payload finished", info->id); uDebug("OTD:0x%" PRIx64 " Parse metric value from JSON payload finished", info->id);
// Parse tags // Parse tags sml todo
ret = smlParseTagsFromJSON(root, tinfo->tags, tinfo->childTableName, info->dumplicateKey, &info->msgBuf); ret = smlParseTagsFromJSON(root, tinfo->tags, tinfo->childTableName, NULL, &info->msgBuf);
// ret = smlParseTagsFromJSON(root, tinfo->tags, tinfo->childTableName, info->dumplicateKey, &info->msgBuf);
if (ret) { if (ret) {
uError("OTD:0x%" PRIx64 " Unable to parse tags from JSON payload", info->id); uError("OTD:0x%" PRIx64 " Unable to parse tags from JSON payload", info->id);
return ret; return ret;
...@@ -2383,18 +2485,15 @@ static int32_t smlParseJSONString(SSmlHandle *info, cJSON *root, SSmlTableInfo * ...@@ -2383,18 +2485,15 @@ static int32_t smlParseJSONString(SSmlHandle *info, cJSON *root, SSmlTableInfo *
static int32_t smlParseLineBottom(SSmlHandle *info) { static int32_t smlParseLineBottom(SSmlHandle *info) {
if(info->dataFormat) return TSDB_CODE_SUCCESS; if(info->dataFormat) return TSDB_CODE_SUCCESS;
for(int32_t i = 0; i < taosArrayGetSize(info->lines); i ++){ for(int32_t i = 0; i < info->lineNum; i ++){
SSmlLineInfo* elements = taosArrayGet(info->lines, i); SSmlLineInfo* elements = info->lines + i;
bool hasTable = true; SSmlTableInfo *tinfo =
SSmlTableInfo *tinfo = NULL; (SSmlTableInfo *)nodeListGet(info->childTables, elements->measure, elements->measureTagsLen);
SSmlTableInfo **oneTable = if(tinfo == NULL){
(SSmlTableInfo **)taosHashGet(info->childTables, elements->measure, elements->measureTagsLen);
if(oneTable == NULL){
uError("SML:0x%" PRIx64 "get oneTable failed, line num:%d", info->id, i); uError("SML:0x%" PRIx64 "get oneTable failed, line num:%d", info->id, i);
smlBuildInvalidDataMsg(&info->msgBuf, "get oneTable failed", elements->measure); smlBuildInvalidDataMsg(&info->msgBuf, "get oneTable failed", elements->measure);
return TSDB_CODE_SML_INVALID_DATA; return TSDB_CODE_SML_INVALID_DATA;
} }
tinfo = *oneTable;
if (taosArrayGetSize(elements->colArray) + taosArrayGetSize(tinfo->tags) > TSDB_MAX_COLUMNS) { if (taosArrayGetSize(elements->colArray) + taosArrayGetSize(tinfo->tags) > TSDB_MAX_COLUMNS) {
smlBuildInvalidDataMsg(&info->msgBuf, "too many columns than 4096", NULL); smlBuildInvalidDataMsg(&info->msgBuf, "too many columns than 4096", NULL);
...@@ -2406,11 +2505,11 @@ static int32_t smlParseLineBottom(SSmlHandle *info) { ...@@ -2406,11 +2505,11 @@ static int32_t smlParseLineBottom(SSmlHandle *info) {
return ret; return ret;
} }
SSmlSTableMeta **tableMeta = (SSmlSTableMeta **)taosHashGet(info->superTables, elements->measure, elements->measureLen); SSmlSTableMeta *tableMeta = (SSmlSTableMeta *)nodeListGet(info->superTables, elements->measure, elements->measureLen);
if (tableMeta) { // update meta if (tableMeta) { // update meta
ret = smlUpdateMeta((*tableMeta)->colHash, (*tableMeta)->cols, elements->colArray, false, &info->msgBuf); ret = smlUpdateMeta(tableMeta->colHash, tableMeta->cols, elements->colArray, false, &info->msgBuf);
if (ret == TSDB_CODE_SUCCESS) { if (ret == TSDB_CODE_SUCCESS) {
ret = smlUpdateMeta((*tableMeta)->tagHash, (*tableMeta)->tags, tinfo->tags, true, &info->msgBuf); ret = smlUpdateMeta(tableMeta->tagHash, tableMeta->tags, tinfo->tags, true, &info->msgBuf);
} }
if (ret != TSDB_CODE_SUCCESS) { if (ret != TSDB_CODE_SUCCESS) {
uError("SML:0x%" PRIx64 " smlUpdateMeta failed", info->id); uError("SML:0x%" PRIx64 " smlUpdateMeta failed", info->id);
...@@ -2426,7 +2525,7 @@ static int32_t smlParseLineBottom(SSmlHandle *info) { ...@@ -2426,7 +2525,7 @@ static int32_t smlParseLineBottom(SSmlHandle *info) {
SSmlSTableMeta *meta = smlBuildSTableMeta(info->dataFormat); SSmlSTableMeta *meta = smlBuildSTableMeta(info->dataFormat);
smlInsertMeta(meta->tagHash, meta->tags, tinfo->tags); smlInsertMeta(meta->tagHash, meta->tags, tinfo->tags);
smlInsertMeta(meta->colHash, meta->cols, elements->colArray); smlInsertMeta(meta->colHash, meta->cols, elements->colArray);
taosHashPut(info->superTables, elements->measure, elements->measureLen, &meta, POINTER_BYTES); nodeListSet(&info->superTables, elements->measure, elements->measureLen, meta);
} }
} }
...@@ -2455,18 +2554,17 @@ static int32_t smlParseTelnetLine(SSmlHandle *info, void *data, const int len) { ...@@ -2455,18 +2554,17 @@ static int32_t smlParseTelnetLine(SSmlHandle *info, void *data, const int len) {
} }
if (ret != TSDB_CODE_SUCCESS) { if (ret != TSDB_CODE_SUCCESS) {
uError("SML:0x%" PRIx64 " smlParseTelnetLine failed", info->id); uError("SML:0x%" PRIx64 " smlParseTelnetLine failed", info->id);
smlDestroyTableInfo(info, tinfo); smlDestroyTableInfo(tinfo);
taosArrayDestroy(cols); taosArrayDestroy(cols);
return ret; return ret;
} }
if (taosArrayGetSize(tinfo->tags) <= 0 || taosArrayGetSize(tinfo->tags) > TSDB_MAX_TAGS) { if (taosArrayGetSize(tinfo->tags) <= 0 || taosArrayGetSize(tinfo->tags) > TSDB_MAX_TAGS) {
smlBuildInvalidDataMsg(&info->msgBuf, "invalidate tags length:[1,128]", NULL); smlBuildInvalidDataMsg(&info->msgBuf, "invalidate tags length:[1,128]", NULL);
smlDestroyTableInfo(info, tinfo); smlDestroyTableInfo(tinfo);
taosArrayDestroy(cols); taosArrayDestroy(cols);
return TSDB_CODE_PAR_INVALID_TAGS_NUM; return TSDB_CODE_PAR_INVALID_TAGS_NUM;
} }
taosHashClear(info->dumplicateKey);
if (strlen(tinfo->childTableName) == 0) { if (strlen(tinfo->childTableName) == 0) {
RandTableName rName = {tinfo->tags, tinfo->sTableName, (uint8_t)tinfo->sTableNameLen, tinfo->childTableName, 0}; RandTableName rName = {tinfo->tags, tinfo->sTableName, (uint8_t)tinfo->sTableNameLen, tinfo->childTableName, 0};
...@@ -2477,23 +2575,23 @@ static int32_t smlParseTelnetLine(SSmlHandle *info, void *data, const int len) { ...@@ -2477,23 +2575,23 @@ static int32_t smlParseTelnetLine(SSmlHandle *info, void *data, const int len) {
} }
bool hasTable = true; bool hasTable = true;
SSmlTableInfo **oneTable = SSmlTableInfo *oneTable =
(SSmlTableInfo **)taosHashGet(info->childTables, tinfo->childTableName, strlen(tinfo->childTableName)); (SSmlTableInfo *)nodeListGet(info->childTables, tinfo->childTableName, strlen(tinfo->childTableName));
if (!oneTable) { if (!oneTable) {
taosHashPut(info->childTables, tinfo->childTableName, strlen(tinfo->childTableName), &tinfo, POINTER_BYTES); nodeListSet(&info->childTables, tinfo->childTableName, strlen(tinfo->childTableName), tinfo);
oneTable = &tinfo; oneTable = tinfo;
hasTable = false; hasTable = false;
} else { } else {
smlDestroyTableInfo(info, tinfo); smlDestroyTableInfo(tinfo);
} }
taosArrayPush((*oneTable)->cols, &cols); taosArrayPush(oneTable->cols, &cols);
SSmlSTableMeta **tableMeta = SSmlSTableMeta *tableMeta =
(SSmlSTableMeta **)taosHashGet(info->superTables, (*oneTable)->sTableName, (*oneTable)->sTableNameLen); (SSmlSTableMeta *)nodeListGet(info->superTables, oneTable->sTableName, oneTable->sTableNameLen);
if (tableMeta) { // update meta if (tableMeta) { // update meta
ret = smlUpdateMeta((*tableMeta)->colHash, (*tableMeta)->cols, cols, false, &info->msgBuf); ret = smlUpdateMeta(tableMeta->colHash, tableMeta->cols, cols, false, &info->msgBuf);
if (!hasTable && ret == TSDB_CODE_SUCCESS) { if (!hasTable && ret == TSDB_CODE_SUCCESS) {
ret = smlUpdateMeta((*tableMeta)->tagHash, (*tableMeta)->tags, (*oneTable)->tags, true, &info->msgBuf); ret = smlUpdateMeta(tableMeta->tagHash, tableMeta->tags, oneTable->tags, true, &info->msgBuf);
} }
if (ret != TSDB_CODE_SUCCESS) { if (ret != TSDB_CODE_SUCCESS) {
uError("SML:0x%" PRIx64 " smlUpdateMeta failed", info->id); uError("SML:0x%" PRIx64 " smlUpdateMeta failed", info->id);
...@@ -2501,9 +2599,9 @@ static int32_t smlParseTelnetLine(SSmlHandle *info, void *data, const int len) { ...@@ -2501,9 +2599,9 @@ static int32_t smlParseTelnetLine(SSmlHandle *info, void *data, const int len) {
} }
} else { } else {
SSmlSTableMeta *meta = smlBuildSTableMeta(false); SSmlSTableMeta *meta = smlBuildSTableMeta(false);
smlInsertMeta(meta->tagHash, meta->tags, (*oneTable)->tags); smlInsertMeta(meta->tagHash, meta->tags, oneTable->tags);
smlInsertMeta(meta->colHash, meta->cols, cols); smlInsertMeta(meta->colHash, meta->cols, cols);
taosHashPut(info->superTables, (*oneTable)->sTableName, (*oneTable)->sTableNameLen, &meta, POINTER_BYTES); nodeListSet(&info->superTables, oneTable->sTableName, oneTable->sTableNameLen, meta);
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -2550,9 +2648,9 @@ static int32_t smlParseJSON(SSmlHandle *info, char *payload) { ...@@ -2550,9 +2648,9 @@ static int32_t smlParseJSON(SSmlHandle *info, char *payload) {
static int32_t smlInsertData(SSmlHandle *info) { static int32_t smlInsertData(SSmlHandle *info) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
SSmlTableInfo **oneTable = (SSmlTableInfo **)taosHashIterate(info->childTables, NULL); NodeList* tmp = info->childTables;
while (oneTable) { while (tmp) {
SSmlTableInfo *tableData = *oneTable; SSmlTableInfo *tableData = (SSmlTableInfo *)tmp->data.value;
SName pName = {TSDB_TABLE_NAME_T, info->taos->acctId, {0}, {0}}; SName pName = {TSDB_TABLE_NAME_T, info->taos->acctId, {0}, {0}};
tstrncpy(pName.dbname, info->pRequest->pDb, sizeof(pName.dbname)); tstrncpy(pName.dbname, info->pRequest->pDb, sizeof(pName.dbname));
...@@ -2572,22 +2670,22 @@ static int32_t smlInsertData(SSmlHandle *info) { ...@@ -2572,22 +2670,22 @@ static int32_t smlInsertData(SSmlHandle *info) {
} }
taosHashPut(info->pVgHash, (const char *)&vg.vgId, sizeof(vg.vgId), (char *)&vg, sizeof(vg)); taosHashPut(info->pVgHash, (const char *)&vg.vgId, sizeof(vg.vgId), (char *)&vg, sizeof(vg));
SSmlSTableMeta **pMeta = SSmlSTableMeta *pMeta =
(SSmlSTableMeta **)taosHashGet(info->superTables, tableData->sTableName, tableData->sTableNameLen); (SSmlSTableMeta *)nodeListGet(info->superTables, tableData->sTableName, tableData->sTableNameLen);
ASSERT(NULL != pMeta && NULL != *pMeta); ASSERT(NULL != pMeta);
// use tablemeta of stable to save vgid and uid of child table // use tablemeta of stable to save vgid and uid of child table
(*pMeta)->tableMeta->vgId = vg.vgId; pMeta->tableMeta->vgId = vg.vgId;
(*pMeta)->tableMeta->uid = tableData->uid; // one table merge data block together according uid pMeta->tableMeta->uid = tableData->uid; // one table merge data block together according uid
code = smlBindData(info->pQuery, info->dataFormat, tableData->tags, (*pMeta)->cols, tableData->cols, code = smlBindData(info->pQuery, info->dataFormat, tableData->tags, pMeta->cols, tableData->cols,
(*pMeta)->tableMeta, tableData->childTableName, tableData->sTableName, tableData->sTableNameLen, pMeta->tableMeta, tableData->childTableName, tableData->sTableName, tableData->sTableNameLen,
info->ttl, info->msgBuf.buf, info->msgBuf.len); info->ttl, info->msgBuf.buf, info->msgBuf.len);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
uError("SML:0x%" PRIx64 " smlBindData failed", info->id); uError("SML:0x%" PRIx64 " smlBindData failed", info->id);
return code; return code;
} }
oneTable = (SSmlTableInfo **)taosHashIterate(info->childTables, oneTable); tmp = tmp->next;
} }
code = smlBuildOutput(info->pQuery, info->pVgHash); code = smlBuildOutput(info->pQuery, info->pVgHash);
...@@ -2597,20 +2695,11 @@ static int32_t smlInsertData(SSmlHandle *info) { ...@@ -2597,20 +2695,11 @@ static int32_t smlInsertData(SSmlHandle *info) {
} }
info->cost.insertRpcTime = taosGetTimestampUs(); info->cost.insertRpcTime = taosGetTimestampUs();
// launchQueryImpl(info->pRequest, info->pQuery, false, NULL);
// info->affectedRows = taos_affected_rows(info->pRequest);
// return info->pRequest->code;
SAppClusterSummary *pActivity = &info->taos->pAppInfo->summary; SAppClusterSummary *pActivity = &info->taos->pAppInfo->summary;
atomic_add_fetch_64((int64_t *)&pActivity->numOfInsertsReq, 1); atomic_add_fetch_64((int64_t *)&pActivity->numOfInsertsReq, 1);
SSqlCallbackWrapper *pWrapper = (SSqlCallbackWrapper *)taosMemoryCalloc(1, sizeof(SSqlCallbackWrapper)); launchQueryImpl(info->pRequest, info->pQuery, true, NULL);
if (pWrapper == NULL) { return info->pRequest->code;
return TSDB_CODE_OUT_OF_MEMORY;
}
pWrapper->pRequest = info->pRequest;
launchAsyncQuery(info->pRequest, info->pQuery, NULL, pWrapper);
return TSDB_CODE_SUCCESS;
} }
static void smlPrintStatisticInfo(SSmlHandle *info) { static void smlPrintStatisticInfo(SSmlHandle *info) {
...@@ -2664,7 +2753,12 @@ static int32_t smlParseLine(SSmlHandle *info, char *lines[], char *rawLine, char ...@@ -2664,7 +2753,12 @@ static int32_t smlParseLine(SSmlHandle *info, char *lines[], char *rawLine, char
uDebug("SML:0x%" PRIx64 " smlParseLine israw:%d, len:%d, sql:%s", info->id, info->isRawLine, len, (info->isRawLine ? "rawdata" : tmp)); uDebug("SML:0x%" PRIx64 " smlParseLine israw:%d, len:%d, sql:%s", info->id, info->isRawLine, len, (info->isRawLine ? "rawdata" : tmp));
if (info->protocol == TSDB_SML_LINE_PROTOCOL) { if (info->protocol == TSDB_SML_LINE_PROTOCOL) {
code = smlParseInfluxString(info, tmp, tmp + len, taosArrayGet(info->lines, i)); if(info->dataFormat){
SSmlLineInfo element = {0};
code = smlParseInfluxString(info, tmp, tmp + len, &element);
}else{
code = smlParseInfluxString(info, tmp, tmp + len, info->lines + i);
}
} else if (info->protocol == TSDB_SML_TELNET_PROTOCOL) { } else if (info->protocol == TSDB_SML_TELNET_PROTOCOL) {
code = smlParseTelnetLine(info, tmp, len); code = smlParseTelnetLine(info, tmp, len);
} else { } else {
...@@ -2678,20 +2772,31 @@ static int32_t smlParseLine(SSmlHandle *info, char *lines[], char *rawLine, char ...@@ -2678,20 +2772,31 @@ static int32_t smlParseLine(SSmlHandle *info, char *lines[], char *rawLine, char
i = 0; i = 0;
info->reRun = false; info->reRun = false;
// clear info->childTables // clear info->childTables
void **p1 = (void **)taosHashIterate(info->childTables, NULL); NodeList* pList = info->childTables;
while (p1) { while (pList) {
smlDestroyTableInfo(info, (SSmlTableInfo *)(*p1)); if(pList->data.used) {
p1 = (void **)taosHashIterate(info->childTables, p1); smlDestroyTableInfo(pList->data.value);
pList->data.used = false;
}
pList = pList->next;
} }
taosHashClear(info->childTables);
// clear info->superTables // clear info->superTables
p1 = (void **)taosHashIterate(info->superTables, NULL); pList = info->superTables;
while (p1) { while (pList) {
smlDestroySTableMeta((SSmlSTableMeta *)(*p1)); if(pList->data.used) {
p1 = (void **)taosHashIterate(info->superTables, p1); smlDestroySTableMeta(pList->data.value);
pList->data.used = false;
}
pList = pList->next;
}
if(info->lines != NULL){
uError("SML:0x%" PRIx64 " info->lines != NULL", info->id);
return TSDB_CODE_SML_INVALID_DATA;
} }
taosHashClear(info->superTables); info->lines = taosMemoryCalloc(info->lineNum, sizeof(SSmlLineInfo));
continue; continue;
} }
i++; i++;
...@@ -2719,15 +2824,15 @@ static int smlProcess(SSmlHandle *info, char *lines[], char *rawLine, char *rawL ...@@ -2719,15 +2824,15 @@ static int smlProcess(SSmlHandle *info, char *lines[], char *rawLine, char *rawL
} }
info->cost.lineNum = numLines; info->cost.lineNum = numLines;
info->cost.numOfSTables = taosHashGetSize(info->superTables); info->cost.numOfSTables = nodeListSize(info->superTables);
info->cost.numOfCTables = taosHashGetSize(info->childTables); info->cost.numOfCTables = nodeListSize(info->childTables);
info->cost.schemaTime = taosGetTimestampUs(); info->cost.schemaTime = taosGetTimestampUs();
do { do {
code = smlModifyDBSchemas(info); code = smlModifyDBSchemas(info);
if (code == 0) break; if (code == 0) break;
} while (retryNum++ < taosHashGetSize(info->superTables) * MAX_RETRY_TIMES); } while (retryNum++ < nodeListSize(info->superTables) * MAX_RETRY_TIMES);
if (code != 0) { if (code != 0) {
uError("SML:0x%" PRIx64 " smlModifyDBSchemas error : %s", info->id, tstrerror(code)); uError("SML:0x%" PRIx64 " smlModifyDBSchemas error : %s", info->id, tstrerror(code));
...@@ -2744,89 +2849,50 @@ static int smlProcess(SSmlHandle *info, char *lines[], char *rawLine, char *rawL ...@@ -2744,89 +2849,50 @@ static int smlProcess(SSmlHandle *info, char *lines[], char *rawLine, char *rawL
return code; return code;
} }
static int32_t isSchemalessDb(STscObj *taos, SRequestObj *request) { TAOS_RES *taos_schemaless_insert_inner(TAOS *taos, char *lines[], char *rawLine, char *rawLineEnd,
// SCatalog *catalog = NULL; int numLines, int protocol, int precision, int32_t ttl, int64_t reqid) {
// int32_t code = catalogGetHandle(((STscObj *)taos)->pAppInfo->clusterId, &catalog); if (NULL == taos) {
// if (code != TSDB_CODE_SUCCESS) { terrno = TSDB_CODE_TSC_DISCONNECTED;
// uError("SML get catalog error %d", code); return NULL;
// return code;
// }
//
// SName name;
// tNameSetDbName(&name, taos->acctId, taos->db, strlen(taos->db));
// char dbFname[TSDB_DB_FNAME_LEN] = {0};
// tNameGetFullDbName(&name, dbFname);
// SDbCfgInfo pInfo = {0};
//
// SRequestConnInfo conn = {0};
// conn.pTrans = taos->pAppInfo->pTransporter;
// conn.requestId = request->requestId;
// conn.requestObjRefId = request->self;
// conn.mgmtEps = getEpSet_s(&taos->pAppInfo->mgmtEp);
//
// code = catalogGetDBCfg(catalog, &conn, dbFname, &pInfo);
// if (code != TSDB_CODE_SUCCESS) {
// return code;
// }
// taosArrayDestroy(pInfo.pRetensions);
//
// if (!pInfo.schemaless) {
// return TSDB_CODE_SML_INVALID_DB_CONF;
// }
return TSDB_CODE_SUCCESS;
}
static void smlInsertCallback(void *param, void *res, int32_t code) {
SRequestObj *pRequest = (SRequestObj *)res;
SSmlHandle *info = (SSmlHandle *)param;
int32_t rows = taos_affected_rows(pRequest);
uDebug("SML:0x%" PRIx64 " result. code:%d, msg:%s", info->id, pRequest->code, pRequest->msgBuf);
Params *pParam = info->params;
// lock
taosThreadSpinLock(&pParam->lock);
pParam->cnt++;
if (code != TSDB_CODE_SUCCESS) {
pParam->request->code = code;
pParam->request->body.resInfo.numOfRows += rows;
} else {
pParam->request->body.resInfo.numOfRows += info->affectedRows;
} }
// unlock
taosThreadSpinUnlock(&pParam->lock);
if (pParam->cnt == pParam->total) { STscObj *pTscObj = acquireTscObj(*(int64_t *)taos);
tsem_post(&pParam->sem); if (pTscObj == NULL) {
terrno = TSDB_CODE_TSC_DISCONNECTED;
return NULL;
}
SRequestObj *request = (SRequestObj *)createRequest(*(int64_t *)taos, TSDB_SQL_INSERT, reqid);
if (request == NULL) {
uError("SML:taos_schemaless_insert error request is null");
return NULL;
} }
uDebug("SML:0x%" PRIx64 " insert finished, code: %d, rows: %d, total: %d", info->id, code, rows, info->affectedRows);
info->cost.endTime = taosGetTimestampUs();
info->cost.code = code;
smlPrintStatisticInfo(info);
smlDestroyInfo(info);
}
TAOS_RES *taos_schemaless_insert_inner(SRequestObj *request, char *lines[], char *rawLine, char *rawLineEnd,
int numLines, int protocol, int precision, int32_t ttl) {
int batchs = 0;
STscObj *pTscObj = request->pTscObj;
pTscObj->schemalessType = 1; SSmlHandle *info = NULL;
if(pTscObj->smlHandle == NULL){
info = smlBuildSmlInfo(taos);
if (info == NULL) {
request->code = TSDB_CODE_OUT_OF_MEMORY;
uError("SML:taos_schemaless_insert error SSmlHandle is null");
goto end;
}
pTscObj->smlHandle = info;
}else{
info = (SSmlHandle *)(pTscObj->smlHandle);
smlClearInfo(info);
}
SSmlMsgBuf msg = {ERROR_MSG_BUF_DEFAULT_SIZE, request->msgBuf}; SSmlMsgBuf msg = {ERROR_MSG_BUF_DEFAULT_SIZE, request->msgBuf};
Params params = {0}; int ret = smlInitInfo(info, request, rawLine != NULL,
params.request = request; ttl, protocol, precision, numLines);
tsem_init(&params.sem, 0, 0); if(ret != TSDB_CODE_SUCCESS){
taosThreadSpinInit(&(params.lock), 0);
if (request->pDb == NULL) {
request->code = TSDB_CODE_PAR_DB_NOT_SPECIFIED; request->code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
smlBuildInvalidDataMsg(&msg, "Database not specified", NULL); smlBuildInvalidDataMsg(&msg, "smlInitInfo error", NULL);
goto end; goto end;
} }
if (isSchemalessDb(pTscObj, request) != TSDB_CODE_SUCCESS) { if (request->pDb == NULL) {
request->code = TSDB_CODE_SML_INVALID_DB_CONF; request->code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
smlBuildInvalidDataMsg(&msg, "Cannot write data to a non schemaless database", NULL); smlBuildInvalidDataMsg(&msg, "Database not specified", NULL);
goto end; goto end;
} }
...@@ -2851,64 +2917,13 @@ TAOS_RES *taos_schemaless_insert_inner(SRequestObj *request, char *lines[], char ...@@ -2851,64 +2917,13 @@ TAOS_RES *taos_schemaless_insert_inner(SRequestObj *request, char *lines[], char
goto end; goto end;
} }
batchs = ceil(((double)numLines) / tsSmlBatchSize); int32_t code = smlProcess(info, lines, rawLine, rawLineEnd, numLines);
params.total = batchs; request->code = code;
for (int i = 0; i < batchs; ++i) { info->cost.endTime = taosGetTimestampUs();
SRequestObj *req = (SRequestObj *)createRequest(pTscObj->id, TSDB_SQL_INSERT, 0); info->cost.code = code;
if (!req) { smlPrintStatisticInfo(info);
request->code = TSDB_CODE_OUT_OF_MEMORY;
uError("SML:taos_schemaless_insert error request is null");
goto end;
}
int32_t perBatch = tsSmlBatchSize;
if (numLines > perBatch) {
numLines -= perBatch;
} else {
perBatch = numLines;
numLines = 0;
}
SSmlHandle *info = smlBuildSmlInfo(pTscObj, req, (SMLProtocolType)protocol, precision, perBatch);
if (!info) {
request->code = TSDB_CODE_OUT_OF_MEMORY;
uError("SML:taos_schemaless_insert error SSmlHandle is null");
goto end;
}
info->isRawLine = (rawLine == NULL);
info->ttl = ttl;
info->params = &params;
info->pRequest->body.queryFp = smlInsertCallback;
info->pRequest->body.param = info;
int32_t code = smlProcess(info, lines, rawLine, rawLineEnd, perBatch);
if (lines) {
lines += perBatch;
}
if (rawLine) {
int num = 0;
while (rawLine < rawLineEnd) {
if (*(rawLine++) == '\n') {
num++;
}
if (num == perBatch) {
break;
}
}
}
if (code != TSDB_CODE_SUCCESS) {
info->pRequest->body.queryFp(info, req, code);
}
}
tsem_wait(&params.sem);
end: end:
taosThreadSpinDestroy(&params.lock);
tsem_destroy(&params.sem);
// ((STscObj *)taos)->schemalessType = 0;
pTscObj->schemalessType = 1;
uDebug("resultend:%s", request->msgBuf); uDebug("resultend:%s", request->msgBuf);
return (TAOS_RES *)request; return (TAOS_RES *)request;
} }
...@@ -2934,25 +2949,7 @@ TAOS_RES *taos_schemaless_insert_inner(SRequestObj *request, char *lines[], char ...@@ -2934,25 +2949,7 @@ TAOS_RES *taos_schemaless_insert_inner(SRequestObj *request, char *lines[], char
TAOS_RES *taos_schemaless_insert_ttl_with_reqid(TAOS *taos, char *lines[], int numLines, int protocol, int precision, TAOS_RES *taos_schemaless_insert_ttl_with_reqid(TAOS *taos, char *lines[], int numLines, int protocol, int precision,
int32_t ttl, int64_t reqid) { int32_t ttl, int64_t reqid) {
if (NULL == taos) { return taos_schemaless_insert_inner(taos, lines, NULL, NULL, numLines, protocol, precision, ttl, reqid);
terrno = TSDB_CODE_TSC_DISCONNECTED;
return NULL;
}
SRequestObj *request = (SRequestObj *)createRequest(*(int64_t *)taos, TSDB_SQL_INSERT, reqid);
if (!request) {
uError("SML:taos_schemaless_insert error request is null");
return NULL;
}
if (!lines) {
SSmlMsgBuf msg = {ERROR_MSG_BUF_DEFAULT_SIZE, request->msgBuf};
request->code = TSDB_CODE_SML_INVALID_DATA;
smlBuildInvalidDataMsg(&msg, "lines is null", NULL);
return (TAOS_RES *)request;
}
return taos_schemaless_insert_inner(request, lines, NULL, NULL, numLines, protocol, precision, ttl);
} }
TAOS_RES *taos_schemaless_insert(TAOS *taos, char *lines[], int numLines, int protocol, int precision) { TAOS_RES *taos_schemaless_insert(TAOS *taos, char *lines[], int numLines, int protocol, int precision) {
...@@ -2969,24 +2966,6 @@ TAOS_RES *taos_schemaless_insert_with_reqid(TAOS *taos, char *lines[], int numLi ...@@ -2969,24 +2966,6 @@ TAOS_RES *taos_schemaless_insert_with_reqid(TAOS *taos, char *lines[], int numLi
TAOS_RES *taos_schemaless_insert_raw_ttl_with_reqid(TAOS *taos, char *lines, int len, int32_t *totalRows, int protocol, TAOS_RES *taos_schemaless_insert_raw_ttl_with_reqid(TAOS *taos, char *lines, int len, int32_t *totalRows, int protocol,
int precision, int32_t ttl, int64_t reqid) { int precision, int32_t ttl, int64_t reqid) {
if (NULL == taos) {
terrno = TSDB_CODE_TSC_DISCONNECTED;
return NULL;
}
SRequestObj *request = (SRequestObj *)createRequest(*(int64_t *)taos, TSDB_SQL_INSERT, reqid);
if (!request) {
uError("SML:taos_schemaless_insert error request is null");
return NULL;
}
if (!lines || len <= 0) {
SSmlMsgBuf msg = {ERROR_MSG_BUF_DEFAULT_SIZE, request->msgBuf};
request->code = TSDB_CODE_SML_INVALID_DATA;
smlBuildInvalidDataMsg(&msg, "lines is null", NULL);
return (TAOS_RES *)request;
}
int numLines = 0; int numLines = 0;
*totalRows = 0; *totalRows = 0;
char *tmp = lines; char *tmp = lines;
...@@ -2999,7 +2978,7 @@ TAOS_RES *taos_schemaless_insert_raw_ttl_with_reqid(TAOS *taos, char *lines, int ...@@ -2999,7 +2978,7 @@ TAOS_RES *taos_schemaless_insert_raw_ttl_with_reqid(TAOS *taos, char *lines, int
tmp = lines + i + 1; tmp = lines + i + 1;
} }
} }
return taos_schemaless_insert_inner(request, NULL, lines, lines + len, numLines, protocol, precision, ttl); return taos_schemaless_insert_inner(taos, NULL, lines, lines + len, *totalRows, protocol, precision, ttl, reqid);
} }
TAOS_RES *taos_schemaless_insert_raw_with_reqid(TAOS *taos, char *lines, int len, int32_t *totalRows, int protocol, int precision, int64_t reqid) { TAOS_RES *taos_schemaless_insert_raw_with_reqid(TAOS *taos, char *lines, int len, int32_t *totalRows, int protocol, int precision, int64_t reqid) {
......
...@@ -1142,8 +1142,8 @@ int sml_ttl_Test() { ...@@ -1142,8 +1142,8 @@ int sml_ttl_Test() {
int main(int argc, char *argv[]) { int main(int argc, char *argv[]) {
int ret = 0; int ret = 0;
// ret = sml_ttl_Test(); ret = sml_ttl_Test();
// ASSERT(!ret); ASSERT(!ret);
ret = sml_ts2164_Test(); ret = sml_ts2164_Test();
ASSERT(!ret); ASSERT(!ret);
ret = smlProcess_influx_Test(); ret = smlProcess_influx_Test();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册