未验证 提交 ed43a4da 编写于 作者: wmmhello's avatar wmmhello 提交者: GitHub

Merge pull request #13728 from taosdata/feature/TD-14761

fix:error in schemaless
...@@ -72,7 +72,7 @@ for (int i = 1; i < keyLen; ++i) { \ ...@@ -72,7 +72,7 @@ for (int i = 1; i < keyLen; ++i) { \
#define NCHAR_ADD_LEN 3 // L"nchar" 3 means L" " #define NCHAR_ADD_LEN 3 // L"nchar" 3 means L" "
#define MAX_RETRY_TIMES 5 #define MAX_RETRY_TIMES 5
#define LINE_BATCH 20 #define LINE_BATCH 20000
//================================================================================================= //=================================================================================================
typedef TSDB_SML_PROTOCOL_TYPE SMLProtocolType; typedef TSDB_SML_PROTOCOL_TYPE SMLProtocolType;
...@@ -161,7 +161,6 @@ typedef struct { ...@@ -161,7 +161,6 @@ typedef struct {
typedef struct{ typedef struct{
SRequestObj* request; SRequestObj* request;
SCatalog* catalog;
tsem_t sem; tsem_t sem;
TdThreadSpinlock lock; TdThreadSpinlock lock;
} Params; } Params;
...@@ -1292,9 +1291,46 @@ static void smlDestroyTableInfo(SSmlHandle* info, SSmlTableInfo *tag){ ...@@ -1292,9 +1291,46 @@ static void smlDestroyTableInfo(SSmlHandle* info, SSmlTableInfo *tag){
taosMemoryFree(tag); taosMemoryFree(tag);
} }
static int32_t smlKvTimeArrayCompare(const void* key1, const void* key2) {
SArray *s1 = *(SArray **)key1;
SArray *s2 = *(SArray **)key2;
SSmlKv *kv1 = (SSmlKv *)taosArrayGetP(s1, 0);
SSmlKv *kv2 = (SSmlKv *)taosArrayGetP(s2, 0);
ASSERT(kv1->type == TSDB_DATA_TYPE_TIMESTAMP);
ASSERT(kv2->type == TSDB_DATA_TYPE_TIMESTAMP);
if (kv1->i < kv2->i) {
return -1;
} else if (kv1->i > kv2->i) {
return 1;
} else {
return 0;
}
}
static int32_t smlKvTimeHashCompare(const void* key1, const void* key2) {
SHashObj *s1 = *(SHashObj **)key1;
SHashObj *s2 = *(SHashObj **)key2;
SSmlKv *kv1 = (SSmlKv *)taosHashGet(s1, TS, TS_LEN);
SSmlKv *kv2 = (SSmlKv *)taosHashGet(s2, TS, TS_LEN);
ASSERT(kv1->type == TSDB_DATA_TYPE_TIMESTAMP);
ASSERT(kv2->type == TSDB_DATA_TYPE_TIMESTAMP);
if (kv1->i < kv2->i) {
return -1;
} else if (kv1->i > kv2->i) {
return 1;
} else {
return 0;
}
}
static int32_t smlDealCols(SSmlTableInfo* oneTable, bool dataFormat, SArray *cols){ static int32_t smlDealCols(SSmlTableInfo* oneTable, bool dataFormat, SArray *cols){
if(dataFormat){ if(dataFormat){
taosArrayPush(oneTable->cols, &cols); void *p = taosArraySearch(oneTable->cols, &cols, smlKvTimeArrayCompare, TD_GE);
if(p == NULL){
taosArrayPush(oneTable->cols, &cols);
}else{
taosArrayInsert(oneTable->cols, TARRAY_ELEM_IDX(oneTable->cols, p), &cols);
}
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -1307,8 +1343,13 @@ static int32_t smlDealCols(SSmlTableInfo* oneTable, bool dataFormat, SArray *col ...@@ -1307,8 +1343,13 @@ static int32_t smlDealCols(SSmlTableInfo* oneTable, bool dataFormat, SArray *col
SSmlKv *kv = (SSmlKv *)taosArrayGetP(cols, i); SSmlKv *kv = (SSmlKv *)taosArrayGetP(cols, i);
taosHashPut(kvHash, kv->key, kv->keyLen, &kv, POINTER_BYTES); taosHashPut(kvHash, kv->key, kv->keyLen, &kv, POINTER_BYTES);
} }
taosArrayPush(oneTable->cols, &kvHash);
void *p = taosArraySearch(oneTable->cols, &kvHash, smlKvTimeHashCompare, TD_GE);
if(p == NULL){
taosArrayPush(oneTable->cols, &kvHash);
}else{
taosArrayInsert(oneTable->cols, TARRAY_ELEM_IDX(oneTable->cols, p), &kvHash);
}
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -1419,6 +1460,11 @@ static SSmlHandle* smlBuildSmlInfo(TAOS* taos, SRequestObj* request, SMLProtocol ...@@ -1419,6 +1460,11 @@ static SSmlHandle* smlBuildSmlInfo(TAOS* taos, SRequestObj* request, SMLProtocol
((SVnodeModifOpStmt*)(info->pQuery->pRoot))->payloadType = PAYLOAD_TYPE_KV; ((SVnodeModifOpStmt*)(info->pQuery->pRoot))->payloadType = PAYLOAD_TYPE_KV;
info->taos = (STscObj *)taos; info->taos = (STscObj *)taos;
code = catalogGetHandle(info->taos->pAppInfo->clusterId, &info->pCatalog);
if(code != TSDB_CODE_SUCCESS){
uError("SML:0x%"PRIx64" get catalog error %d", info->id, code);
goto cleanup;
}
info->precision = precision; info->precision = precision;
info->protocol = protocol; info->protocol = protocol;
...@@ -2196,6 +2242,7 @@ static int32_t smlInsertData(SSmlHandle* info) { ...@@ -2196,6 +2242,7 @@ static int32_t smlInsertData(SSmlHandle* info) {
code = smlBindData(info->exec, tableData->tags, (*pMeta)->cols, tableData->cols, info->dataFormat, code = smlBindData(info->exec, tableData->tags, (*pMeta)->cols, tableData->cols, info->dataFormat,
(*pMeta)->tableMeta, tableData->childTableName, info->msgBuf.buf, info->msgBuf.len); (*pMeta)->tableMeta, tableData->childTableName, info->msgBuf.buf, info->msgBuf.len);
if(code != TSDB_CODE_SUCCESS){ if(code != TSDB_CODE_SUCCESS){
uError("SML:0x%"PRIx64" smlBindData failed", info->id);
return code; return code;
} }
oneTable = (SSmlTableInfo**)taosHashIterate(info->childTables, oneTable); oneTable = (SSmlTableInfo**)taosHashIterate(info->childTables, oneTable);
...@@ -2261,7 +2308,7 @@ static int smlProcess(SSmlHandle *info, char* lines[], int numLines) { ...@@ -2261,7 +2308,7 @@ static int smlProcess(SSmlHandle *info, char* lines[], int numLines) {
code = smlParseLine(info, lines, numLines); code = smlParseLine(info, lines, numLines);
if (code != 0) { if (code != 0) {
uError("SML:0x%"PRIx64" smlParseLine error : %s", info->id, tstrerror(code)); uError("SML:0x%"PRIx64" smlParseLine error : %s", info->id, tstrerror(code));
goto cleanup; return code;
} }
info->cost.lineNum = numLines; info->cost.lineNum = numLines;
...@@ -2277,24 +2324,27 @@ static int smlProcess(SSmlHandle *info, char* lines[], int numLines) { ...@@ -2277,24 +2324,27 @@ static int smlProcess(SSmlHandle *info, char* lines[], int numLines) {
if (code != 0) { if (code != 0) {
uError("SML:0x%"PRIx64" smlModifyDBSchemas error : %s", info->id, tstrerror(code)); uError("SML:0x%"PRIx64" smlModifyDBSchemas error : %s", info->id, tstrerror(code));
goto cleanup; return code;
} }
info->cost.insertBindTime = taosGetTimestampUs(); info->cost.insertBindTime = taosGetTimestampUs();
code = smlInsertData(info); code = smlInsertData(info);
if (code != 0) { if (code != 0) {
uError("SML:0x%"PRIx64" smlInsertData error : %s", info->id, tstrerror(code)); uError("SML:0x%"PRIx64" smlInsertData error : %s", info->id, tstrerror(code));
goto cleanup; return code;
} }
info->cost.endTime = taosGetTimestampUs();
cleanup:
info->cost.code = code;
smlPrintStatisticInfo(info);
return code; return code;
} }
static int32_t isSchemalessDb(STscObj *taos, SCatalog *catalog){ static int32_t isSchemalessDb(STscObj *taos){
SCatalog* catalog = NULL;
int32_t code = catalogGetHandle(((STscObj *)taos)->pAppInfo->clusterId, &catalog);
if(code != TSDB_CODE_SUCCESS){
uError("SML get catalog error %d", code);
return code;
}
SName name; SName name;
tNameSetDbName(&name, taos->acctId, taos->db, strlen(taos->db)); tNameSetDbName(&name, taos->acctId, taos->db, strlen(taos->db));
char dbFname[TSDB_DB_FNAME_LEN] = {0}; char dbFname[TSDB_DB_FNAME_LEN] = {0};
...@@ -2302,7 +2352,7 @@ static int32_t isSchemalessDb(STscObj *taos, SCatalog *catalog){ ...@@ -2302,7 +2352,7 @@ static int32_t isSchemalessDb(STscObj *taos, SCatalog *catalog){
SDbCfgInfo pInfo = {0}; SDbCfgInfo pInfo = {0};
SEpSet ep = getEpSet_s(&taos->pAppInfo->mgmtEp); SEpSet ep = getEpSet_s(&taos->pAppInfo->mgmtEp);
int32_t code = catalogGetDBCfg(catalog, taos->pAppInfo->pTransporter, &ep, dbFname, &pInfo); code = catalogGetDBCfg(catalog, taos->pAppInfo->pTransporter, &ep, dbFname, &pInfo);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
...@@ -2329,6 +2379,9 @@ static void smlInsertCallback(void* param, void* res, int32_t code) { ...@@ -2329,6 +2379,9 @@ static void smlInsertCallback(void* param, void* res, int32_t code) {
printf("SML:0x%" PRIx64 " insert finished, code: %d, total: %d\n", info->id, code, info->affectedRows); printf("SML:0x%" PRIx64 " insert finished, code: %d, total: %d\n", info->id, code, info->affectedRows);
Params *pParam = info->params; Params *pParam = info->params;
bool isLast = info->isLast; bool isLast = info->isLast;
info->cost.endTime = taosGetTimestampUs();
info->cost.code = code;
smlPrintStatisticInfo(info);
smlDestroyInfo(info); smlDestroyInfo(info);
if(isLast){ if(isLast){
...@@ -2373,20 +2426,13 @@ TAOS_RES* taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int pr ...@@ -2373,20 +2426,13 @@ TAOS_RES* taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int pr
tsem_init(&params.sem, 0, 0); tsem_init(&params.sem, 0, 0);
taosThreadSpinInit(&(params.lock), 0); taosThreadSpinInit(&(params.lock), 0);
int32_t code = catalogGetHandle(((STscObj *)taos)->pAppInfo->clusterId, &params.catalog);
if(code != TSDB_CODE_SUCCESS){
uError("SML get catalog error %d", code);
request->code = code;
goto end;
}
if(request->pDb == NULL){ if(request->pDb == NULL){
request->code = TSDB_CODE_PAR_DB_NOT_SPECIFIED; request->code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
smlBuildInvalidDataMsg(&msg, "Database not specified", NULL); smlBuildInvalidDataMsg(&msg, "Database not specified", NULL);
goto end; goto end;
} }
if(isSchemalessDb(((STscObj *)taos), params.catalog) != TSDB_CODE_SUCCESS){ if(isSchemalessDb(((STscObj *)taos)) != TSDB_CODE_SUCCESS){
request->code = TSDB_CODE_SML_INVALID_DB_CONF; request->code = TSDB_CODE_SML_INVALID_DB_CONF;
smlBuildInvalidDataMsg(&msg, "Cannot write data to a non schemaless database", NULL); smlBuildInvalidDataMsg(&msg, "Cannot write data to a non schemaless database", NULL);
goto end; goto end;
...@@ -2436,11 +2482,10 @@ TAOS_RES* taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int pr ...@@ -2436,11 +2482,10 @@ TAOS_RES* taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int pr
} }
info->params = &params; info->params = &params;
info->pCatalog = params.catalog;
info->affectedRows = perBatch; info->affectedRows = perBatch;
info->pRequest->body.queryFp = smlInsertCallback; info->pRequest->body.queryFp = smlInsertCallback;
info->pRequest->body.param = info; info->pRequest->body.param = info;
code = smlProcess(info, lines, perBatch); int32_t code = smlProcess(info, lines, perBatch);
lines += perBatch; lines += perBatch;
if (code != TSDB_CODE_SUCCESS){ if (code != TSDB_CODE_SUCCESS){
info->pRequest->body.queryFp(info, req, code); info->pRequest->body.queryFp(info, req, code);
......
此差异已折叠。
...@@ -86,11 +86,15 @@ tb_uid_t metaGetTableEntryUidByName(SMeta *pMeta, const char *name) { ...@@ -86,11 +86,15 @@ tb_uid_t metaGetTableEntryUidByName(SMeta *pMeta, const char *name) {
int nData = 0; int nData = 0;
tb_uid_t uid = 0; tb_uid_t uid = 0;
metaRLock(pMeta);
if (tdbTbGet(pMeta->pNameIdx, name, strlen(name) + 1, &pData, &nData) == 0) { if (tdbTbGet(pMeta->pNameIdx, name, strlen(name) + 1, &pData, &nData) == 0) {
uid = *(tb_uid_t *)pData; uid = *(tb_uid_t *)pData;
tdbFree(pData); tdbFree(pData);
} }
metaULock(pMeta);
return 0; return 0;
} }
......
...@@ -793,7 +793,9 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq ...@@ -793,7 +793,9 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq
msgIter.suid = 0; msgIter.suid = 0;
} }
#ifdef TD_DEBUG_PRINT_ROW
vnodeDebugPrintSingleSubmitMsg(pVnode->pMeta, pBlock, &msgIter, "real uid"); vnodeDebugPrintSingleSubmitMsg(pVnode->pMeta, pBlock, &msgIter, "real uid");
#endif
tDecoderClear(&decoder); tDecoderClear(&decoder);
} else { } else {
submitBlkRsp.tblFName = taosMemoryMalloc(TSDB_TABLE_FNAME_LEN); submitBlkRsp.tblFName = taosMemoryMalloc(TSDB_TABLE_FNAME_LEN);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册