提交 8a8423d6 编写于 作者: wmmhello's avatar wmmhello

fix:add logic for auto create table in taosX

上级 a3161bf9
......@@ -25,11 +25,12 @@
#include "tref.h"
#include "ttimer.h"
static void buildCreateTableJson(SSchemaWrapper* schemaRow, SSchemaWrapper* schemaTag, char* name, int64_t id,
int8_t t, cJSON* tables) {
static char* buildCreateTableJson(SSchemaWrapper* schemaRow, SSchemaWrapper* schemaTag, char* name, int64_t id,
int8_t t) {
char* string = NULL;
cJSON* json = cJSON_CreateObject();
if (json == NULL) {
return;
return NULL;
}
cJSON* type = cJSON_CreateString("create");
cJSON_AddItemToObject(json, "type", type);
......@@ -86,7 +87,10 @@ static void buildCreateTableJson(SSchemaWrapper* schemaRow, SSchemaWrapper* sche
cJSON_AddItemToArray(tags, tag);
}
cJSON_AddItemToObject(json, "tags", tags);
cJSON_AddItemToArray(tables, json);
string = cJSON_PrintUnformatted(json);
cJSON_Delete(json);
return string;
}
static char* buildAlterSTableJson(void* alterData, int32_t alterDataLen) {
......@@ -185,7 +189,6 @@ static char* processCreateStb(SMqMetaRsp* metaRsp) {
SVCreateStbReq req = {0};
SDecoder coder;
char* string = NULL;
cJSON* tables = cJSON_CreateArray();
// decode and process req
void* data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
......@@ -195,11 +198,9 @@ static char* processCreateStb(SMqMetaRsp* metaRsp) {
if (tDecodeSVCreateStbReq(&coder, &req) < 0) {
goto _err;
}
buildCreateTableJson(&req.schemaRow, &req.schemaTag, req.name, req.suid, TSDB_SUPER_TABLE, tables);
string = cJSON_PrintUnformatted(tables);
string = buildCreateTableJson(&req.schemaRow, &req.schemaTag, req.name, req.suid, TSDB_SUPER_TABLE);
_err:
cJSON_Delete(tables);
tDecoderClear(&coder);
return string;
}
......@@ -226,23 +227,16 @@ _err:
return string;
}
static void buildCreateCTableJson(STag* pTag, char* sname, char* name, SArray* tagName, int64_t id, uint8_t tagNum, cJSON* tables) {
SArray* pTagVals = NULL;
cJSON* json = cJSON_CreateObject();
if (json == NULL) {
return;
}
cJSON* type = cJSON_CreateString("create");
cJSON_AddItemToObject(json, "type", type);
// char cid[32] = {0};
// sprintf(cid, "%"PRIi64, id);
// cJSON* cid_ = cJSON_CreateString(cid);
// cJSON_AddItemToObject(json, "id", cid_);
static void buildChildElement(cJSON* json, SVCreateTbReq* pCreateReq){
STag* pTag = (STag*)pCreateReq->ctb.pTag;
char* sname = pCreateReq->ctb.name;
char* name = pCreateReq->name;
SArray* tagName = pCreateReq->ctb.tagName;
int64_t id = pCreateReq->uid;
uint8_t tagNum = pCreateReq->ctb.tagNum;
cJSON* tableName = cJSON_CreateString(name);
cJSON_AddItemToObject(json, "tableName", tableName);
cJSON* tableType = cJSON_CreateString("child");
cJSON_AddItemToObject(json, "tableType", tableType);
cJSON* using = cJSON_CreateString(sname);
cJSON_AddItemToObject(json, "using", using);
cJSON* tagNumJson = cJSON_CreateNumber(tagNum);
......@@ -251,6 +245,7 @@ static void buildCreateCTableJson(STag* pTag, char* sname, char* name, SArray* t
// cJSON_AddItemToObject(json, "version", version);
cJSON* tags = cJSON_CreateArray();
SArray* pTagVals = NULL;
int32_t code = tTagToValArray(pTag, &pTagVals);
if (code) {
goto end;
......@@ -309,10 +304,38 @@ static void buildCreateCTableJson(STag* pTag, char* sname, char* name, SArray* t
cJSON_AddItemToArray(tags, tag);
}
end:
end:
cJSON_AddItemToObject(json, "tags", tags);
taosArrayDestroy(pTagVals);
cJSON_AddItemToArray(tables, json);
}
static char* buildCreateCTableJson(SVCreateTbReq* pCreateReq, int32_t nReqs) {
char* string = NULL;
cJSON* json = cJSON_CreateObject();
if (json == NULL) {
return NULL;
}
cJSON* type = cJSON_CreateString("create");
cJSON_AddItemToObject(json, "type", type);
// char cid[32] = {0};
// sprintf(cid, "%"PRIi64, id);
// cJSON* cid_ = cJSON_CreateString(cid);
// cJSON_AddItemToObject(json, "id", cid_);
cJSON* tableType = cJSON_CreateString("child");
cJSON_AddItemToObject(json, "tableType", tableType);
buildChildElement(json, pCreateReq);
cJSON* createList = cJSON_CreateArray();
for(int i = 0; nReqs > 1 && i < nReqs; i++){
cJSON* create = cJSON_CreateObject();
buildChildElement(create, pCreateReq + i);
cJSON_AddItemToArray(createList, create);
}
cJSON_AddItemToObject(json, "createList", createList);
string = cJSON_PrintUnformatted(json);
cJSON_Delete(json);
return string;
}
static char* processCreateTable(SMqMetaRsp* metaRsp) {
......@@ -329,54 +352,47 @@ static char* processCreateTable(SMqMetaRsp* metaRsp) {
}
// loop to create table
cJSON* tables = cJSON_CreateArray();
for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
pCreateReq = req.pReqs + iReq;
if (req.nReqs > 0) {
pCreateReq = req.pReqs;
if (pCreateReq->type == TSDB_CHILD_TABLE) {
buildCreateCTableJson((STag*)pCreateReq->ctb.pTag, pCreateReq->ctb.name, pCreateReq->name,
pCreateReq->ctb.tagName, pCreateReq->uid, pCreateReq->ctb.tagNum, tables);
string = buildCreateCTableJson(req.pReqs, req.nReqs);
} else if (pCreateReq->type == TSDB_NORMAL_TABLE) {
buildCreateTableJson(&pCreateReq->ntb.schemaRow, NULL, pCreateReq->name, pCreateReq->uid, TSDB_NORMAL_TABLE, tables);
string = buildCreateTableJson(&pCreateReq->ntb.schemaRow, NULL, pCreateReq->name, pCreateReq->uid, TSDB_NORMAL_TABLE);
}
}
string = cJSON_PrintUnformatted(tables);
cJSON_Delete(tables);
_exit:
tDecoderClear(&decoder);
return string;
}
static char* processAutoCreateTable(STaosxRsp* rsp) {
SDecoder decoder = {0};
SVCreateTbReq* pCreateReq;
char* string = NULL;
if(rsp->createTableNum == 0) return NULL;
SDecoder* decoder = taosMemoryCalloc(rsp->createTableNum, sizeof(SDecoder));
SVCreateTbReq* pCreateReq = taosMemoryCalloc(rsp->createTableNum, sizeof(SVCreateTbReq));
char* string = NULL;
// loop to create table
cJSON* tables = cJSON_CreateArray();
for (int32_t iReq = 0; iReq < rsp->createTableNum; iReq++) {
// decode
void** data = taosArrayGet(rsp->createTableReq, iReq);
int32_t *len = taosArrayGet(rsp->createTableLen, iReq);
tDecoderInit(&decoder, *data, *len);
if (tDecodeSVCreateTbReq(&decoder, pCreateReq) < 0) {
tDecoderClear(&decoder);
tDecoderInit(&decoder[iReq], *data, *len);
if (tDecodeSVCreateTbReq(&decoder[iReq], pCreateReq + iReq) < 0) {
goto _exit;
}
if (pCreateReq->type == TSDB_CHILD_TABLE) {
buildCreateCTableJson((STag*)pCreateReq->ctb.pTag, pCreateReq->ctb.name, pCreateReq->name,
pCreateReq->ctb.tagName, pCreateReq->uid, pCreateReq->ctb.tagNum, tables);
} else if (pCreateReq->type == TSDB_NORMAL_TABLE) {
buildCreateTableJson(&pCreateReq->ntb.schemaRow, NULL, pCreateReq->name, pCreateReq->uid, TSDB_NORMAL_TABLE, tables);
}
tDecoderClear(&decoder);
ASSERT(pCreateReq[iReq].type == TSDB_CHILD_TABLE);
}
string = buildCreateCTableJson(pCreateReq, rsp->createTableNum);
string = cJSON_PrintUnformatted(tables);
_exit:
cJSON_Delete(tables);
for(int i = 0; i < rsp->createTableNum; i++){
tDecoderClear(&decoder[i]);
}
taosMemoryFree(decoder);
taosMemoryFree(pCreateReq);
return string;
}
......
......@@ -1470,11 +1470,11 @@ SMqTaosxRspObj* tmqBuildTaosxRspFromWrapper(SMqPollRspWrapper* pWrapper) {
tstrncpy(pRspObj->db, pWrapper->topicHandle->db, TSDB_DB_FNAME_LEN);
pRspObj->vgId = pWrapper->vgHandle->vgId;
pRspObj->resIter = -1;
memcpy(&pRspObj->rsp, &pWrapper->dataRsp, sizeof(SMqTaosxRspObj));
memcpy(&pRspObj->rsp, &pWrapper->taosxRsp, sizeof(STaosxRsp));
pRspObj->resInfo.totalRows = 0;
pRspObj->resInfo.precision = TSDB_TIME_PRECISION_MILLI;
if (!pWrapper->dataRsp.withSchema) {
if (!pWrapper->taosxRsp.withSchema) {
setResSchemaInfo(&pRspObj->resInfo, pWrapper->topicHandle->schema.pSchema, pWrapper->topicHandle->schema.nCols);
}
......@@ -1786,6 +1786,9 @@ const char* tmq_get_topic_name(TAOS_RES* res) {
} else if (TD_RES_TMQ_META(res)) {
SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)res;
return strchr(pMetaRspObj->topic, '.') + 1;
} else if (TD_RES_TMQ_METADATA(res)) {
SMqTaosxRspObj* pRspObj = (SMqTaosxRspObj*)res;
return strchr(pRspObj->topic, '.') + 1;
} else {
return NULL;
}
......@@ -1798,6 +1801,9 @@ const char* tmq_get_db_name(TAOS_RES* res) {
} else if (TD_RES_TMQ_META(res)) {
SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)res;
return strchr(pMetaRspObj->db, '.') + 1;
} else if (TD_RES_TMQ_METADATA(res)) {
SMqTaosxRspObj* pRspObj = (SMqTaosxRspObj*)res;
return strchr(pRspObj->db, '.') + 1;
} else {
return NULL;
}
......@@ -1810,6 +1816,9 @@ int32_t tmq_get_vgroup_id(TAOS_RES* res) {
} else if (TD_RES_TMQ_META(res)) {
SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)res;
return pMetaRspObj->vgId;
} else if (TD_RES_TMQ_META(res)) {
SMqTaosxRspObj* pRspObj = (SMqTaosxRspObj*)res;
return pRspObj->vgId;
} else {
return -1;
}
......
......@@ -54,12 +54,12 @@ static void msg_process(TAOS_RES* msg) {
printf("db: %s\n", tmq_get_db_name(msg));
printf("vg: %d\n", tmq_get_vgroup_id(msg));
TAOS *pConn = use_db();
if (tmq_get_res_type(msg) == TMQ_RES_TABLE_META) {
if (tmq_get_res_type(msg) == TMQ_RES_TABLE_META || tmq_get_res_type(msg) == TMQ_RES_METADATA) {
char* result = tmq_get_json_meta(msg);
if (result) {
printf("meta result: %s\n", result);
}
if(g_fp){
if(g_fp && result){
taosFprintfFile(g_fp, result);
taosFprintfFile(g_fp, "\n");
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册