提交 0318c20e 编写于 作者: wmmhello's avatar wmmhello

fix:change the new interface name of schemaless

上级 c12bcc6c
...@@ -103,11 +103,11 @@ int32_t convertSmlTimeStamp(TAOS_SML_KV *pVal, char *value, ...@@ -103,11 +103,11 @@ int32_t convertSmlTimeStamp(TAOS_SML_KV *pVal, char *value,
void destroySmlDataPoint(TAOS_SML_DATA_POINT* point); void destroySmlDataPoint(TAOS_SML_DATA_POINT* point);
int taos_insert_lines(TAOS* taos, char* data, int32_t len, char* lines[], int numLines, SMLProtocolType protocol, int taos_insert_lines(TAOS* taos, char* data, int32_t len, char* lines[], int *numLines, SMLProtocolType protocol,
SMLTimeStampType tsType, int* affectedRows); SMLTimeStampType tsType, int* affectedRows);
int taos_insert_telnet_lines(TAOS* taos, char* data, int32_t len, char* lines[], int numLines, SMLProtocolType protocol, int taos_insert_telnet_lines(TAOS* taos, char* data, int32_t len, char* lines[], int *numLines, SMLProtocolType protocol,
SMLTimeStampType tsType, int* affectedRows); SMLTimeStampType tsType, int* affectedRows);
int taos_insert_json_payload(TAOS* taos, char* payload, SMLProtocolType protocol, int taos_insert_json_payload(TAOS* taos, char* payload, SMLProtocolType protocol, int *numLines,
SMLTimeStampType tsType, int* affectedRows); SMLTimeStampType tsType, int* affectedRows);
......
...@@ -2699,7 +2699,7 @@ int32_t tscParseLines(char* data, int32_t len, char* lines[], int numLines, SArr ...@@ -2699,7 +2699,7 @@ int32_t tscParseLines(char* data, int32_t len, char* lines[], int numLines, SArr
return code; return code;
} }
int taos_insert_lines(TAOS* taos, char* data, int len, char* lines[], int numLines, SMLProtocolType protocol, SMLTimeStampType tsType, int *affectedRows) { int taos_insert_lines(TAOS* taos, char* data, int len, char* lines[], int *numLines, SMLProtocolType protocol, SMLTimeStampType tsType, int *affectedRows) {
int32_t code = 0; int32_t code = 0;
SSmlLinesInfo* info = tcalloc(1, sizeof(SSmlLinesInfo)); SSmlLinesInfo* info = tcalloc(1, sizeof(SSmlLinesInfo));
...@@ -2708,26 +2708,26 @@ int taos_insert_lines(TAOS* taos, char* data, int len, char* lines[], int numLin ...@@ -2708,26 +2708,26 @@ int taos_insert_lines(TAOS* taos, char* data, int len, char* lines[], int numLin
info->protocol = protocol; info->protocol = protocol;
if (data){ if (data){
numLines = 0; *numLines = 0;
for(int i = 0; i < len; i++){ for(int i = 0; i < len; i++){
if(data[i] == '\0'){ if(data[i] == '\0'){
data[i] = '0'; data[i] = '0';
} }
if(data[i] == '\n' || i == len - 1){ if(data[i] == '\n' || i == len - 1){
numLines++; (*numLines)++;
} }
} }
} }
if (numLines <= 0 || numLines > 65536*32) { if (*numLines <= 0 || *numLines > 65536*32) {
tscError("SML:0x%"PRIx64" taos_insert_lines numLines should be between 1 and 65536*32. numLines: %d", info->id, numLines); tscError("SML:0x%"PRIx64" taos_insert_lines numLines should be between 1 and 65536*32. numLines: %d", info->id, *numLines);
tfree(info); tfree(info);
code = TSDB_CODE_TSC_APP_ERROR; code = TSDB_CODE_TSC_APP_ERROR;
return code; return code;
} }
if(lines){ if(lines){
for (int i = 0; i < numLines; ++i) { for (int i = 0; i < *numLines; ++i) {
if (lines[i] == NULL) { if (lines[i] == NULL) {
tscError("SML:0x%"PRIx64" taos_insert_lines line %d is NULL", info->id, i); tscError("SML:0x%"PRIx64" taos_insert_lines line %d is NULL", info->id, i);
tfree(info); tfree(info);
...@@ -2737,22 +2737,22 @@ int taos_insert_lines(TAOS* taos, char* data, int len, char* lines[], int numLin ...@@ -2737,22 +2737,22 @@ int taos_insert_lines(TAOS* taos, char* data, int len, char* lines[], int numLin
} }
} }
SArray* lpPoints = taosArrayInit(numLines, sizeof(TAOS_SML_DATA_POINT)); SArray* lpPoints = taosArrayInit(*numLines, sizeof(TAOS_SML_DATA_POINT));
if (lpPoints == NULL) { if (lpPoints == NULL) {
tscError("SML:0x%"PRIx64" taos_insert_lines failed to allocate memory", info->id); tscError("SML:0x%"PRIx64" taos_insert_lines failed to allocate memory", info->id);
tfree(info); tfree(info);
return TSDB_CODE_TSC_OUT_OF_MEMORY; return TSDB_CODE_TSC_OUT_OF_MEMORY;
} }
tscDebug("SML:0x%"PRIx64" taos_insert_lines begin inserting %d lines", info->id, numLines); tscDebug("SML:0x%"PRIx64" taos_insert_lines begin inserting %d lines", info->id, *numLines);
code = tscParseLines(data, len, lines, numLines, lpPoints, NULL, info); code = tscParseLines(data, len, lines, *numLines, lpPoints, NULL, info);
size_t numPoints = taosArrayGetSize(lpPoints); size_t numPoints = taosArrayGetSize(lpPoints);
TAOS_SML_DATA_POINT* points = TARRAY_GET_START(lpPoints);
if (code != 0) { if (code != 0) {
goto cleanup; goto cleanup;
} }
TAOS_SML_DATA_POINT* points = TARRAY_GET_START(lpPoints);
code = tscSmlInsert(taos, points, (int)numPoints, info); code = tscSmlInsert(taos, points, (int)numPoints, info);
if (code != 0) { if (code != 0) {
tscError("SML:0x%"PRIx64" taos_sml_insert error: %s", info->id, tstrerror((code))); tscError("SML:0x%"PRIx64" taos_sml_insert error: %s", info->id, tstrerror((code)));
...@@ -2762,9 +2762,7 @@ int taos_insert_lines(TAOS* taos, char* data, int len, char* lines[], int numLin ...@@ -2762,9 +2762,7 @@ int taos_insert_lines(TAOS* taos, char* data, int len, char* lines[], int numLin
} }
cleanup: cleanup:
tscDebug("SML:0x%"PRIx64" taos_insert_lines finish inserting %d lines. code: %d", info->id, numLines, code); tscDebug("SML:0x%"PRIx64" taos_insert_lines finish inserting %d lines. code: %d", info->id, *numLines, code);
points = TARRAY_GET_START(lpPoints);
numPoints = taosArrayGetSize(lpPoints);
for (int i=0; i<numPoints; ++i) { for (int i=0; i<numPoints; ++i) {
destroySmlDataPoint(points+i); destroySmlDataPoint(points+i);
} }
...@@ -2860,13 +2858,13 @@ TAOS_RES* taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int pr ...@@ -2860,13 +2858,13 @@ TAOS_RES* taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int pr
switch (protocol) { switch (protocol) {
case TSDB_SML_LINE_PROTOCOL: case TSDB_SML_LINE_PROTOCOL:
code = taos_insert_lines(taos, NULL, 0, lines, numLines, protocol, tsType, &affected_rows); code = taos_insert_lines(taos, NULL, 0, lines, &numLines, protocol, tsType, &affected_rows);
break; break;
case TSDB_SML_TELNET_PROTOCOL: case TSDB_SML_TELNET_PROTOCOL:
code = taos_insert_telnet_lines(taos, NULL, 0, lines, numLines, protocol, tsType, &affected_rows); code = taos_insert_telnet_lines(taos, NULL, 0, lines, &numLines, protocol, tsType, &affected_rows);
break; break;
case TSDB_SML_JSON_PROTOCOL: case TSDB_SML_JSON_PROTOCOL:
code = taos_insert_json_payload(taos, *lines, protocol, tsType, &affected_rows); code = taos_insert_json_payload(taos, *lines, protocol, &numLines, tsType, &affected_rows);
break; break;
default: default:
code = TSDB_CODE_TSC_INVALID_PROTOCOL_TYPE; code = TSDB_CODE_TSC_INVALID_PROTOCOL_TYPE;
...@@ -2891,15 +2889,20 @@ TAOS_RES *taos_schemaless_insert_raw(TAOS* taos, char* lines, int len, int32_t * ...@@ -2891,15 +2889,20 @@ TAOS_RES *taos_schemaless_insert_raw(TAOS* taos, char* lines, int len, int32_t *
} }
} }
if(!totalRows){
tscError("totalRows is null");
return NULL;
}
switch (protocol) { switch (protocol) {
case TSDB_SML_LINE_PROTOCOL: case TSDB_SML_LINE_PROTOCOL:
code = taos_insert_lines(taos, lines, len, NULL, 0, protocol, tsType, &affected_rows); code = taos_insert_lines(taos, lines, len, NULL, totalRows, protocol, tsType, &affected_rows);
break; break;
case TSDB_SML_TELNET_PROTOCOL: case TSDB_SML_TELNET_PROTOCOL:
code = taos_insert_telnet_lines(taos, lines, len, NULL, 0, protocol, tsType, &affected_rows); code = taos_insert_telnet_lines(taos, lines, len, NULL, totalRows, protocol, tsType, &affected_rows);
break; break;
case TSDB_SML_JSON_PROTOCOL: case TSDB_SML_JSON_PROTOCOL:
code = taos_insert_json_payload(taos, lines, protocol, tsType, &affected_rows); code = taos_insert_json_payload(taos, lines, protocol, totalRows, tsType, &affected_rows);
break; break;
default: default:
code = TSDB_CODE_TSC_INVALID_PROTOCOL_TYPE; code = TSDB_CODE_TSC_INVALID_PROTOCOL_TYPE;
......
...@@ -450,7 +450,7 @@ static int32_t tscParseTelnetLines(char* data, int32_t len, char* lines[], int n ...@@ -450,7 +450,7 @@ static int32_t tscParseTelnetLines(char* data, int32_t len, char* lines[], int n
return code; return code;
} }
int taos_insert_telnet_lines(TAOS* taos, char* data, int32_t len, char* lines[], int numLines, SMLProtocolType protocol, SMLTimeStampType tsType, int* affectedRows) { int taos_insert_telnet_lines(TAOS* taos, char* data, int32_t len, char* lines[], int *numLines, SMLProtocolType protocol, SMLTimeStampType tsType, int* affectedRows) {
int32_t code = 0; int32_t code = 0;
SSmlLinesInfo* info = tcalloc(1, sizeof(SSmlLinesInfo)); SSmlLinesInfo* info = tcalloc(1, sizeof(SSmlLinesInfo));
...@@ -459,26 +459,26 @@ int taos_insert_telnet_lines(TAOS* taos, char* data, int32_t len, char* lines[], ...@@ -459,26 +459,26 @@ int taos_insert_telnet_lines(TAOS* taos, char* data, int32_t len, char* lines[],
info->protocol = protocol; info->protocol = protocol;
if (data && !lines){ if (data && !lines){
numLines = 0; *numLines = 0;
for(int i = 0; i < len; i++){ for(int i = 0; i < len; i++){
if(data[i] == '\0'){ if(data[i] == '\0'){
data[i] = '0'; data[i] = '0';
} }
if(data[i] == '\n' || i == len - 1){ if(data[i] == '\n' || i == len - 1){
numLines++; (*numLines)++;
} }
} }
} }
if (numLines <= 0 || numLines > 65536) { if (*numLines <= 0 || *numLines > 65536) {
tscError("OTD:0x%"PRIx64" taos_insert_telnet_lines numLines should be between 1 and 65536. numLines: %d", info->id, numLines); tscError("OTD:0x%"PRIx64" taos_insert_telnet_lines numLines should be between 1 and 65536. numLines: %d", info->id, *numLines);
tfree(info); tfree(info);
code = TSDB_CODE_TSC_APP_ERROR; code = TSDB_CODE_TSC_APP_ERROR;
return code; return code;
} }
if(!data && lines){ if(!data && lines){
for (int i = 0; i < numLines; ++i) { for (int i = 0; i < *numLines; ++i) {
if (lines[i] == NULL) { if (lines[i] == NULL) {
tscError("OTD:0x%"PRIx64" taos_insert_telnet_lines line %d is NULL", info->id, i); tscError("OTD:0x%"PRIx64" taos_insert_telnet_lines line %d is NULL", info->id, i);
tfree(info); tfree(info);
...@@ -488,22 +488,22 @@ int taos_insert_telnet_lines(TAOS* taos, char* data, int32_t len, char* lines[], ...@@ -488,22 +488,22 @@ int taos_insert_telnet_lines(TAOS* taos, char* data, int32_t len, char* lines[],
} }
} }
SArray* lpPoints = taosArrayInit(numLines, sizeof(TAOS_SML_DATA_POINT)); SArray* lpPoints = taosArrayInit(*numLines, sizeof(TAOS_SML_DATA_POINT));
if (lpPoints == NULL) { if (lpPoints == NULL) {
tscError("OTD:0x%"PRIx64" taos_insert_telnet_lines failed to allocate memory", info->id); tscError("OTD:0x%"PRIx64" taos_insert_telnet_lines failed to allocate memory", info->id);
tfree(info); tfree(info);
return TSDB_CODE_TSC_OUT_OF_MEMORY; return TSDB_CODE_TSC_OUT_OF_MEMORY;
} }
tscDebug("OTD:0x%"PRIx64" taos_insert_telnet_lines begin inserting %d lines, first line: %s", info->id, numLines, lines[0]); tscDebug("OTD:0x%"PRIx64" taos_insert_telnet_lines begin inserting %d lines, first line: %s", info->id, *numLines, lines[0]);
code = tscParseTelnetLines(data, len, lines, numLines, lpPoints, NULL, info); code = tscParseTelnetLines(data, len, lines, *numLines, lpPoints, NULL, info);
size_t numPoints = taosArrayGetSize(lpPoints); size_t numPoints = taosArrayGetSize(lpPoints);
TAOS_SML_DATA_POINT* points = TARRAY_GET_START(lpPoints);
if (code != 0) { if (code != 0) {
goto cleanup; goto cleanup;
} }
TAOS_SML_DATA_POINT* points = TARRAY_GET_START(lpPoints);
code = tscSmlInsert(taos, points, (int)numPoints, info); code = tscSmlInsert(taos, points, (int)numPoints, info);
if (code != 0) { if (code != 0) {
tscError("OTD:0x%"PRIx64" taos_insert_telnet_lines error: %s", info->id, tstrerror((code))); tscError("OTD:0x%"PRIx64" taos_insert_telnet_lines error: %s", info->id, tstrerror((code)));
...@@ -513,9 +513,7 @@ int taos_insert_telnet_lines(TAOS* taos, char* data, int32_t len, char* lines[], ...@@ -513,9 +513,7 @@ int taos_insert_telnet_lines(TAOS* taos, char* data, int32_t len, char* lines[],
} }
cleanup: cleanup:
tscDebug("OTD:0x%"PRIx64" taos_insert_telnet_lines finish inserting %d lines. code: %d", info->id, numLines, code); tscDebug("OTD:0x%"PRIx64" taos_insert_telnet_lines finish inserting %d lines. code: %d", info->id, *numLines, code);
points = TARRAY_GET_START(lpPoints);
numPoints = taosArrayGetSize(lpPoints);
for (int i = 0; i < numPoints; ++i) { for (int i = 0; i < numPoints; ++i) {
destroySmlDataPoint(points+i); destroySmlDataPoint(points+i);
} }
...@@ -1107,8 +1105,9 @@ PARSE_JSON_OVER: ...@@ -1107,8 +1105,9 @@ PARSE_JSON_OVER:
return ret; return ret;
} }
int taos_insert_json_payload(TAOS* taos, char* payload, SMLProtocolType protocol, SMLTimeStampType tsType, int* affectedRows) { int taos_insert_json_payload(TAOS* taos, char* payload, SMLProtocolType protocol, int32_t *totalRows, SMLTimeStampType tsType, int* affectedRows) {
int32_t code = 0; int32_t code = 0;
*totalRows = 0;
SSmlLinesInfo* info = tcalloc(1, sizeof(SSmlLinesInfo)); SSmlLinesInfo* info = tcalloc(1, sizeof(SSmlLinesInfo));
info->id = genUID(); info->id = genUID();
...@@ -1132,12 +1131,12 @@ int taos_insert_json_payload(TAOS* taos, char* payload, SMLProtocolType protocol ...@@ -1132,12 +1131,12 @@ int taos_insert_json_payload(TAOS* taos, char* payload, SMLProtocolType protocol
tscDebug("OTD:0x%"PRIx64" taos_insert_telnet_lines begin inserting %d points", info->id, 1); tscDebug("OTD:0x%"PRIx64" taos_insert_telnet_lines begin inserting %d points", info->id, 1);
code = tscParseMultiJSONPayload(payload, lpPoints, info); code = tscParseMultiJSONPayload(payload, lpPoints, info);
size_t numPoints = taosArrayGetSize(lpPoints); size_t numPoints = taosArrayGetSize(lpPoints);
TAOS_SML_DATA_POINT* points = TARRAY_GET_START(lpPoints);
if (code != 0) { if (code != 0) {
goto cleanup; goto cleanup;
} }
TAOS_SML_DATA_POINT* points = TARRAY_GET_START(lpPoints);
code = tscSmlInsert(taos, points, (int)numPoints, info); code = tscSmlInsert(taos, points, (int)numPoints, info);
if (code != 0) { if (code != 0) {
tscError("OTD:0x%"PRIx64" taos_insert_json_payload error: %s", info->id, tstrerror((code))); tscError("OTD:0x%"PRIx64" taos_insert_json_payload error: %s", info->id, tstrerror((code)));
...@@ -1145,11 +1144,10 @@ int taos_insert_json_payload(TAOS* taos, char* payload, SMLProtocolType protocol ...@@ -1145,11 +1144,10 @@ int taos_insert_json_payload(TAOS* taos, char* payload, SMLProtocolType protocol
if (affectedRows != NULL) { if (affectedRows != NULL) {
*affectedRows = info->affectedRows; *affectedRows = info->affectedRows;
} }
*totalRows = numPoints;
cleanup: cleanup:
tscDebug("OTD:0x%"PRIx64" taos_insert_json_payload finish inserting 1 Point. code: %d", info->id, code); tscDebug("OTD:0x%"PRIx64" taos_insert_json_payload finish inserting 1 Point. code: %d", info->id, code);
points = TARRAY_GET_START(lpPoints);
numPoints = taosArrayGetSize(lpPoints);
for (int i = 0; i < numPoints; ++i) { for (int i = 0; i < numPoints; ++i) {
destroySmlDataPoint(points+i); destroySmlDataPoint(points+i);
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册