提交 6b46423a 编写于 作者: wmmhello's avatar wmmhello

feat:write meta from tmq to taosd

上级 aae8adeb
......@@ -28,15 +28,23 @@ static void msg_process(TAOS_RES* msg) {
printf("db: %s\n", tmq_get_db_name(msg));
printf("vg: %d\n", tmq_get_vgroup_id(msg));
if (tmq_get_res_type(msg) == TMQ_RES_TABLE_META) {
void* meta;
int32_t metaLen;
tmq_get_raw_meta(msg, &meta, &metaLen);
tmq_raw_data *raw = tmq_get_raw_meta(msg);
if(raw){
TAOS* pConn = taos_connect("localhost", "root", "taosdata", "abc1", 0);
if (pConn == NULL) {
return;
}
int32_t ret = taos_write_raw_meta(pConn, raw);
printf("write raw data: %s\n", tmq_err2str(ret));
free(raw);
taos_close(pConn);
}
char* result = tmq_get_json_meta(msg);
if(result){
printf("meta result: %s\n", result);
free(result);
}
printf("meta, len is %d\n", metaLen);
printf("meta:%p\n", raw);
return;
}
while (1) {
......@@ -60,7 +68,7 @@ int32_t init_env() {
return -1;
}
TAOS_RES* pRes = taos_query(pConn, "create database if not exists abc1 vgroups 1");
TAOS_RES* pRes = taos_query(pConn, "create database if not exists abc1 vgroups 3");
if (taos_errno(pRes) != 0) {
printf("error in create db, reason:%s\n", taos_errstr(pRes));
return -1;
......
......@@ -260,15 +260,16 @@ enum tmq_res_t {
};
typedef enum tmq_res_t tmq_res_t;
DLL_EXPORT tmq_res_t tmq_get_res_type(TAOS_RES *res);
DLL_EXPORT int32_t tmq_get_raw_meta(TAOS_RES *res, void **raw_meta, int32_t *raw_meta_len);
DLL_EXPORT int32_t taos_write_raw_meta(TAOS *res, void *raw_meta, int32_t raw_meta_len);
DLL_EXPORT char *tmq_get_json_meta(TAOS_RES *res); // Returning null means error. Returned result need to be freed.
DLL_EXPORT const char *tmq_get_topic_name(TAOS_RES *res);
DLL_EXPORT const char *tmq_get_db_name(TAOS_RES *res);
DLL_EXPORT int32_t tmq_get_vgroup_id(TAOS_RES *res);
DLL_EXPORT const char *tmq_get_table_name(TAOS_RES *res);
typedef struct tmq_raw_data tmq_raw_data;
DLL_EXPORT tmq_res_t tmq_get_res_type(TAOS_RES *res);
DLL_EXPORT tmq_raw_data *tmq_get_raw_meta(TAOS_RES *res);
DLL_EXPORT int32_t taos_write_raw_meta(TAOS *taos, tmq_raw_data *raw_meta);
DLL_EXPORT char *tmq_get_json_meta(TAOS_RES *res); // Returning null means error. Returned result need to be freed.
DLL_EXPORT const char *tmq_get_topic_name(TAOS_RES *res);
DLL_EXPORT const char *tmq_get_db_name(TAOS_RES *res);
DLL_EXPORT int32_t tmq_get_vgroup_id(TAOS_RES *res);
DLL_EXPORT const char *tmq_get_table_name(TAOS_RES *res);
/* ------------------------------ TMQ END -------------------------------- */
......
......@@ -446,11 +446,16 @@ typedef struct {
int32_t ast1Len;
int32_t ast2Len;
SArray* pColumns; // array of SField
int32_t cVersion;
SArray* pTags; // array of SField
int32_t tVersion;
SArray* pFuncs;
char* pComment;
char* pAst1;
char* pAst2;
tb_uid_t suid;
int8_t source; // 1-taosX or 0-taosClient
int8_t reserved[8];
} SMCreateStbReq;
int32_t tSerializeSMCreateStbReq(void* buf, int32_t bufLen, SMCreateStbReq* pReq);
......@@ -458,8 +463,11 @@ int32_t tDeserializeSMCreateStbReq(void* buf, int32_t bufLen, SMCreateStbReq* pR
void tFreeSMCreateStbReq(SMCreateStbReq* pReq);
typedef struct {
char name[TSDB_TABLE_FNAME_LEN];
int8_t igNotExists;
char name[TSDB_TABLE_FNAME_LEN];
int8_t igNotExists;
tb_uid_t suid;
int8_t source; // 1-taosX or 0-taosClient
int8_t reserved[8];
} SMDropStbReq;
int32_t tSerializeSMDropStbReq(void* buf, int32_t bufLen, SMDropStbReq* pReq);
......
......@@ -37,6 +37,8 @@ typedef struct SName {
char tname[TSDB_TABLE_NAME_LEN];
} SName;
SName* toName(int32_t acctId, const char* pDbName, const char* pTableName, SName* pName);
int32_t tNameExtractFullName(const SName* name, char* dst);
int32_t tNameLen(const SName* name);
......
......@@ -96,6 +96,9 @@ int32_t smlBindData(void* handle, SArray* tags, SArray* colsSchema, SArray* cols
char* tableName, char* msgBuf, int16_t msgBufLen);
int32_t smlBuildOutput(void* handle, SHashObj* pVgHash);
int32_t rewriteToVnodeModifyOpStmt(SQuery* pQuery, SArray* pBufArray);
SArray* serializeVgroupsCreateTableBatch(SHashObj* pVgroupHashmap);
SArray* serializeVgroupsDropTableBatch(SHashObj* pVgroupHashmap);
#ifdef __cplusplus
}
#endif
......
......@@ -94,7 +94,7 @@ void taosPrintLongString(const char *flags, ELogLevel level, int32_t dflag, cons
#define pError(...) { taosPrintLog("APP ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }
#define pPrint(...) { taosPrintLog("APP ", DEBUG_INFO, 255, __VA_ARGS__); }
// clang-format on
#define BUF_PAGE_DEBUG
#ifdef __cplusplus
}
#endif
......
......@@ -107,6 +107,12 @@ struct tmq_t {
tsem_t rspSem;
};
struct tmq_raw_data{
void *raw_meta;
int32_t raw_meta_len;
int16_t raw_meta_type;
};
enum {
TMQ_VG_STATUS__IDLE = 0,
TMQ_VG_STATUS__WAIT,
......@@ -1840,14 +1846,16 @@ const char* tmq_get_table_name(TAOS_RES* res) {
return NULL;
}
int32_t tmq_get_raw_meta(TAOS_RES* res, void** raw_meta, int32_t* raw_meta_len) {
tmq_raw_data *tmq_get_raw_meta(TAOS_RES* res) {
if (TD_RES_TMQ_META(res)) {
tmq_raw_data *raw = taosMemoryCalloc(1, sizeof(tmq_raw_data));
SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)res;
*raw_meta = pMetaRspObj->metaRsp.metaRsp;
*raw_meta_len = pMetaRspObj->metaRsp.metaRspLen;
return 0;
raw->raw_meta = pMetaRspObj->metaRsp.metaRsp;
raw->raw_meta_len = pMetaRspObj->metaRsp.metaRspLen;
raw->raw_meta_type = pMetaRspObj->metaRsp.resMsgType;
return raw;
}
return -1;
return NULL;
}
static char *buildCreateTableJson(SSchemaWrapper *schemaRow, SSchemaWrapper* schemaTag, char* name, int64_t id, int8_t t){
......@@ -2250,6 +2258,515 @@ char *tmq_get_json_meta(TAOS_RES *res){
return NULL;
}
static int32_t taosCreateStb(TAOS *taos, void *meta, int32_t metaLen){
SVCreateStbReq req = {0};
SDecoder coder;
SMCreateStbReq pReq = {0};
int32_t code = TSDB_CODE_SUCCESS;
SRequestObj* pRequest = NULL;
STscObj *pTscObj = acquireTscObj(*(int64_t *)taos);
if (NULL == pTscObj) {
code = TSDB_CODE_TSC_DISCONNECTED;
goto end;
}
code = buildRequest(pTscObj, "", 0, &pRequest);
if (code != TSDB_CODE_SUCCESS) {
goto end;
}
if(!pRequest->pDb){
code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
goto end;
}
// decode and process req
void* data = POINTER_SHIFT(meta, sizeof(SMsgHead));
int32_t len = metaLen - sizeof(SMsgHead);
tDecoderInit(&coder, data, len);
if (tDecodeSVCreateStbReq(&coder, &req) < 0) {
code = TSDB_CODE_INVALID_PARA;
goto end;
}
// build create stable
pReq.pColumns = taosArrayInit(req.schemaRow.nCols, sizeof(SField));
for(int32_t i = 0; i < req.schemaRow.nCols; i++){
SSchema* pSchema = req.schemaRow.pSchema + i;
SField field = {.type = pSchema->type, .bytes = pSchema->bytes};
strcpy(field.name, pSchema->name);
taosArrayPush(pReq.pColumns, &field);
}
pReq.pTags = taosArrayInit(req.schemaTag.nCols, sizeof(SField));
for(int32_t i = 0; i < req.schemaTag.nCols; i++){
SSchema* pSchema = req.schemaTag.pSchema + i;
SField field = {.type = pSchema->type, .bytes = pSchema->bytes};
strcpy(field.name, pSchema->name);
taosArrayPush(pReq.pTags, &field);
}
pReq.cVersion = req.schemaRow.version;
pReq.tVersion = req.schemaTag.version;
pReq.numOfColumns = req.schemaRow.nCols;
pReq.numOfTags = req.schemaTag.nCols;
pReq.commentLen = -1;
pReq.suid = req.suid;
pReq.source = 1;
SName tableName;
tNameExtractFullName(toName(pTscObj->acctId, pRequest->pDb, "rname", &tableName), pReq.name);
SCmdMsgInfo pCmdMsg = {0};
pCmdMsg.epSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
pCmdMsg.msgType = TDMT_MND_CREATE_STB;
pCmdMsg.msgLen = tSerializeSMCreateStbReq(NULL, 0, &pReq);
pCmdMsg.pMsg = taosMemoryMalloc(pCmdMsg.msgLen);
if (NULL == pCmdMsg.pMsg) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto end;
}
tSerializeSMCreateStbReq(pCmdMsg.pMsg, pCmdMsg.msgLen, &pReq);
SQuery pQuery = {0};
pQuery.execMode = QUERY_EXEC_MODE_RPC;
pQuery.pCmdMsg = &pCmdMsg;
pQuery.msgType = pQuery.pCmdMsg->msgType;
pQuery.stableQuery = true;
launchQueryImpl(pRequest, &pQuery, true, NULL);
code = pRequest->code;
taosMemoryFree(pCmdMsg.pMsg);
end:
destroyRequest(pRequest);
tFreeSMCreateStbReq(&pReq);
tDecoderClear(&coder);
return code;
}
static int32_t taosDropStb(TAOS *taos, void *meta, int32_t metaLen){
SVDropStbReq req = {0};
SDecoder coder;
SMDropStbReq pReq = {0};
int32_t code = TSDB_CODE_SUCCESS;
SRequestObj* pRequest = NULL;
STscObj *pTscObj = acquireTscObj(*(int64_t *)taos);
if (NULL == pTscObj) {
code = TSDB_CODE_TSC_DISCONNECTED;
goto end;
}
code = buildRequest(pTscObj, "", 0, &pRequest);
if (code != TSDB_CODE_SUCCESS) {
goto end;
}
if(!pRequest->pDb){
code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
goto end;
}
// decode and process req
void* data = POINTER_SHIFT(meta, sizeof(SMsgHead));
int32_t len = metaLen - sizeof(SMsgHead);
tDecoderInit(&coder, data, len);
if (tDecodeSVDropStbReq(&coder, &req) < 0) {
code = TSDB_CODE_INVALID_PARA;
goto end;
}
// build drop stable
pReq.igNotExists = true;
pReq.source = 1;
pReq.suid = req.suid;
SName tableName;
tNameExtractFullName(toName(pTscObj->acctId, pRequest->pDb, "rname", &tableName), pReq.name);
SCmdMsgInfo pCmdMsg = {0};
pCmdMsg.epSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
pCmdMsg.msgType = TDMT_MND_DROP_STB;
pCmdMsg.msgLen = tSerializeSMDropStbReq(NULL, 0, &pReq);
pCmdMsg.pMsg = taosMemoryMalloc(pCmdMsg.msgLen);
if (NULL == pCmdMsg.pMsg) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto end;
}
tSerializeSMDropStbReq(pCmdMsg.pMsg, pCmdMsg.msgLen, &pReq);
SQuery pQuery = {0};
pQuery.execMode = QUERY_EXEC_MODE_RPC;
pQuery.pCmdMsg = &pCmdMsg;
pQuery.msgType = pQuery.pCmdMsg->msgType;
pQuery.stableQuery = true;
launchQueryImpl(pRequest, &pQuery, true, NULL);
code = pRequest->code;
taosMemoryFree(pCmdMsg.pMsg);
end:
destroyRequest(pRequest);
tDecoderClear(&coder);
return code;
}
typedef struct SVgroupCreateTableBatch {
SVCreateTbBatchReq req;
SVgroupInfo info;
char dbName[TSDB_DB_NAME_LEN];
} SVgroupCreateTableBatch;
static void destroyCreateTbReqBatch(void* data) {
SVgroupCreateTableBatch* pTbBatch = (SVgroupCreateTableBatch*) data;
taosArrayDestroy(pTbBatch->req.pArray);
}
static int32_t taosCreateTable(TAOS *taos, void *meta, int32_t metaLen){
SVCreateTbBatchReq req = {0};
SDecoder coder = {0};
int32_t code = TSDB_CODE_SUCCESS;
SRequestObj *pRequest = NULL;
SQuery *pQuery = NULL;
SHashObj *pVgroupHashmap = NULL;
STscObj *pTscObj = acquireTscObj(*(int64_t *)taos);
if (NULL == pTscObj) {
code = TSDB_CODE_TSC_DISCONNECTED;
goto end;
}
code = buildRequest(pTscObj, "", 0, &pRequest);
if (code != TSDB_CODE_SUCCESS) {
goto end;
}
if(!pRequest->pDb){
code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
goto end;
}
// decode and process req
void* data = POINTER_SHIFT(meta, sizeof(SMsgHead));
int32_t len = metaLen - sizeof(SMsgHead);
tDecoderInit(&coder, data, len);
if (tDecodeSVCreateTbBatchReq(&coder, &req) < 0) {
code = TSDB_CODE_INVALID_PARA;
goto end;
}
SVCreateTbReq *pCreateReq = NULL;
SCatalog* pCatalog = NULL;
code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog);
if (code != TSDB_CODE_SUCCESS) {
goto end;
}
pVgroupHashmap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
if (NULL == pVgroupHashmap) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto end;
}
taosHashSetFreeFp(pVgroupHashmap, destroyCreateTbReqBatch);
SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter,
.requestId = pRequest->requestId,
.requestObjRefId = pRequest->self,
.mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)};
// loop to create table
for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
pCreateReq = req.pReqs + iReq;
SVgroupInfo pInfo = {0};
SName pName;
toName(pTscObj->acctId, pRequest->pDb, pCreateReq->name, &pName);
code = catalogGetTableHashVgroup(pCatalog, &conn, &pName, &pInfo);
if (code != TSDB_CODE_SUCCESS) {
goto end;
}
SVgroupCreateTableBatch* pTableBatch = taosHashGet(pVgroupHashmap, &pInfo.vgId, sizeof(pInfo.vgId));
if (pTableBatch == NULL) {
SVgroupCreateTableBatch tBatch = {0};
tBatch.info = pInfo;
strcpy(tBatch.dbName, pRequest->pDb);
tBatch.req.pArray = taosArrayInit(4, sizeof(struct SVCreateTbReq));
taosArrayPush(tBatch.req.pArray, pCreateReq);
taosHashPut(pVgroupHashmap, &pInfo.vgId, sizeof(pInfo.vgId), &tBatch, sizeof(tBatch));
} else { // add to the correct vgroup
taosArrayPush(pTableBatch->req.pArray, pCreateReq);
}
}
SArray* pBufArray = serializeVgroupsCreateTableBatch(pVgroupHashmap);
if (NULL == pBufArray) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto end;
}
pQuery = (SQuery*)nodesMakeNode(QUERY_NODE_QUERY);
pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE;
pQuery->msgType = TDMT_VND_CREATE_TABLE;
pQuery->stableQuery = false;
pQuery->pRoot = nodesMakeNode(QUERY_NODE_CREATE_TABLE_STMT);
code = rewriteToVnodeModifyOpStmt(pQuery, pBufArray);
if (code != TSDB_CODE_SUCCESS) {
goto end;
}
launchQueryImpl(pRequest, pQuery, false, NULL);
pQuery = NULL; // no need to free in the end
code = pRequest->code;
end:
taosHashCleanup(pVgroupHashmap);
destroyRequest(pRequest);
tDecoderClear(&coder);
qDestroyQuery(pQuery);
return code;
}
typedef struct SVgroupDropTableBatch {
SVDropTbBatchReq req;
SVgroupInfo info;
char dbName[TSDB_DB_NAME_LEN];
} SVgroupDropTableBatch;
static void destroyDropTbReqBatch(void* data) {
SVgroupDropTableBatch* pTbBatch = (SVgroupDropTableBatch*)data;
taosArrayDestroy(pTbBatch->req.pArray);
}
static int32_t taosDropTable(TAOS *taos, void *meta, int32_t metaLen){
SVDropTbBatchReq req = {0};
SDecoder coder = {0};
int32_t code = TSDB_CODE_SUCCESS;
SRequestObj *pRequest = NULL;
SQuery *pQuery = NULL;
SHashObj *pVgroupHashmap = NULL;
STscObj *pTscObj = acquireTscObj(*(int64_t *)taos);
if (NULL == pTscObj) {
code = TSDB_CODE_TSC_DISCONNECTED;
goto end;
}
code = buildRequest(pTscObj, "", 0, &pRequest);
if (code != TSDB_CODE_SUCCESS) {
goto end;
}
if(!pRequest->pDb){
code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
goto end;
}
// decode and process req
void* data = POINTER_SHIFT(meta, sizeof(SMsgHead));
int32_t len = metaLen - sizeof(SMsgHead);
tDecoderInit(&coder, data, len);
if (tDecodeSVDropTbBatchReq(&coder, &req) < 0) {
code = TSDB_CODE_INVALID_PARA;
goto end;
}
SVDropTbReq *pDropReq = NULL;
SCatalog *pCatalog = NULL;
code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog);
if (code != TSDB_CODE_SUCCESS) {
goto end;
}
pVgroupHashmap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
if (NULL == pVgroupHashmap) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto end;
}
taosHashSetFreeFp(pVgroupHashmap, destroyDropTbReqBatch);
SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter,
.requestId = pRequest->requestId,
.requestObjRefId = pRequest->self,
.mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)};
// loop to create table
for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
pDropReq = req.pReqs + iReq;
SVgroupInfo pInfo = {0};
SName pName;
toName(pTscObj->acctId, pRequest->pDb, pDropReq->name, &pName);
code = catalogGetTableHashVgroup(pCatalog, &conn, &pName, &pInfo);
if (code != TSDB_CODE_SUCCESS) {
goto end;
}
SVgroupDropTableBatch* pTableBatch = taosHashGet(pVgroupHashmap, &pInfo.vgId, sizeof(pInfo.vgId));
if (pTableBatch == NULL) {
SVgroupDropTableBatch tBatch = {0};
tBatch.info = pInfo;
tBatch.req.pArray = taosArrayInit(TARRAY_MIN_SIZE, sizeof(SVDropTbReq));
taosArrayPush(tBatch.req.pArray, pDropReq);
taosHashPut(pVgroupHashmap, &pInfo.vgId, sizeof(pInfo.vgId), &tBatch, sizeof(tBatch));
} else { // add to the correct vgroup
taosArrayPush(pTableBatch->req.pArray, pDropReq);
}
}
SArray* pBufArray = serializeVgroupsDropTableBatch(pVgroupHashmap);
if (NULL == pBufArray) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto end;
}
pQuery = (SQuery*)nodesMakeNode(QUERY_NODE_QUERY);
pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE;
pQuery->msgType = TDMT_VND_DROP_TABLE;
pQuery->stableQuery = false;
pQuery->pRoot = nodesMakeNode(QUERY_NODE_DROP_TABLE_STMT);
code = rewriteToVnodeModifyOpStmt(pQuery, pBufArray);
if (code != TSDB_CODE_SUCCESS) {
goto end;
}
launchQueryImpl(pRequest, pQuery, false, NULL);
pQuery = NULL; // no need to free in the end
code = pRequest->code;
end:
taosHashCleanup(pVgroupHashmap);
destroyRequest(pRequest);
tDecoderClear(&coder);
qDestroyQuery(pQuery);
return code;
}
static int32_t taosAlterTable(TAOS *taos, void *meta, int32_t metaLen){
SVAlterTbReq req = {0};
SDecoder coder = {0};
int32_t code = TSDB_CODE_SUCCESS;
SRequestObj *pRequest = NULL;
SQuery *pQuery = NULL;
SArray *pArray = NULL;
SVgDataBlocks *pVgData = NULL;
STscObj *pTscObj = acquireTscObj(*(int64_t *)taos);
if (NULL == pTscObj) {
code = TSDB_CODE_TSC_DISCONNECTED;
goto end;
}
code = buildRequest(pTscObj, "", 0, &pRequest);
if (code != TSDB_CODE_SUCCESS) {
goto end;
}
if(!pRequest->pDb){
code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
goto end;
}
// decode and process req
void* data = POINTER_SHIFT(meta, sizeof(SMsgHead));
int32_t len = metaLen - sizeof(SMsgHead);
tDecoderInit(&coder, data, len);
if (tDecodeSVAlterTbReq(&coder, &req) < 0) {
code = TSDB_CODE_INVALID_PARA;
goto end;
}
// do not deal TSDB_ALTER_TABLE_UPDATE_OPTIONS
if(req.action == TSDB_ALTER_TABLE_UPDATE_OPTIONS){
goto end;
}
SCatalog *pCatalog = NULL;
code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog);
if (code != TSDB_CODE_SUCCESS) {
goto end;
}
SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter,
.requestId = pRequest->requestId,
.requestObjRefId = pRequest->self,
.mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)};
SVgroupInfo pInfo = {0};
SName pName = {0};
toName(pTscObj->acctId, pRequest->pDb, req.tbName, &pName);
code = catalogGetTableHashVgroup(pCatalog, &conn, &pName, &pInfo);
if (code != TSDB_CODE_SUCCESS) {
goto end;
}
pArray = taosArrayInit(1, sizeof(void*));
if (NULL == pArray) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto end;
}
pVgData = taosMemoryCalloc(1, sizeof(SVgDataBlocks));
if (NULL == pVgData) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto end;
}
pVgData->vg = pInfo;
pVgData->pData = taosMemoryMalloc(metaLen);
if (NULL == pVgData->pData) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto end;
}
memcpy(pVgData->pData, meta, metaLen);
((SMsgHead*)pVgData->pData)->vgId = htonl(pInfo.vgId);
pVgData->size = metaLen;
pVgData->numOfTables = 1;
taosArrayPush(pArray, &pVgData);
pQuery = (SQuery*)nodesMakeNode(QUERY_NODE_QUERY);
pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE;
pQuery->msgType = TDMT_VND_ALTER_TABLE;
pQuery->stableQuery = false;
pQuery->pRoot = nodesMakeNode(QUERY_NODE_ALTER_TABLE_STMT);
code = rewriteToVnodeModifyOpStmt(pQuery, pArray);
if (code != TSDB_CODE_SUCCESS) {
goto end;
}
launchQueryImpl(pRequest, pQuery, false, NULL);
pQuery = NULL; // no need to free in the end
pVgData = NULL;
pArray = NULL;
code = pRequest->code;
end:
taosArrayDestroy(pArray);
if(pVgData) taosMemoryFreeClear(pVgData->pData);
taosMemoryFreeClear(pVgData);
destroyRequest(pRequest);
tDecoderClear(&coder);
qDestroyQuery(pQuery);
return code;
}
int32_t taos_write_raw_meta(TAOS *taos, tmq_raw_data *raw_meta){
if (!taos || !raw_meta) {
return TSDB_CODE_INVALID_PARA;
}
if(raw_meta->raw_meta_type == TDMT_VND_CREATE_STB) {
return taosCreateStb(taos, raw_meta->raw_meta, raw_meta->raw_meta_len);
}else if(raw_meta->raw_meta_type == TDMT_VND_ALTER_STB){
return taosCreateStb(taos, raw_meta->raw_meta, raw_meta->raw_meta_len);
}else if(raw_meta->raw_meta_type == TDMT_VND_DROP_STB){
return taosDropStb(taos, raw_meta->raw_meta, raw_meta->raw_meta_len);
}else if(raw_meta->raw_meta_type == TDMT_VND_CREATE_TABLE){
return taosCreateTable(taos, raw_meta->raw_meta, raw_meta->raw_meta_len);
}else if(raw_meta->raw_meta_type == TDMT_VND_ALTER_TABLE){
return taosAlterTable(taos, raw_meta->raw_meta, raw_meta->raw_meta_len);
}else if(raw_meta->raw_meta_type == TDMT_VND_DROP_TABLE){
return taosDropTable(taos, raw_meta->raw_meta, raw_meta->raw_meta_len);
}
return TSDB_CODE_INVALID_PARA;
}
void tmq_commit_async(tmq_t* tmq, const TAOS_RES* msg, tmq_commit_cb* cb, void* param) {
tmqCommitInner2(tmq, msg, 0, 1, cb, param);
}
......
......@@ -516,6 +516,8 @@ int32_t tSerializeSMCreateStbReq(void *buf, int32_t bufLen, SMCreateStbReq *pReq
if (tEncodeCStr(&encoder, pField->name) < 0) return -1;
}
if (tEncodeI32(&encoder, pReq->cVersion) < 0) return -1;
for (int32_t i = 0; i < pReq->numOfTags; ++i) {
SField *pField = taosArrayGet(pReq->pTags, i);
if (tEncodeI8(&encoder, pField->type) < 0) return -1;
......@@ -524,6 +526,8 @@ int32_t tSerializeSMCreateStbReq(void *buf, int32_t bufLen, SMCreateStbReq *pReq
if (tEncodeCStr(&encoder, pField->name) < 0) return -1;
}
if (tEncodeI32(&encoder, pReq->tVersion) < 0) return -1;
for (int32_t i = 0; i < pReq->numOfFuncs; ++i) {
const char *pFunc = taosArrayGet(pReq->pFuncs, i);
if (tEncodeCStr(&encoder, pFunc) < 0) return -1;
......@@ -538,6 +542,11 @@ int32_t tSerializeSMCreateStbReq(void *buf, int32_t bufLen, SMCreateStbReq *pReq
if (pReq->ast2Len > 0) {
if (tEncodeBinary(&encoder, pReq->pAst2, pReq->ast2Len) < 0) return -1;
}
if (tEncodeI64(&encoder, pReq->suid) < 0) return -1;
if (tEncodeI8(&encoder, pReq->source) < 0) return -1;
for (int32_t i = 0; i < sizeof(pReq->reserved)/sizeof(int8_t); ++i) {
if (tEncodeI8(&encoder, pReq->reserved[i]) < 0) return -1;
}
tEndEncode(&encoder);
......@@ -585,6 +594,8 @@ int32_t tDeserializeSMCreateStbReq(void *buf, int32_t bufLen, SMCreateStbReq *pR
}
}
if (tDecodeI32(&decoder, &pReq->cVersion) < 0) return -1;
for (int32_t i = 0; i < pReq->numOfTags; ++i) {
SField field = {0};
if (tDecodeI8(&decoder, &field.type) < 0) return -1;
......@@ -597,6 +608,8 @@ int32_t tDeserializeSMCreateStbReq(void *buf, int32_t bufLen, SMCreateStbReq *pR
}
}
if (tDecodeI32(&decoder, &pReq->tVersion) < 0) return -1;
for (int32_t i = 0; i < pReq->numOfFuncs; ++i) {
char pFunc[TSDB_FUNC_NAME_LEN] = {0};
if (tDecodeCStrTo(&decoder, pFunc) < 0) return -1;
......@@ -624,6 +637,12 @@ int32_t tDeserializeSMCreateStbReq(void *buf, int32_t bufLen, SMCreateStbReq *pR
if (tDecodeCStrTo(&decoder, pReq->pAst2) < 0) return -1;
}
if (tDecodeI64(&decoder, &pReq->suid) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->source) < 0) return -1;
for (int32_t i = 0; i < sizeof(pReq->reserved)/sizeof(int8_t); ++i) {
if (tDecodeI8(&decoder, &pReq->reserved[i]) < 0) return -1;
}
tEndDecode(&decoder);
tDecoderClear(&decoder);
return 0;
......@@ -645,6 +664,11 @@ int32_t tSerializeSMDropStbReq(void *buf, int32_t bufLen, SMDropStbReq *pReq) {
if (tStartEncode(&encoder) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->name) < 0) return -1;
if (tEncodeI8(&encoder, pReq->igNotExists) < 0) return -1;
if (tEncodeI64(&encoder, pReq->suid) < 0) return -1;
if (tEncodeI8(&encoder, pReq->source) < 0) return -1;
for (int32_t i = 0; i < sizeof(pReq->reserved)/sizeof(int8_t); ++i) {
if (tEncodeI8(&encoder, pReq->reserved[i]) < 0) return -1;
}
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
......@@ -659,6 +683,12 @@ int32_t tDeserializeSMDropStbReq(void *buf, int32_t bufLen, SMDropStbReq *pReq)
if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeCStrTo(&decoder, pReq->name) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->igNotExists) < 0) return -1;
if (tDecodeI64(&decoder, &pReq->suid) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->source) < 0) return -1;
for (int32_t i = 0; i < sizeof(pReq->reserved)/sizeof(int8_t); ++i) {
if (tDecodeI8(&decoder, &pReq->reserved[i]) < 0) return -1;
}
tEndDecode(&decoder);
tDecoderClear(&decoder);
......
......@@ -115,6 +115,14 @@ int64_t taosGetIntervalStartTimestamp(int64_t startTime, int64_t slidingTime, in
#endif
SName* toName(int32_t acctId, const char* pDbName, const char* pTableName, SName* pName) {
pName->type = TSDB_TABLE_NAME_T;
pName->acctId = acctId;
strcpy(pName->dbname, pDbName);
strcpy(pName->tname, pTableName);
return pName;
}
int32_t tNameExtractFullName(const SName* name, char* dst) {
assert(name != NULL && dst != NULL);
......
......@@ -706,10 +706,10 @@ int32_t mndBuildStbFromReq(SMnode *pMnode, SStbObj *pDst, SMCreateStbReq *pCreat
memcpy(pDst->db, pDb->name, TSDB_DB_FNAME_LEN);
pDst->createdTime = taosGetTimestampMs();
pDst->updateTime = pDst->createdTime;
pDst->uid = mndGenerateUid(pCreate->name, TSDB_TABLE_FNAME_LEN);
pDst->uid = (pCreate->source == 1) ? pCreate->suid : mndGenerateUid(pCreate->name, TSDB_TABLE_FNAME_LEN);
pDst->dbUid = pDb->uid;
pDst->tagVer = 1;
pDst->colVer = 1;
pDst->tagVer = (pCreate->source == 1) ? pCreate->tVersion : 1;
pDst->colVer = (pCreate->source == 1) ? pCreate->cVersion : 1;
pDst->smaVer = 1;
pDst->nextColId = 1;
pDst->maxdelay[0] = pCreate->delay1;
......@@ -1752,6 +1752,11 @@ static int32_t mndProcessDropStbReq(SRpcMsg *pReq) {
}
}
if (dropReq.source == 1 && pStb->uid != dropReq.suid){
terrno = TSDB_CODE_MND_STB_NOT_EXIST;
goto _OVER;
}
pDb = mndAcquireDbByStb(pMnode, dropReq.name);
if (pDb == NULL) {
terrno = TSDB_CODE_MND_DB_NOT_SELECTED;
......
......@@ -3451,6 +3451,10 @@ void saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pS
setBufPageDirty(pPage, true);
releaseBufPage(pCtx->pBuf, pPage);
#ifdef BUF_PAGE_DEBUG
qDebug("page_saveTuple pos:%p,pageId:%d, offset:%d\n", pPos, pPos->pageId,
pPos->offset);
#endif
}
void copyTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STuplePos* pPos) {
......@@ -3485,6 +3489,9 @@ void copyTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pS
setBufPageDirty(pPage, true);
releaseBufPage(pCtx->pBuf, pPage);
#ifdef BUF_PAGE_DEBUG
qDebug("page_copyTuple pos:%p, pageId:%d, offset:%d", pPos, pPos->pageId, pPos->offset);
#endif
}
int32_t topBotFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
......
......@@ -77,14 +77,6 @@ static int32_t addNamespace(STranslateContext* pCxt, void* pTable) {
return TSDB_CODE_SUCCESS;
}
static SName* toName(int32_t acctId, const char* pDbName, const char* pTableName, SName* pName) {
pName->type = TSDB_TABLE_NAME_T;
pName->acctId = acctId;
strcpy(pName->dbname, pDbName);
strcpy(pName->tname, pTableName);
return pName;
}
static int32_t collectUseDatabaseImpl(const char* pFullDbName, SHashObj* pDbs) {
SFullDatabaseName name = {0};
strcpy(name.fullDbName, pFullDbName);
......@@ -5294,7 +5286,8 @@ static int32_t serializeVgroupCreateTableBatch(SVgroupCreateTableBatch* pTbBatch
return TSDB_CODE_SUCCESS;
}
static void destroyCreateTbReqBatch(SVgroupCreateTableBatch* pTbBatch) {
static void destroyCreateTbReqBatch(void* data) {
SVgroupCreateTableBatch* pTbBatch = (SVgroupCreateTableBatch*) data;
size_t size = taosArrayGetSize(pTbBatch->req.pArray);
for (int32_t i = 0; i < size; ++i) {
SVCreateTbReq* pTableReq = taosArrayGet(pTbBatch->req.pArray, i);
......@@ -5311,7 +5304,7 @@ static void destroyCreateTbReqBatch(SVgroupCreateTableBatch* pTbBatch) {
taosArrayDestroy(pTbBatch->req.pArray);
}
static int32_t rewriteToVnodeModifyOpStmt(SQuery* pQuery, SArray* pBufArray) {
int32_t rewriteToVnodeModifyOpStmt(SQuery* pQuery, SArray* pBufArray) {
SVnodeModifOpStmt* pNewStmt = (SVnodeModifOpStmt*)nodesMakeNode(QUERY_NODE_VNODE_MODIF_STMT);
if (pNewStmt == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
......@@ -5377,10 +5370,10 @@ static int32_t rewriteCreateTable(STranslateContext* pCxt, SQuery* pQuery) {
static void addCreateTbReqIntoVgroup(int32_t acctId, SHashObj* pVgroupHashmap, SCreateSubTableClause* pStmt,
const STag* pTag, uint64_t suid, SVgroupInfo* pVgInfo) {
char dbFName[TSDB_DB_FNAME_LEN] = {0};
SName name = {.type = TSDB_DB_NAME_T, .acctId = acctId};
strcpy(name.dbname, pStmt->dbName);
tNameGetFullDbName(&name, dbFName);
// char dbFName[TSDB_DB_FNAME_LEN] = {0};
// SName name = {.type = TSDB_DB_NAME_T, .acctId = acctId};
// strcpy(name.dbname, pStmt->dbName);
// tNameGetFullDbName(&name, dbFName);
struct SVCreateTbReq req = {0};
req.type = TD_CHILD_TABLE;
......@@ -5641,7 +5634,7 @@ static int32_t rewriteCreateSubTable(STranslateContext* pCxt, SCreateSubTableCla
return code;
}
static SArray* serializeVgroupsCreateTableBatch(int32_t acctId, SHashObj* pVgroupHashmap) {
SArray* serializeVgroupsCreateTableBatch(SHashObj* pVgroupHashmap) {
SArray* pBufArray = taosArrayInit(taosHashGetSize(pVgroupHashmap), sizeof(void*));
if (NULL == pBufArray) {
return NULL;
......@@ -5656,7 +5649,6 @@ static SArray* serializeVgroupsCreateTableBatch(int32_t acctId, SHashObj* pVgrou
}
serializeVgroupCreateTableBatch(pTbBatch, pBufArray);
destroyCreateTbReqBatch(pTbBatch);
} while (true);
return pBufArray;
......@@ -5670,6 +5662,7 @@ static int32_t rewriteCreateMultiTable(STranslateContext* pCxt, SQuery* pQuery)
return TSDB_CODE_OUT_OF_MEMORY;
}
taosHashSetFreeFp(pVgroupHashmap, destroyCreateTbReqBatch);
int32_t code = TSDB_CODE_SUCCESS;
SNode* pNode;
FOREACH(pNode, pStmt->pSubTables) {
......@@ -5681,7 +5674,7 @@ static int32_t rewriteCreateMultiTable(STranslateContext* pCxt, SQuery* pQuery)
}
}
SArray* pBufArray = serializeVgroupsCreateTableBatch(pCxt->pParseCxt->acctId, pVgroupHashmap);
SArray* pBufArray = serializeVgroupsCreateTableBatch(pVgroupHashmap);
taosHashCleanup(pVgroupHashmap);
if (NULL == pBufArray) {
return TSDB_CODE_OUT_OF_MEMORY;
......@@ -5741,7 +5734,10 @@ over:
return code;
}
static void destroyDropTbReqBatch(SVgroupDropTableBatch* pTbBatch) { taosArrayDestroy(pTbBatch->req.pArray); }
static void destroyDropTbReqBatch(void* data) {
SVgroupDropTableBatch* pTbBatch = (SVgroupDropTableBatch*)data;
taosArrayDestroy(pTbBatch->req.pArray);
}
static int32_t serializeVgroupDropTableBatch(SVgroupDropTableBatch* pTbBatch, SArray* pBufArray) {
int tlen;
......@@ -5775,7 +5771,7 @@ static int32_t serializeVgroupDropTableBatch(SVgroupDropTableBatch* pTbBatch, SA
return TSDB_CODE_SUCCESS;
}
static SArray* serializeVgroupsDropTableBatch(int32_t acctId, SHashObj* pVgroupHashmap) {
SArray* serializeVgroupsDropTableBatch(SHashObj* pVgroupHashmap) {
SArray* pBufArray = taosArrayInit(taosHashGetSize(pVgroupHashmap), sizeof(void*));
if (NULL == pBufArray) {
return NULL;
......@@ -5790,7 +5786,6 @@ static SArray* serializeVgroupsDropTableBatch(int32_t acctId, SHashObj* pVgroupH
}
serializeVgroupDropTableBatch(pTbBatch, pBufArray);
destroyDropTbReqBatch(pTbBatch);
} while (true);
return pBufArray;
......@@ -5804,6 +5799,7 @@ static int32_t rewriteDropTable(STranslateContext* pCxt, SQuery* pQuery) {
return TSDB_CODE_OUT_OF_MEMORY;
}
taosHashSetFreeFp(pVgroupHashmap, destroyDropTbReqBatch);
bool isSuperTable = false;
SNode* pNode;
FOREACH(pNode, pStmt->pTables) {
......@@ -5822,7 +5818,7 @@ static int32_t rewriteDropTable(STranslateContext* pCxt, SQuery* pQuery) {
return TSDB_CODE_SUCCESS;
}
SArray* pBufArray = serializeVgroupsDropTableBatch(pCxt->pParseCxt->acctId, pVgroupHashmap);
SArray* pBufArray = serializeVgroupsDropTableBatch(pVgroupHashmap);
taosHashCleanup(pVgroupHashmap);
if (NULL == pBufArray) {
return TSDB_CODE_OUT_OF_MEMORY;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册