未验证 提交 30b1af3f 编写于 作者: wmmhello's avatar wmmhello 提交者: GitHub

Merge pull request #14700 from taosdata/feature/TD-13041

feat:write meta from tmq to taosd
...@@ -28,15 +28,23 @@ static void msg_process(TAOS_RES* msg) { ...@@ -28,15 +28,23 @@ static void msg_process(TAOS_RES* msg) {
printf("db: %s\n", tmq_get_db_name(msg)); printf("db: %s\n", tmq_get_db_name(msg));
printf("vg: %d\n", tmq_get_vgroup_id(msg)); printf("vg: %d\n", tmq_get_vgroup_id(msg));
if (tmq_get_res_type(msg) == TMQ_RES_TABLE_META) { if (tmq_get_res_type(msg) == TMQ_RES_TABLE_META) {
void* meta; tmq_raw_data *raw = tmq_get_raw_meta(msg);
int32_t metaLen; if(raw){
tmq_get_raw_meta(msg, &meta, &metaLen); TAOS* pConn = taos_connect("192.168.1.86", "root", "taosdata", "abc1", 0);
if (pConn == NULL) {
return;
}
int32_t ret = taos_write_raw_meta(pConn, raw);
printf("write raw data: %s\n", tmq_err2str(ret));
free(raw);
taos_close(pConn);
}
char* result = tmq_get_json_meta(msg); char* result = tmq_get_json_meta(msg);
if(result){ if(result){
printf("meta result: %s\n", result); printf("meta result: %s\n", result);
free(result); free(result);
} }
printf("meta, len is %d\n", metaLen); printf("meta:%p\n", raw);
return; return;
} }
while (1) { while (1) {
......
...@@ -260,15 +260,16 @@ enum tmq_res_t { ...@@ -260,15 +260,16 @@ enum tmq_res_t {
}; };
typedef enum tmq_res_t tmq_res_t; typedef enum tmq_res_t tmq_res_t;
typedef struct tmq_raw_data tmq_raw_data;
DLL_EXPORT tmq_res_t tmq_get_res_type(TAOS_RES *res);
DLL_EXPORT int32_t tmq_get_raw_meta(TAOS_RES *res, void **raw_meta, int32_t *raw_meta_len); DLL_EXPORT tmq_res_t tmq_get_res_type(TAOS_RES *res);
DLL_EXPORT int32_t taos_write_raw_meta(TAOS *res, void *raw_meta, int32_t raw_meta_len); DLL_EXPORT tmq_raw_data *tmq_get_raw_meta(TAOS_RES *res);
DLL_EXPORT char *tmq_get_json_meta(TAOS_RES *res); // Returning null means error. Returned result need to be freed. DLL_EXPORT int32_t taos_write_raw_meta(TAOS *taos, tmq_raw_data *raw_meta);
DLL_EXPORT const char *tmq_get_topic_name(TAOS_RES *res); DLL_EXPORT char *tmq_get_json_meta(TAOS_RES *res); // Returning null means error. Returned result need to be freed.
DLL_EXPORT const char *tmq_get_db_name(TAOS_RES *res); DLL_EXPORT const char *tmq_get_topic_name(TAOS_RES *res);
DLL_EXPORT int32_t tmq_get_vgroup_id(TAOS_RES *res); DLL_EXPORT const char *tmq_get_db_name(TAOS_RES *res);
DLL_EXPORT const char *tmq_get_table_name(TAOS_RES *res); DLL_EXPORT int32_t tmq_get_vgroup_id(TAOS_RES *res);
DLL_EXPORT const char *tmq_get_table_name(TAOS_RES *res);
/* ------------------------------ TMQ END -------------------------------- */ /* ------------------------------ TMQ END -------------------------------- */
......
...@@ -446,11 +446,16 @@ typedef struct { ...@@ -446,11 +446,16 @@ typedef struct {
int32_t ast1Len; int32_t ast1Len;
int32_t ast2Len; int32_t ast2Len;
SArray* pColumns; // array of SField SArray* pColumns; // array of SField
int32_t cVersion;
SArray* pTags; // array of SField SArray* pTags; // array of SField
int32_t tVersion;
SArray* pFuncs; SArray* pFuncs;
char* pComment; char* pComment;
char* pAst1; char* pAst1;
char* pAst2; char* pAst2;
tb_uid_t suid;
int8_t source; // 1-taosX or 0-taosClient
int8_t reserved[8];
} SMCreateStbReq; } SMCreateStbReq;
int32_t tSerializeSMCreateStbReq(void* buf, int32_t bufLen, SMCreateStbReq* pReq); int32_t tSerializeSMCreateStbReq(void* buf, int32_t bufLen, SMCreateStbReq* pReq);
...@@ -458,8 +463,11 @@ int32_t tDeserializeSMCreateStbReq(void* buf, int32_t bufLen, SMCreateStbReq* pR ...@@ -458,8 +463,11 @@ int32_t tDeserializeSMCreateStbReq(void* buf, int32_t bufLen, SMCreateStbReq* pR
void tFreeSMCreateStbReq(SMCreateStbReq* pReq); void tFreeSMCreateStbReq(SMCreateStbReq* pReq);
typedef struct { typedef struct {
char name[TSDB_TABLE_FNAME_LEN]; char name[TSDB_TABLE_FNAME_LEN];
int8_t igNotExists; int8_t igNotExists;
tb_uid_t suid;
int8_t source; // 1-taosX or 0-taosClient
int8_t reserved[8];
} SMDropStbReq; } SMDropStbReq;
int32_t tSerializeSMDropStbReq(void* buf, int32_t bufLen, SMDropStbReq* pReq); int32_t tSerializeSMDropStbReq(void* buf, int32_t bufLen, SMDropStbReq* pReq);
......
...@@ -37,6 +37,8 @@ typedef struct SName { ...@@ -37,6 +37,8 @@ typedef struct SName {
char tname[TSDB_TABLE_NAME_LEN]; char tname[TSDB_TABLE_NAME_LEN];
} SName; } SName;
SName* toName(int32_t acctId, const char* pDbName, const char* pTableName, SName* pName);
int32_t tNameExtractFullName(const SName* name, char* dst); int32_t tNameExtractFullName(const SName* name, char* dst);
int32_t tNameLen(const SName* name); int32_t tNameLen(const SName* name);
......
...@@ -96,6 +96,9 @@ int32_t smlBindData(void* handle, SArray* tags, SArray* colsSchema, SArray* cols ...@@ -96,6 +96,9 @@ int32_t smlBindData(void* handle, SArray* tags, SArray* colsSchema, SArray* cols
char* tableName, char* msgBuf, int16_t msgBufLen); char* tableName, char* msgBuf, int16_t msgBufLen);
int32_t smlBuildOutput(void* handle, SHashObj* pVgHash); int32_t smlBuildOutput(void* handle, SHashObj* pVgHash);
int32_t rewriteToVnodeModifyOpStmt(SQuery* pQuery, SArray* pBufArray);
SArray* serializeVgroupsCreateTableBatch(SHashObj* pVgroupHashmap);
SArray* serializeVgroupsDropTableBatch(SHashObj* pVgroupHashmap);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -94,7 +94,7 @@ void taosPrintLongString(const char *flags, ELogLevel level, int32_t dflag, cons ...@@ -94,7 +94,7 @@ void taosPrintLongString(const char *flags, ELogLevel level, int32_t dflag, cons
#define pError(...) { taosPrintLog("APP ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); } #define pError(...) { taosPrintLog("APP ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }
#define pPrint(...) { taosPrintLog("APP ", DEBUG_INFO, 255, __VA_ARGS__); } #define pPrint(...) { taosPrintLog("APP ", DEBUG_INFO, 255, __VA_ARGS__); }
// clang-format on // clang-format on
#define BUF_PAGE_DEBUG
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -1570,41 +1570,40 @@ static int32_t smlParseTSFromJSONObj(SSmlHandle *info, cJSON *root, int64_t *tsV ...@@ -1570,41 +1570,40 @@ static int32_t smlParseTSFromJSONObj(SSmlHandle *info, cJSON *root, int64_t *tsV
return TSDB_CODE_TSC_INVALID_TIME_STAMP; return TSDB_CODE_TSC_INVALID_TIME_STAMP;
} }
*tsVal = timeDouble;
size_t typeLen = strlen(type->valuestring); size_t typeLen = strlen(type->valuestring);
if (typeLen == 1 && (type->valuestring[0] == 's' || type->valuestring[0] == 'S')) { if (typeLen == 1 && (type->valuestring[0] == 's' || type->valuestring[0] == 'S')) {
// seconds // seconds
timeDouble = timeDouble * 1e9; *tsVal = *tsVal * NANOSECOND_PER_SEC;
timeDouble = timeDouble * NANOSECOND_PER_SEC;
if (smlDoubleToInt64OverFlow(timeDouble)) { if (smlDoubleToInt64OverFlow(timeDouble)) {
smlBuildInvalidDataMsg(&info->msgBuf, "timestamp is too large", NULL); smlBuildInvalidDataMsg(&info->msgBuf, "timestamp is too large", NULL);
return TSDB_CODE_TSC_INVALID_TIME_STAMP; return TSDB_CODE_TSC_INVALID_TIME_STAMP;
} }
*tsVal = timeDouble;
} else if (typeLen == 2 && (type->valuestring[1] == 's' || type->valuestring[1] == 'S')) { } else if (typeLen == 2 && (type->valuestring[1] == 's' || type->valuestring[1] == 'S')) {
switch (type->valuestring[0]) { switch (type->valuestring[0]) {
case 'm': case 'm':
case 'M': case 'M':
// milliseconds // milliseconds
timeDouble = timeDouble * 1e6; *tsVal = *tsVal * NANOSECOND_PER_MSEC;
timeDouble = timeDouble * NANOSECOND_PER_MSEC;
if (smlDoubleToInt64OverFlow(timeDouble)) { if (smlDoubleToInt64OverFlow(timeDouble)) {
smlBuildInvalidDataMsg(&info->msgBuf, "timestamp is too large", NULL); smlBuildInvalidDataMsg(&info->msgBuf, "timestamp is too large", NULL);
return TSDB_CODE_TSC_INVALID_TIME_STAMP; return TSDB_CODE_TSC_INVALID_TIME_STAMP;
} }
*tsVal = timeDouble;
break; break;
case 'u': case 'u':
case 'U': case 'U':
// microseconds // microseconds
timeDouble = timeDouble * 1e3; *tsVal = *tsVal * NANOSECOND_PER_USEC;
timeDouble = timeDouble * NANOSECOND_PER_USEC;
if (smlDoubleToInt64OverFlow(timeDouble)) { if (smlDoubleToInt64OverFlow(timeDouble)) {
smlBuildInvalidDataMsg(&info->msgBuf, "timestamp is too large", NULL); smlBuildInvalidDataMsg(&info->msgBuf, "timestamp is too large", NULL);
return TSDB_CODE_TSC_INVALID_TIME_STAMP; return TSDB_CODE_TSC_INVALID_TIME_STAMP;
} }
*tsVal = timeDouble;
break; break;
case 'n': case 'n':
case 'N': case 'N':
// nanoseconds
*tsVal = timeDouble;
break; break;
default: default:
return TSDB_CODE_TSC_INVALID_JSON; return TSDB_CODE_TSC_INVALID_JSON;
...@@ -1641,21 +1640,23 @@ static int32_t smlParseTSFromJSON(SSmlHandle *info, cJSON *root, SArray *cols) { ...@@ -1641,21 +1640,23 @@ static int32_t smlParseTSFromJSON(SSmlHandle *info, cJSON *root, SArray *cols) {
if (timeDouble < 0) { if (timeDouble < 0) {
return TSDB_CODE_TSC_INVALID_TIME_STAMP; return TSDB_CODE_TSC_INVALID_TIME_STAMP;
} }
uint8_t tsLen = smlGetTimestampLen((int64_t)timeDouble); uint8_t tsLen = smlGetTimestampLen((int64_t)timeDouble);
tsVal = (int64_t)timeDouble;
if (tsLen == TSDB_TIME_PRECISION_SEC_DIGITS) { if (tsLen == TSDB_TIME_PRECISION_SEC_DIGITS) {
timeDouble = timeDouble * 1e9; tsVal = tsVal * NANOSECOND_PER_SEC;
timeDouble = timeDouble * NANOSECOND_PER_SEC;
if (smlDoubleToInt64OverFlow(timeDouble)) { if (smlDoubleToInt64OverFlow(timeDouble)) {
smlBuildInvalidDataMsg(&info->msgBuf, "timestamp is too large", NULL); smlBuildInvalidDataMsg(&info->msgBuf, "timestamp is too large", NULL);
return TSDB_CODE_TSC_INVALID_TIME_STAMP; return TSDB_CODE_TSC_INVALID_TIME_STAMP;
} }
tsVal = timeDouble;
} else if (tsLen == TSDB_TIME_PRECISION_MILLI_DIGITS) { } else if (tsLen == TSDB_TIME_PRECISION_MILLI_DIGITS) {
timeDouble = timeDouble * 1e6; tsVal = tsVal * NANOSECOND_PER_MSEC;
timeDouble = timeDouble * NANOSECOND_PER_MSEC;
if (smlDoubleToInt64OverFlow(timeDouble)) { if (smlDoubleToInt64OverFlow(timeDouble)) {
smlBuildInvalidDataMsg(&info->msgBuf, "timestamp is too large", NULL); smlBuildInvalidDataMsg(&info->msgBuf, "timestamp is too large", NULL);
return TSDB_CODE_TSC_INVALID_TIME_STAMP; return TSDB_CODE_TSC_INVALID_TIME_STAMP;
} }
tsVal = timeDouble;
} else if (timeDouble == 0) { } else if (timeDouble == 0) {
tsVal = taosGetTimestampNs(); tsVal = taosGetTimestampNs();
} else { } else {
......
...@@ -106,6 +106,12 @@ struct tmq_t { ...@@ -106,6 +106,12 @@ struct tmq_t {
tsem_t rspSem; tsem_t rspSem;
}; };
struct tmq_raw_data{
void *raw_meta;
int32_t raw_meta_len;
int16_t raw_meta_type;
};
enum { enum {
TMQ_VG_STATUS__IDLE = 0, TMQ_VG_STATUS__IDLE = 0,
TMQ_VG_STATUS__WAIT, TMQ_VG_STATUS__WAIT,
...@@ -1842,14 +1848,16 @@ const char* tmq_get_table_name(TAOS_RES* res) { ...@@ -1842,14 +1848,16 @@ const char* tmq_get_table_name(TAOS_RES* res) {
return NULL; return NULL;
} }
int32_t tmq_get_raw_meta(TAOS_RES* res, void** raw_meta, int32_t* raw_meta_len) { tmq_raw_data *tmq_get_raw_meta(TAOS_RES* res) {
if (TD_RES_TMQ_META(res)) { if (TD_RES_TMQ_META(res)) {
tmq_raw_data *raw = taosMemoryCalloc(1, sizeof(tmq_raw_data));
SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)res; SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)res;
*raw_meta = pMetaRspObj->metaRsp.metaRsp; raw->raw_meta = pMetaRspObj->metaRsp.metaRsp;
*raw_meta_len = pMetaRspObj->metaRsp.metaRspLen; raw->raw_meta_len = pMetaRspObj->metaRsp.metaRspLen;
return 0; raw->raw_meta_type = pMetaRspObj->metaRsp.resMsgType;
return raw;
} }
return -1; return NULL;
} }
static char *buildCreateTableJson(SSchemaWrapper *schemaRow, SSchemaWrapper* schemaTag, char* name, int64_t id, int8_t t){ static char *buildCreateTableJson(SSchemaWrapper *schemaRow, SSchemaWrapper* schemaTag, char* name, int64_t id, int8_t t){
...@@ -2252,6 +2260,515 @@ char *tmq_get_json_meta(TAOS_RES *res){ ...@@ -2252,6 +2260,515 @@ char *tmq_get_json_meta(TAOS_RES *res){
return NULL; return NULL;
} }
static int32_t taosCreateStb(TAOS *taos, void *meta, int32_t metaLen){
SVCreateStbReq req = {0};
SDecoder coder;
SMCreateStbReq pReq = {0};
int32_t code = TSDB_CODE_SUCCESS;
SRequestObj* pRequest = NULL;
STscObj *pTscObj = acquireTscObj(*(int64_t *)taos);
if (NULL == pTscObj) {
code = TSDB_CODE_TSC_DISCONNECTED;
goto end;
}
code = buildRequest(pTscObj, "", 0, &pRequest);
if (code != TSDB_CODE_SUCCESS) {
goto end;
}
if(!pRequest->pDb){
code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
goto end;
}
// decode and process req
void* data = POINTER_SHIFT(meta, sizeof(SMsgHead));
int32_t len = metaLen - sizeof(SMsgHead);
tDecoderInit(&coder, data, len);
if (tDecodeSVCreateStbReq(&coder, &req) < 0) {
code = TSDB_CODE_INVALID_PARA;
goto end;
}
// build create stable
pReq.pColumns = taosArrayInit(req.schemaRow.nCols, sizeof(SField));
for(int32_t i = 0; i < req.schemaRow.nCols; i++){
SSchema* pSchema = req.schemaRow.pSchema + i;
SField field = {.type = pSchema->type, .bytes = pSchema->bytes};
strcpy(field.name, pSchema->name);
taosArrayPush(pReq.pColumns, &field);
}
pReq.pTags = taosArrayInit(req.schemaTag.nCols, sizeof(SField));
for(int32_t i = 0; i < req.schemaTag.nCols; i++){
SSchema* pSchema = req.schemaTag.pSchema + i;
SField field = {.type = pSchema->type, .bytes = pSchema->bytes};
strcpy(field.name, pSchema->name);
taosArrayPush(pReq.pTags, &field);
}
pReq.cVersion = req.schemaRow.version;
pReq.tVersion = req.schemaTag.version;
pReq.numOfColumns = req.schemaRow.nCols;
pReq.numOfTags = req.schemaTag.nCols;
pReq.commentLen = -1;
pReq.suid = req.suid;
pReq.source = 1;
SName tableName;
tNameExtractFullName(toName(pTscObj->acctId, pRequest->pDb, req.name, &tableName), pReq.name);
SCmdMsgInfo pCmdMsg = {0};
pCmdMsg.epSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
pCmdMsg.msgType = TDMT_MND_CREATE_STB;
pCmdMsg.msgLen = tSerializeSMCreateStbReq(NULL, 0, &pReq);
pCmdMsg.pMsg = taosMemoryMalloc(pCmdMsg.msgLen);
if (NULL == pCmdMsg.pMsg) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto end;
}
tSerializeSMCreateStbReq(pCmdMsg.pMsg, pCmdMsg.msgLen, &pReq);
SQuery pQuery = {0};
pQuery.execMode = QUERY_EXEC_MODE_RPC;
pQuery.pCmdMsg = &pCmdMsg;
pQuery.msgType = pQuery.pCmdMsg->msgType;
pQuery.stableQuery = true;
launchQueryImpl(pRequest, &pQuery, true, NULL);
code = pRequest->code;
taosMemoryFree(pCmdMsg.pMsg);
end:
destroyRequest(pRequest);
tFreeSMCreateStbReq(&pReq);
tDecoderClear(&coder);
return code;
}
static int32_t taosDropStb(TAOS *taos, void *meta, int32_t metaLen){
SVDropStbReq req = {0};
SDecoder coder;
SMDropStbReq pReq = {0};
int32_t code = TSDB_CODE_SUCCESS;
SRequestObj* pRequest = NULL;
STscObj *pTscObj = acquireTscObj(*(int64_t *)taos);
if (NULL == pTscObj) {
code = TSDB_CODE_TSC_DISCONNECTED;
goto end;
}
code = buildRequest(pTscObj, "", 0, &pRequest);
if (code != TSDB_CODE_SUCCESS) {
goto end;
}
if(!pRequest->pDb){
code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
goto end;
}
// decode and process req
void* data = POINTER_SHIFT(meta, sizeof(SMsgHead));
int32_t len = metaLen - sizeof(SMsgHead);
tDecoderInit(&coder, data, len);
if (tDecodeSVDropStbReq(&coder, &req) < 0) {
code = TSDB_CODE_INVALID_PARA;
goto end;
}
// build drop stable
pReq.igNotExists = true;
pReq.source = 1;
pReq.suid = req.suid;
SName tableName;
tNameExtractFullName(toName(pTscObj->acctId, pRequest->pDb, req.name, &tableName), pReq.name);
SCmdMsgInfo pCmdMsg = {0};
pCmdMsg.epSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
pCmdMsg.msgType = TDMT_MND_DROP_STB;
pCmdMsg.msgLen = tSerializeSMDropStbReq(NULL, 0, &pReq);
pCmdMsg.pMsg = taosMemoryMalloc(pCmdMsg.msgLen);
if (NULL == pCmdMsg.pMsg) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto end;
}
tSerializeSMDropStbReq(pCmdMsg.pMsg, pCmdMsg.msgLen, &pReq);
SQuery pQuery = {0};
pQuery.execMode = QUERY_EXEC_MODE_RPC;
pQuery.pCmdMsg = &pCmdMsg;
pQuery.msgType = pQuery.pCmdMsg->msgType;
pQuery.stableQuery = true;
launchQueryImpl(pRequest, &pQuery, true, NULL);
code = pRequest->code;
taosMemoryFree(pCmdMsg.pMsg);
end:
destroyRequest(pRequest);
tDecoderClear(&coder);
return code;
}
typedef struct SVgroupCreateTableBatch {
SVCreateTbBatchReq req;
SVgroupInfo info;
char dbName[TSDB_DB_NAME_LEN];
} SVgroupCreateTableBatch;
static void destroyCreateTbReqBatch(void* data) {
SVgroupCreateTableBatch* pTbBatch = (SVgroupCreateTableBatch*) data;
taosArrayDestroy(pTbBatch->req.pArray);
}
static int32_t taosCreateTable(TAOS *taos, void *meta, int32_t metaLen){
SVCreateTbBatchReq req = {0};
SDecoder coder = {0};
int32_t code = TSDB_CODE_SUCCESS;
SRequestObj *pRequest = NULL;
SQuery *pQuery = NULL;
SHashObj *pVgroupHashmap = NULL;
STscObj *pTscObj = acquireTscObj(*(int64_t *)taos);
if (NULL == pTscObj) {
code = TSDB_CODE_TSC_DISCONNECTED;
goto end;
}
code = buildRequest(pTscObj, "", 0, &pRequest);
if (code != TSDB_CODE_SUCCESS) {
goto end;
}
if(!pRequest->pDb){
code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
goto end;
}
// decode and process req
void* data = POINTER_SHIFT(meta, sizeof(SMsgHead));
int32_t len = metaLen - sizeof(SMsgHead);
tDecoderInit(&coder, data, len);
if (tDecodeSVCreateTbBatchReq(&coder, &req) < 0) {
code = TSDB_CODE_INVALID_PARA;
goto end;
}
SVCreateTbReq *pCreateReq = NULL;
SCatalog* pCatalog = NULL;
code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog);
if (code != TSDB_CODE_SUCCESS) {
goto end;
}
pVgroupHashmap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
if (NULL == pVgroupHashmap) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto end;
}
taosHashSetFreeFp(pVgroupHashmap, destroyCreateTbReqBatch);
SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter,
.requestId = pRequest->requestId,
.requestObjRefId = pRequest->self,
.mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)};
// loop to create table
for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
pCreateReq = req.pReqs + iReq;
SVgroupInfo pInfo = {0};
SName pName;
toName(pTscObj->acctId, pRequest->pDb, pCreateReq->name, &pName);
code = catalogGetTableHashVgroup(pCatalog, &conn, &pName, &pInfo);
if (code != TSDB_CODE_SUCCESS) {
goto end;
}
SVgroupCreateTableBatch* pTableBatch = taosHashGet(pVgroupHashmap, &pInfo.vgId, sizeof(pInfo.vgId));
if (pTableBatch == NULL) {
SVgroupCreateTableBatch tBatch = {0};
tBatch.info = pInfo;
strcpy(tBatch.dbName, pRequest->pDb);
tBatch.req.pArray = taosArrayInit(4, sizeof(struct SVCreateTbReq));
taosArrayPush(tBatch.req.pArray, pCreateReq);
taosHashPut(pVgroupHashmap, &pInfo.vgId, sizeof(pInfo.vgId), &tBatch, sizeof(tBatch));
} else { // add to the correct vgroup
taosArrayPush(pTableBatch->req.pArray, pCreateReq);
}
}
SArray* pBufArray = serializeVgroupsCreateTableBatch(pVgroupHashmap);
if (NULL == pBufArray) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto end;
}
pQuery = (SQuery*)nodesMakeNode(QUERY_NODE_QUERY);
pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE;
pQuery->msgType = TDMT_VND_CREATE_TABLE;
pQuery->stableQuery = false;
pQuery->pRoot = nodesMakeNode(QUERY_NODE_CREATE_TABLE_STMT);
code = rewriteToVnodeModifyOpStmt(pQuery, pBufArray);
if (code != TSDB_CODE_SUCCESS) {
goto end;
}
launchQueryImpl(pRequest, pQuery, false, NULL);
pQuery = NULL; // no need to free in the end
code = pRequest->code;
end:
taosHashCleanup(pVgroupHashmap);
destroyRequest(pRequest);
tDecoderClear(&coder);
qDestroyQuery(pQuery);
return code;
}
typedef struct SVgroupDropTableBatch {
SVDropTbBatchReq req;
SVgroupInfo info;
char dbName[TSDB_DB_NAME_LEN];
} SVgroupDropTableBatch;
static void destroyDropTbReqBatch(void* data) {
SVgroupDropTableBatch* pTbBatch = (SVgroupDropTableBatch*)data;
taosArrayDestroy(pTbBatch->req.pArray);
}
static int32_t taosDropTable(TAOS *taos, void *meta, int32_t metaLen){
SVDropTbBatchReq req = {0};
SDecoder coder = {0};
int32_t code = TSDB_CODE_SUCCESS;
SRequestObj *pRequest = NULL;
SQuery *pQuery = NULL;
SHashObj *pVgroupHashmap = NULL;
STscObj *pTscObj = acquireTscObj(*(int64_t *)taos);
if (NULL == pTscObj) {
code = TSDB_CODE_TSC_DISCONNECTED;
goto end;
}
code = buildRequest(pTscObj, "", 0, &pRequest);
if (code != TSDB_CODE_SUCCESS) {
goto end;
}
if(!pRequest->pDb){
code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
goto end;
}
// decode and process req
void* data = POINTER_SHIFT(meta, sizeof(SMsgHead));
int32_t len = metaLen - sizeof(SMsgHead);
tDecoderInit(&coder, data, len);
if (tDecodeSVDropTbBatchReq(&coder, &req) < 0) {
code = TSDB_CODE_INVALID_PARA;
goto end;
}
SVDropTbReq *pDropReq = NULL;
SCatalog *pCatalog = NULL;
code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog);
if (code != TSDB_CODE_SUCCESS) {
goto end;
}
pVgroupHashmap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
if (NULL == pVgroupHashmap) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto end;
}
taosHashSetFreeFp(pVgroupHashmap, destroyDropTbReqBatch);
SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter,
.requestId = pRequest->requestId,
.requestObjRefId = pRequest->self,
.mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)};
// loop to create table
for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
pDropReq = req.pReqs + iReq;
SVgroupInfo pInfo = {0};
SName pName;
toName(pTscObj->acctId, pRequest->pDb, pDropReq->name, &pName);
code = catalogGetTableHashVgroup(pCatalog, &conn, &pName, &pInfo);
if (code != TSDB_CODE_SUCCESS) {
goto end;
}
SVgroupDropTableBatch* pTableBatch = taosHashGet(pVgroupHashmap, &pInfo.vgId, sizeof(pInfo.vgId));
if (pTableBatch == NULL) {
SVgroupDropTableBatch tBatch = {0};
tBatch.info = pInfo;
tBatch.req.pArray = taosArrayInit(TARRAY_MIN_SIZE, sizeof(SVDropTbReq));
taosArrayPush(tBatch.req.pArray, pDropReq);
taosHashPut(pVgroupHashmap, &pInfo.vgId, sizeof(pInfo.vgId), &tBatch, sizeof(tBatch));
} else { // add to the correct vgroup
taosArrayPush(pTableBatch->req.pArray, pDropReq);
}
}
SArray* pBufArray = serializeVgroupsDropTableBatch(pVgroupHashmap);
if (NULL == pBufArray) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto end;
}
pQuery = (SQuery*)nodesMakeNode(QUERY_NODE_QUERY);
pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE;
pQuery->msgType = TDMT_VND_DROP_TABLE;
pQuery->stableQuery = false;
pQuery->pRoot = nodesMakeNode(QUERY_NODE_DROP_TABLE_STMT);
code = rewriteToVnodeModifyOpStmt(pQuery, pBufArray);
if (code != TSDB_CODE_SUCCESS) {
goto end;
}
launchQueryImpl(pRequest, pQuery, false, NULL);
pQuery = NULL; // no need to free in the end
code = pRequest->code;
end:
taosHashCleanup(pVgroupHashmap);
destroyRequest(pRequest);
tDecoderClear(&coder);
qDestroyQuery(pQuery);
return code;
}
static int32_t taosAlterTable(TAOS *taos, void *meta, int32_t metaLen){
SVAlterTbReq req = {0};
SDecoder coder = {0};
int32_t code = TSDB_CODE_SUCCESS;
SRequestObj *pRequest = NULL;
SQuery *pQuery = NULL;
SArray *pArray = NULL;
SVgDataBlocks *pVgData = NULL;
STscObj *pTscObj = acquireTscObj(*(int64_t *)taos);
if (NULL == pTscObj) {
code = TSDB_CODE_TSC_DISCONNECTED;
goto end;
}
code = buildRequest(pTscObj, "", 0, &pRequest);
if (code != TSDB_CODE_SUCCESS) {
goto end;
}
if(!pRequest->pDb){
code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
goto end;
}
// decode and process req
void* data = POINTER_SHIFT(meta, sizeof(SMsgHead));
int32_t len = metaLen - sizeof(SMsgHead);
tDecoderInit(&coder, data, len);
if (tDecodeSVAlterTbReq(&coder, &req) < 0) {
code = TSDB_CODE_INVALID_PARA;
goto end;
}
// do not deal TSDB_ALTER_TABLE_UPDATE_OPTIONS
if(req.action == TSDB_ALTER_TABLE_UPDATE_OPTIONS){
goto end;
}
SCatalog *pCatalog = NULL;
code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog);
if (code != TSDB_CODE_SUCCESS) {
goto end;
}
SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter,
.requestId = pRequest->requestId,
.requestObjRefId = pRequest->self,
.mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)};
SVgroupInfo pInfo = {0};
SName pName = {0};
toName(pTscObj->acctId, pRequest->pDb, req.tbName, &pName);
code = catalogGetTableHashVgroup(pCatalog, &conn, &pName, &pInfo);
if (code != TSDB_CODE_SUCCESS) {
goto end;
}
pArray = taosArrayInit(1, sizeof(void*));
if (NULL == pArray) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto end;
}
pVgData = taosMemoryCalloc(1, sizeof(SVgDataBlocks));
if (NULL == pVgData) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto end;
}
pVgData->vg = pInfo;
pVgData->pData = taosMemoryMalloc(metaLen);
if (NULL == pVgData->pData) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto end;
}
memcpy(pVgData->pData, meta, metaLen);
((SMsgHead*)pVgData->pData)->vgId = htonl(pInfo.vgId);
pVgData->size = metaLen;
pVgData->numOfTables = 1;
taosArrayPush(pArray, &pVgData);
pQuery = (SQuery*)nodesMakeNode(QUERY_NODE_QUERY);
pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE;
pQuery->msgType = TDMT_VND_ALTER_TABLE;
pQuery->stableQuery = false;
pQuery->pRoot = nodesMakeNode(QUERY_NODE_ALTER_TABLE_STMT);
code = rewriteToVnodeModifyOpStmt(pQuery, pArray);
if (code != TSDB_CODE_SUCCESS) {
goto end;
}
launchQueryImpl(pRequest, pQuery, false, NULL);
pQuery = NULL; // no need to free in the end
pVgData = NULL;
pArray = NULL;
code = pRequest->code;
end:
taosArrayDestroy(pArray);
if(pVgData) taosMemoryFreeClear(pVgData->pData);
taosMemoryFreeClear(pVgData);
destroyRequest(pRequest);
tDecoderClear(&coder);
qDestroyQuery(pQuery);
return code;
}
int32_t taos_write_raw_meta(TAOS *taos, tmq_raw_data *raw_meta){
if (!taos || !raw_meta) {
return TSDB_CODE_INVALID_PARA;
}
if(raw_meta->raw_meta_type == TDMT_VND_CREATE_STB) {
return taosCreateStb(taos, raw_meta->raw_meta, raw_meta->raw_meta_len);
}else if(raw_meta->raw_meta_type == TDMT_VND_ALTER_STB){
return taosCreateStb(taos, raw_meta->raw_meta, raw_meta->raw_meta_len);
}else if(raw_meta->raw_meta_type == TDMT_VND_DROP_STB){
return taosDropStb(taos, raw_meta->raw_meta, raw_meta->raw_meta_len);
}else if(raw_meta->raw_meta_type == TDMT_VND_CREATE_TABLE){
return taosCreateTable(taos, raw_meta->raw_meta, raw_meta->raw_meta_len);
}else if(raw_meta->raw_meta_type == TDMT_VND_ALTER_TABLE){
return taosAlterTable(taos, raw_meta->raw_meta, raw_meta->raw_meta_len);
}else if(raw_meta->raw_meta_type == TDMT_VND_DROP_TABLE){
return taosDropTable(taos, raw_meta->raw_meta, raw_meta->raw_meta_len);
}
return TSDB_CODE_INVALID_PARA;
}
void tmq_commit_async(tmq_t* tmq, const TAOS_RES* msg, tmq_commit_cb* cb, void* param) { void tmq_commit_async(tmq_t* tmq, const TAOS_RES* msg, tmq_commit_cb* cb, void* param) {
tmqCommitInner2(tmq, msg, 0, 1, cb, param); tmqCommitInner2(tmq, msg, 0, 1, cb, param);
} }
......
...@@ -1284,4 +1284,210 @@ TEST(testCase, sml_dup_time_Test) { ...@@ -1284,4 +1284,210 @@ TEST(testCase, sml_dup_time_Test) {
ASSERT_EQ(taos_errno(pRes), 0); ASSERT_EQ(taos_errno(pRes), 0);
taos_free_result(pRes); taos_free_result(pRes);
} }
*/
TEST(testCase, sml_16960_Test) {
TAOS *taos = taos_connect("localhost", "root", "taosdata", NULL, 0);
ASSERT_NE(taos, nullptr);
TAOS_RES* pRes = taos_query(taos, "create database if not exists d16368 schemaless 1");
taos_free_result(pRes);
pRes = taos_query(taos, "use d16368");
taos_free_result(pRes);
const char *sql[] = {
"[\n"
"{\n"
"\"timestamp\":\n"
"\n"
"{ \"value\": 1349020800000, \"type\": \"ms\" }\n"
",\n"
"\"value\":\n"
"\n"
"{ \"value\": 830525384, \"type\": \"int\" }\n"
",\n"
"\"tags\": {\n"
"\"id\": \"stb00_0\",\n"
"\"t0\":\n"
"\n"
"{ \"value\": 83972721, \"type\": \"int\" }\n"
",\n"
"\"t1\":\n"
"\n"
"{ \"value\": 539147525, \"type\": \"int\" }\n"
",\n"
"\"t2\":\n"
"\n"
"{ \"value\": 618258572, \"type\": \"int\" }\n"
",\n"
"\"t3\":\n"
"\n"
"{ \"value\": -10536201, \"type\": \"int\" }\n"
",\n"
"\"t4\":\n"
"\n"
"{ \"value\": 349227409, \"type\": \"int\" }\n"
",\n"
"\"t5\":\n"
"\n"
"{ \"value\": 249347042, \"type\": \"int\" }\n"
"},\n"
"\"metric\": \"stb0\"\n"
"},\n"
"{\n"
"\"timestamp\":\n"
"\n"
"{ \"value\": 1349020800001, \"type\": \"ms\" }\n"
",\n"
"\"value\":\n"
"\n"
"{ \"value\": -588348364, \"type\": \"int\" }\n"
",\n"
"\"tags\": {\n"
"\"id\": \"stb00_0\",\n"
"\"t0\":\n"
"\n"
"{ \"value\": 83972721, \"type\": \"int\" }\n"
",\n"
"\"t1\":\n"
"\n"
"{ \"value\": 539147525, \"type\": \"int\" }\n"
",\n"
"\"t2\":\n"
"\n"
"{ \"value\": 618258572, \"type\": \"int\" }\n"
",\n"
"\"t3\":\n"
"\n"
"{ \"value\": -10536201, \"type\": \"int\" }\n"
",\n"
"\"t4\":\n"
"\n"
"{ \"value\": 349227409, \"type\": \"int\" }\n"
",\n"
"\"t5\":\n"
"\n"
"{ \"value\": 249347042, \"type\": \"int\" }\n"
"},\n"
"\"metric\": \"stb0\"\n"
"},\n"
"{\n"
"\"timestamp\":\n"
"\n"
"{ \"value\": 1349020800002, \"type\": \"ms\" }\n"
",\n"
"\"value\":\n"
"\n"
"{ \"value\": -370310823, \"type\": \"int\" }\n"
",\n"
"\"tags\": {\n"
"\"id\": \"stb00_0\",\n"
"\"t0\":\n"
"\n"
"{ \"value\": 83972721, \"type\": \"int\" }\n"
",\n"
"\"t1\":\n"
"\n"
"{ \"value\": 539147525, \"type\": \"int\" }\n"
",\n"
"\"t2\":\n"
"\n"
"{ \"value\": 618258572, \"type\": \"int\" }\n"
",\n"
"\"t3\":\n"
"\n"
"{ \"value\": -10536201, \"type\": \"int\" }\n"
",\n"
"\"t4\":\n"
"\n"
"{ \"value\": 349227409, \"type\": \"int\" }\n"
",\n"
"\"t5\":\n"
"\n"
"{ \"value\": 249347042, \"type\": \"int\" }\n"
"},\n"
"\"metric\": \"stb0\"\n"
"},\n"
"{\n"
"\"timestamp\":\n"
"\n"
"{ \"value\": 1349020800003, \"type\": \"ms\" }\n"
",\n"
"\"value\":\n"
"\n"
"{ \"value\": -811250191, \"type\": \"int\" }\n"
",\n"
"\"tags\": {\n"
"\"id\": \"stb00_0\",\n"
"\"t0\":\n"
"\n"
"{ \"value\": 83972721, \"type\": \"int\" }\n"
",\n"
"\"t1\":\n"
"\n"
"{ \"value\": 539147525, \"type\": \"int\" }\n"
",\n"
"\"t2\":\n"
"\n"
"{ \"value\": 618258572, \"type\": \"int\" }\n"
",\n"
"\"t3\":\n"
"\n"
"{ \"value\": -10536201, \"type\": \"int\" }\n"
",\n"
"\"t4\":\n"
"\n"
"{ \"value\": 349227409, \"type\": \"int\" }\n"
",\n"
"\"t5\":\n"
"\n"
"{ \"value\": 249347042, \"type\": \"int\" }\n"
"},\n"
"\"metric\": \"stb0\"\n"
"},\n"
"{\n"
"\"timestamp\":\n"
"\n"
"{ \"value\": 1349020800004, \"type\": \"ms\" }\n"
",\n"
"\"value\":\n"
"\n"
"{ \"value\": -330340558, \"type\": \"int\" }\n"
",\n"
"\"tags\": {\n"
"\"id\": \"stb00_0\",\n"
"\"t0\":\n"
"\n"
"{ \"value\": 83972721, \"type\": \"int\" }\n"
",\n"
"\"t1\":\n"
"\n"
"{ \"value\": 539147525, \"type\": \"int\" }\n"
",\n"
"\"t2\":\n"
"\n"
"{ \"value\": 618258572, \"type\": \"int\" }\n"
",\n"
"\"t3\":\n"
"\n"
"{ \"value\": -10536201, \"type\": \"int\" }\n"
",\n"
"\"t4\":\n"
"\n"
"{ \"value\": 349227409, \"type\": \"int\" }\n"
",\n"
"\"t5\":\n"
"\n"
"{ \"value\": 249347042, \"type\": \"int\" }\n"
"},\n"
"\"metric\": \"stb0\"\n"
"}\n"
"]"
};
pRes = taos_schemaless_insert(taos, (char**)sql, sizeof(sql)/sizeof(sql[0]), TSDB_SML_JSON_PROTOCOL, TSDB_SML_TIMESTAMP_MILLI_SECONDS);
ASSERT_EQ(taos_errno(pRes), 0);
taos_free_result(pRes);
}
*/
\ No newline at end of file
...@@ -516,6 +516,8 @@ int32_t tSerializeSMCreateStbReq(void *buf, int32_t bufLen, SMCreateStbReq *pReq ...@@ -516,6 +516,8 @@ int32_t tSerializeSMCreateStbReq(void *buf, int32_t bufLen, SMCreateStbReq *pReq
if (tEncodeCStr(&encoder, pField->name) < 0) return -1; if (tEncodeCStr(&encoder, pField->name) < 0) return -1;
} }
if (tEncodeI32(&encoder, pReq->cVersion) < 0) return -1;
for (int32_t i = 0; i < pReq->numOfTags; ++i) { for (int32_t i = 0; i < pReq->numOfTags; ++i) {
SField *pField = taosArrayGet(pReq->pTags, i); SField *pField = taosArrayGet(pReq->pTags, i);
if (tEncodeI8(&encoder, pField->type) < 0) return -1; if (tEncodeI8(&encoder, pField->type) < 0) return -1;
...@@ -524,6 +526,8 @@ int32_t tSerializeSMCreateStbReq(void *buf, int32_t bufLen, SMCreateStbReq *pReq ...@@ -524,6 +526,8 @@ int32_t tSerializeSMCreateStbReq(void *buf, int32_t bufLen, SMCreateStbReq *pReq
if (tEncodeCStr(&encoder, pField->name) < 0) return -1; if (tEncodeCStr(&encoder, pField->name) < 0) return -1;
} }
if (tEncodeI32(&encoder, pReq->tVersion) < 0) return -1;
for (int32_t i = 0; i < pReq->numOfFuncs; ++i) { for (int32_t i = 0; i < pReq->numOfFuncs; ++i) {
const char *pFunc = taosArrayGet(pReq->pFuncs, i); const char *pFunc = taosArrayGet(pReq->pFuncs, i);
if (tEncodeCStr(&encoder, pFunc) < 0) return -1; if (tEncodeCStr(&encoder, pFunc) < 0) return -1;
...@@ -538,6 +542,11 @@ int32_t tSerializeSMCreateStbReq(void *buf, int32_t bufLen, SMCreateStbReq *pReq ...@@ -538,6 +542,11 @@ int32_t tSerializeSMCreateStbReq(void *buf, int32_t bufLen, SMCreateStbReq *pReq
if (pReq->ast2Len > 0) { if (pReq->ast2Len > 0) {
if (tEncodeBinary(&encoder, pReq->pAst2, pReq->ast2Len) < 0) return -1; if (tEncodeBinary(&encoder, pReq->pAst2, pReq->ast2Len) < 0) return -1;
} }
if (tEncodeI64(&encoder, pReq->suid) < 0) return -1;
if (tEncodeI8(&encoder, pReq->source) < 0) return -1;
for (int32_t i = 0; i < sizeof(pReq->reserved)/sizeof(int8_t); ++i) {
if (tEncodeI8(&encoder, pReq->reserved[i]) < 0) return -1;
}
tEndEncode(&encoder); tEndEncode(&encoder);
...@@ -585,6 +594,8 @@ int32_t tDeserializeSMCreateStbReq(void *buf, int32_t bufLen, SMCreateStbReq *pR ...@@ -585,6 +594,8 @@ int32_t tDeserializeSMCreateStbReq(void *buf, int32_t bufLen, SMCreateStbReq *pR
} }
} }
if (tDecodeI32(&decoder, &pReq->cVersion) < 0) return -1;
for (int32_t i = 0; i < pReq->numOfTags; ++i) { for (int32_t i = 0; i < pReq->numOfTags; ++i) {
SField field = {0}; SField field = {0};
if (tDecodeI8(&decoder, &field.type) < 0) return -1; if (tDecodeI8(&decoder, &field.type) < 0) return -1;
...@@ -597,6 +608,8 @@ int32_t tDeserializeSMCreateStbReq(void *buf, int32_t bufLen, SMCreateStbReq *pR ...@@ -597,6 +608,8 @@ int32_t tDeserializeSMCreateStbReq(void *buf, int32_t bufLen, SMCreateStbReq *pR
} }
} }
if (tDecodeI32(&decoder, &pReq->tVersion) < 0) return -1;
for (int32_t i = 0; i < pReq->numOfFuncs; ++i) { for (int32_t i = 0; i < pReq->numOfFuncs; ++i) {
char pFunc[TSDB_FUNC_NAME_LEN] = {0}; char pFunc[TSDB_FUNC_NAME_LEN] = {0};
if (tDecodeCStrTo(&decoder, pFunc) < 0) return -1; if (tDecodeCStrTo(&decoder, pFunc) < 0) return -1;
...@@ -624,6 +637,12 @@ int32_t tDeserializeSMCreateStbReq(void *buf, int32_t bufLen, SMCreateStbReq *pR ...@@ -624,6 +637,12 @@ int32_t tDeserializeSMCreateStbReq(void *buf, int32_t bufLen, SMCreateStbReq *pR
if (tDecodeCStrTo(&decoder, pReq->pAst2) < 0) return -1; if (tDecodeCStrTo(&decoder, pReq->pAst2) < 0) return -1;
} }
if (tDecodeI64(&decoder, &pReq->suid) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->source) < 0) return -1;
for (int32_t i = 0; i < sizeof(pReq->reserved)/sizeof(int8_t); ++i) {
if (tDecodeI8(&decoder, &pReq->reserved[i]) < 0) return -1;
}
tEndDecode(&decoder); tEndDecode(&decoder);
tDecoderClear(&decoder); tDecoderClear(&decoder);
return 0; return 0;
...@@ -645,6 +664,11 @@ int32_t tSerializeSMDropStbReq(void *buf, int32_t bufLen, SMDropStbReq *pReq) { ...@@ -645,6 +664,11 @@ int32_t tSerializeSMDropStbReq(void *buf, int32_t bufLen, SMDropStbReq *pReq) {
if (tStartEncode(&encoder) < 0) return -1; if (tStartEncode(&encoder) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->name) < 0) return -1; if (tEncodeCStr(&encoder, pReq->name) < 0) return -1;
if (tEncodeI8(&encoder, pReq->igNotExists) < 0) return -1; if (tEncodeI8(&encoder, pReq->igNotExists) < 0) return -1;
if (tEncodeI64(&encoder, pReq->suid) < 0) return -1;
if (tEncodeI8(&encoder, pReq->source) < 0) return -1;
for (int32_t i = 0; i < sizeof(pReq->reserved)/sizeof(int8_t); ++i) {
if (tEncodeI8(&encoder, pReq->reserved[i]) < 0) return -1;
}
tEndEncode(&encoder); tEndEncode(&encoder);
int32_t tlen = encoder.pos; int32_t tlen = encoder.pos;
...@@ -659,6 +683,12 @@ int32_t tDeserializeSMDropStbReq(void *buf, int32_t bufLen, SMDropStbReq *pReq) ...@@ -659,6 +683,12 @@ int32_t tDeserializeSMDropStbReq(void *buf, int32_t bufLen, SMDropStbReq *pReq)
if (tStartDecode(&decoder) < 0) return -1; if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeCStrTo(&decoder, pReq->name) < 0) return -1; if (tDecodeCStrTo(&decoder, pReq->name) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->igNotExists) < 0) return -1; if (tDecodeI8(&decoder, &pReq->igNotExists) < 0) return -1;
if (tDecodeI64(&decoder, &pReq->suid) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->source) < 0) return -1;
for (int32_t i = 0; i < sizeof(pReq->reserved)/sizeof(int8_t); ++i) {
if (tDecodeI8(&decoder, &pReq->reserved[i]) < 0) return -1;
}
tEndDecode(&decoder); tEndDecode(&decoder);
tDecoderClear(&decoder); tDecoderClear(&decoder);
......
...@@ -115,6 +115,14 @@ int64_t taosGetIntervalStartTimestamp(int64_t startTime, int64_t slidingTime, in ...@@ -115,6 +115,14 @@ int64_t taosGetIntervalStartTimestamp(int64_t startTime, int64_t slidingTime, in
#endif #endif
SName* toName(int32_t acctId, const char* pDbName, const char* pTableName, SName* pName) {
pName->type = TSDB_TABLE_NAME_T;
pName->acctId = acctId;
strcpy(pName->dbname, pDbName);
strcpy(pName->tname, pTableName);
return pName;
}
int32_t tNameExtractFullName(const SName* name, char* dst) { int32_t tNameExtractFullName(const SName* name, char* dst) {
assert(name != NULL && dst != NULL); assert(name != NULL && dst != NULL);
......
...@@ -705,10 +705,10 @@ int32_t mndBuildStbFromReq(SMnode *pMnode, SStbObj *pDst, SMCreateStbReq *pCreat ...@@ -705,10 +705,10 @@ int32_t mndBuildStbFromReq(SMnode *pMnode, SStbObj *pDst, SMCreateStbReq *pCreat
memcpy(pDst->db, pDb->name, TSDB_DB_FNAME_LEN); memcpy(pDst->db, pDb->name, TSDB_DB_FNAME_LEN);
pDst->createdTime = taosGetTimestampMs(); pDst->createdTime = taosGetTimestampMs();
pDst->updateTime = pDst->createdTime; pDst->updateTime = pDst->createdTime;
pDst->uid = mndGenerateUid(pCreate->name, TSDB_TABLE_FNAME_LEN); pDst->uid = (pCreate->source == 1) ? pCreate->suid : mndGenerateUid(pCreate->name, TSDB_TABLE_FNAME_LEN);
pDst->dbUid = pDb->uid; pDst->dbUid = pDb->uid;
pDst->tagVer = 1; pDst->tagVer = (pCreate->source == 1) ? pCreate->tVersion : 1;
pDst->colVer = 1; pDst->colVer = (pCreate->source == 1) ? pCreate->cVersion : 1;
pDst->smaVer = 1; pDst->smaVer = 1;
pDst->nextColId = 1; pDst->nextColId = 1;
pDst->maxdelay[0] = pCreate->delay1; pDst->maxdelay[0] = pCreate->delay1;
...@@ -1752,6 +1752,11 @@ static int32_t mndProcessDropStbReq(SRpcMsg *pReq) { ...@@ -1752,6 +1752,11 @@ static int32_t mndProcessDropStbReq(SRpcMsg *pReq) {
} }
} }
if (dropReq.source == 1 && pStb->uid != dropReq.suid){
terrno = TSDB_CODE_MND_STB_NOT_EXIST;
goto _OVER;
}
pDb = mndAcquireDbByStb(pMnode, dropReq.name); pDb = mndAcquireDbByStb(pMnode, dropReq.name);
if (pDb == NULL) { if (pDb == NULL) {
terrno = TSDB_CODE_MND_DB_NOT_SELECTED; terrno = TSDB_CODE_MND_DB_NOT_SELECTED;
......
...@@ -3467,6 +3467,10 @@ void saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pS ...@@ -3467,6 +3467,10 @@ void saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pS
setBufPageDirty(pPage, true); setBufPageDirty(pPage, true);
releaseBufPage(pCtx->pBuf, pPage); releaseBufPage(pCtx->pBuf, pPage);
#ifdef BUF_PAGE_DEBUG
qDebug("page_saveTuple pos:%p,pageId:%d, offset:%d\n", pPos, pPos->pageId,
pPos->offset);
#endif
} }
void copyTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STuplePos* pPos) { void copyTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STuplePos* pPos) {
...@@ -3501,6 +3505,9 @@ void copyTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pS ...@@ -3501,6 +3505,9 @@ void copyTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pS
setBufPageDirty(pPage, true); setBufPageDirty(pPage, true);
releaseBufPage(pCtx->pBuf, pPage); releaseBufPage(pCtx->pBuf, pPage);
#ifdef BUF_PAGE_DEBUG
qDebug("page_copyTuple pos:%p, pageId:%d, offset:%d", pPos, pPos->pageId, pPos->offset);
#endif
} }
int32_t topBotFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { int32_t topBotFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
......
...@@ -77,14 +77,6 @@ static int32_t addNamespace(STranslateContext* pCxt, void* pTable) { ...@@ -77,14 +77,6 @@ static int32_t addNamespace(STranslateContext* pCxt, void* pTable) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static SName* toName(int32_t acctId, const char* pDbName, const char* pTableName, SName* pName) {
pName->type = TSDB_TABLE_NAME_T;
pName->acctId = acctId;
strcpy(pName->dbname, pDbName);
strcpy(pName->tname, pTableName);
return pName;
}
static int32_t collectUseDatabaseImpl(const char* pFullDbName, SHashObj* pDbs) { static int32_t collectUseDatabaseImpl(const char* pFullDbName, SHashObj* pDbs) {
SFullDatabaseName name = {0}; SFullDatabaseName name = {0};
strcpy(name.fullDbName, pFullDbName); strcpy(name.fullDbName, pFullDbName);
...@@ -5370,7 +5362,8 @@ static int32_t serializeVgroupCreateTableBatch(SVgroupCreateTableBatch* pTbBatch ...@@ -5370,7 +5362,8 @@ static int32_t serializeVgroupCreateTableBatch(SVgroupCreateTableBatch* pTbBatch
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static void destroyCreateTbReqBatch(SVgroupCreateTableBatch* pTbBatch) { static void destroyCreateTbReqBatch(void* data) {
SVgroupCreateTableBatch* pTbBatch = (SVgroupCreateTableBatch*) data;
size_t size = taosArrayGetSize(pTbBatch->req.pArray); size_t size = taosArrayGetSize(pTbBatch->req.pArray);
for (int32_t i = 0; i < size; ++i) { for (int32_t i = 0; i < size; ++i) {
SVCreateTbReq* pTableReq = taosArrayGet(pTbBatch->req.pArray, i); SVCreateTbReq* pTableReq = taosArrayGet(pTbBatch->req.pArray, i);
...@@ -5387,7 +5380,7 @@ static void destroyCreateTbReqBatch(SVgroupCreateTableBatch* pTbBatch) { ...@@ -5387,7 +5380,7 @@ static void destroyCreateTbReqBatch(SVgroupCreateTableBatch* pTbBatch) {
taosArrayDestroy(pTbBatch->req.pArray); taosArrayDestroy(pTbBatch->req.pArray);
} }
static int32_t rewriteToVnodeModifyOpStmt(SQuery* pQuery, SArray* pBufArray) { int32_t rewriteToVnodeModifyOpStmt(SQuery* pQuery, SArray* pBufArray) {
SVnodeModifOpStmt* pNewStmt = (SVnodeModifOpStmt*)nodesMakeNode(QUERY_NODE_VNODE_MODIF_STMT); SVnodeModifOpStmt* pNewStmt = (SVnodeModifOpStmt*)nodesMakeNode(QUERY_NODE_VNODE_MODIF_STMT);
if (pNewStmt == NULL) { if (pNewStmt == NULL) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
...@@ -5453,10 +5446,10 @@ static int32_t rewriteCreateTable(STranslateContext* pCxt, SQuery* pQuery) { ...@@ -5453,10 +5446,10 @@ static int32_t rewriteCreateTable(STranslateContext* pCxt, SQuery* pQuery) {
static void addCreateTbReqIntoVgroup(int32_t acctId, SHashObj* pVgroupHashmap, SCreateSubTableClause* pStmt, static void addCreateTbReqIntoVgroup(int32_t acctId, SHashObj* pVgroupHashmap, SCreateSubTableClause* pStmt,
const STag* pTag, uint64_t suid, SVgroupInfo* pVgInfo) { const STag* pTag, uint64_t suid, SVgroupInfo* pVgInfo) {
char dbFName[TSDB_DB_FNAME_LEN] = {0}; // char dbFName[TSDB_DB_FNAME_LEN] = {0};
SName name = {.type = TSDB_DB_NAME_T, .acctId = acctId}; // SName name = {.type = TSDB_DB_NAME_T, .acctId = acctId};
strcpy(name.dbname, pStmt->dbName); // strcpy(name.dbname, pStmt->dbName);
tNameGetFullDbName(&name, dbFName); // tNameGetFullDbName(&name, dbFName);
struct SVCreateTbReq req = {0}; struct SVCreateTbReq req = {0};
req.type = TD_CHILD_TABLE; req.type = TD_CHILD_TABLE;
...@@ -5717,7 +5710,7 @@ static int32_t rewriteCreateSubTable(STranslateContext* pCxt, SCreateSubTableCla ...@@ -5717,7 +5710,7 @@ static int32_t rewriteCreateSubTable(STranslateContext* pCxt, SCreateSubTableCla
return code; return code;
} }
static SArray* serializeVgroupsCreateTableBatch(int32_t acctId, SHashObj* pVgroupHashmap) { SArray* serializeVgroupsCreateTableBatch(SHashObj* pVgroupHashmap) {
SArray* pBufArray = taosArrayInit(taosHashGetSize(pVgroupHashmap), sizeof(void*)); SArray* pBufArray = taosArrayInit(taosHashGetSize(pVgroupHashmap), sizeof(void*));
if (NULL == pBufArray) { if (NULL == pBufArray) {
return NULL; return NULL;
...@@ -5732,7 +5725,6 @@ static SArray* serializeVgroupsCreateTableBatch(int32_t acctId, SHashObj* pVgrou ...@@ -5732,7 +5725,6 @@ static SArray* serializeVgroupsCreateTableBatch(int32_t acctId, SHashObj* pVgrou
} }
serializeVgroupCreateTableBatch(pTbBatch, pBufArray); serializeVgroupCreateTableBatch(pTbBatch, pBufArray);
destroyCreateTbReqBatch(pTbBatch);
} while (true); } while (true);
return pBufArray; return pBufArray;
...@@ -5746,6 +5738,7 @@ static int32_t rewriteCreateMultiTable(STranslateContext* pCxt, SQuery* pQuery) ...@@ -5746,6 +5738,7 @@ static int32_t rewriteCreateMultiTable(STranslateContext* pCxt, SQuery* pQuery)
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
taosHashSetFreeFp(pVgroupHashmap, destroyCreateTbReqBatch);
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
SNode* pNode; SNode* pNode;
FOREACH(pNode, pStmt->pSubTables) { FOREACH(pNode, pStmt->pSubTables) {
...@@ -5757,7 +5750,7 @@ static int32_t rewriteCreateMultiTable(STranslateContext* pCxt, SQuery* pQuery) ...@@ -5757,7 +5750,7 @@ static int32_t rewriteCreateMultiTable(STranslateContext* pCxt, SQuery* pQuery)
} }
} }
SArray* pBufArray = serializeVgroupsCreateTableBatch(pCxt->pParseCxt->acctId, pVgroupHashmap); SArray* pBufArray = serializeVgroupsCreateTableBatch(pVgroupHashmap);
taosHashCleanup(pVgroupHashmap); taosHashCleanup(pVgroupHashmap);
if (NULL == pBufArray) { if (NULL == pBufArray) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
...@@ -5817,7 +5810,10 @@ over: ...@@ -5817,7 +5810,10 @@ over:
return code; return code;
} }
static void destroyDropTbReqBatch(SVgroupDropTableBatch* pTbBatch) { taosArrayDestroy(pTbBatch->req.pArray); } static void destroyDropTbReqBatch(void* data) {
SVgroupDropTableBatch* pTbBatch = (SVgroupDropTableBatch*)data;
taosArrayDestroy(pTbBatch->req.pArray);
}
static int32_t serializeVgroupDropTableBatch(SVgroupDropTableBatch* pTbBatch, SArray* pBufArray) { static int32_t serializeVgroupDropTableBatch(SVgroupDropTableBatch* pTbBatch, SArray* pBufArray) {
int tlen; int tlen;
...@@ -5851,7 +5847,7 @@ static int32_t serializeVgroupDropTableBatch(SVgroupDropTableBatch* pTbBatch, SA ...@@ -5851,7 +5847,7 @@ static int32_t serializeVgroupDropTableBatch(SVgroupDropTableBatch* pTbBatch, SA
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static SArray* serializeVgroupsDropTableBatch(int32_t acctId, SHashObj* pVgroupHashmap) { SArray* serializeVgroupsDropTableBatch(SHashObj* pVgroupHashmap) {
SArray* pBufArray = taosArrayInit(taosHashGetSize(pVgroupHashmap), sizeof(void*)); SArray* pBufArray = taosArrayInit(taosHashGetSize(pVgroupHashmap), sizeof(void*));
if (NULL == pBufArray) { if (NULL == pBufArray) {
return NULL; return NULL;
...@@ -5866,7 +5862,6 @@ static SArray* serializeVgroupsDropTableBatch(int32_t acctId, SHashObj* pVgroupH ...@@ -5866,7 +5862,6 @@ static SArray* serializeVgroupsDropTableBatch(int32_t acctId, SHashObj* pVgroupH
} }
serializeVgroupDropTableBatch(pTbBatch, pBufArray); serializeVgroupDropTableBatch(pTbBatch, pBufArray);
destroyDropTbReqBatch(pTbBatch);
} while (true); } while (true);
return pBufArray; return pBufArray;
...@@ -5880,6 +5875,7 @@ static int32_t rewriteDropTable(STranslateContext* pCxt, SQuery* pQuery) { ...@@ -5880,6 +5875,7 @@ static int32_t rewriteDropTable(STranslateContext* pCxt, SQuery* pQuery) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
taosHashSetFreeFp(pVgroupHashmap, destroyDropTbReqBatch);
bool isSuperTable = false; bool isSuperTable = false;
SNode* pNode; SNode* pNode;
FOREACH(pNode, pStmt->pTables) { FOREACH(pNode, pStmt->pTables) {
...@@ -5898,7 +5894,7 @@ static int32_t rewriteDropTable(STranslateContext* pCxt, SQuery* pQuery) { ...@@ -5898,7 +5894,7 @@ static int32_t rewriteDropTable(STranslateContext* pCxt, SQuery* pQuery) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
SArray* pBufArray = serializeVgroupsDropTableBatch(pCxt->pParseCxt->acctId, pVgroupHashmap); SArray* pBufArray = serializeVgroupsDropTableBatch(pVgroupHashmap);
taosHashCleanup(pVgroupHashmap); taosHashCleanup(pVgroupHashmap);
if (NULL == pBufArray) { if (NULL == pBufArray) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册