提交 bddda543 编写于 作者: S slguan

submit message

上级 55a9a5db
...@@ -294,7 +294,7 @@ void tscProcessMsgFromServer(char type, void *pCont, int contLen, void *ahandle, ...@@ -294,7 +294,7 @@ void tscProcessMsgFromServer(char type, void *pCont, int contLen, void *ahandle,
* There is not response callback function for submit response. * There is not response callback function for submit response.
* The actual inserted number of points is the first number. * The actual inserted number of points is the first number.
*/ */
if (type == TSDB_MSG_TYPE_DNODE_SUBMIT_RSP) { if (type == TSDB_MSG_TYPE_SUBMIT_RSP) {
pRes->numOfRows += *(int32_t *)pRes->pRsp; pRes->numOfRows += *(int32_t *)pRes->pRsp;
tscTrace("%p cmd:%d code:%d, inserted rows:%d, rsp len:%d", pSql, pCmd->command, pRes->code, tscTrace("%p cmd:%d code:%d, inserted rows:%d, rsp len:%d", pSql, pCmd->command, pRes->code,
...@@ -512,8 +512,6 @@ int tscProcessSql(SSqlObj *pSql) { ...@@ -512,8 +512,6 @@ int tscProcessSql(SSqlObj *pSql) {
return pSql->res.code; return pSql->res.code;
} }
//TODO change the connect info in metadata
return TSDB_CODE_OTHERS;
// if (UTIL_METER_IS_NOMRAL_METER(pMeterMetaInfo)) { // if (UTIL_METER_IS_NOMRAL_METER(pMeterMetaInfo)) {
// pSql->index = pMeterMetaInfo->pMeterMeta->index; // pSql->index = pMeterMetaInfo->pMeterMeta->index;
// } else { // it must be the parent SSqlObj for super table query // } else { // it must be the parent SSqlObj for super table query
...@@ -1240,13 +1238,13 @@ int tscBuildSubmitMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -1240,13 +1238,13 @@ int tscBuildSubmitMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pShellMsg = (SShellSubmitMsg *)pMsg; pShellMsg = (SShellSubmitMsg *)pMsg;
pShellMsg->import = htons(TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_INSERT) ? 0 : 1); pShellMsg->import = htons(TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_INSERT) ? 0 : 1);
//pShellMsg->vnode = htons(pMeterMeta->vpeerDesc[pMeterMeta->index].vnode); pShellMsg->vnode = 0; //htons(pMeterMeta->vpeerDesc[pMeterMeta->index].vnode);
pShellMsg->numOfSid = htonl(pSql->cmd.numOfTablesInSubmit); // number of meters to be inserted pShellMsg->numOfSid = htonl(pSql->cmd.numOfTablesInSubmit); // number of meters to be inserted
// pSql->cmd.payloadLen is set during parse sql routine, so we do not use it here // pSql->cmd.payloadLen is set during parse sql routine, so we do not use it here
pSql->cmd.msgType = TSDB_MSG_TYPE_DNODE_SUBMIT; pSql->cmd.msgType = TSDB_MSG_TYPE_SUBMIT;
//tscTrace("%p update submit msg vnode:%s:%d", pSql, taosIpStr(pMeterMeta->vpeerDesc[pMeterMeta->index].ip), tscTrace("%p update submit msg vnode:%s:%d", pSql, taosIpStr(pMeterMeta->vpeerDesc[pMeterMeta->index].ip),
// htons(pShellMsg->vnode)); htons(pShellMsg->vnode));
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -1642,10 +1640,12 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -1642,10 +1640,12 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
tscTrace("%p msg built success,len:%d bytes", pSql, msgLen); tscTrace("%p msg built success,len:%d bytes", pSql, msgLen);
pCmd->payloadLen = msgLen; pCmd->payloadLen = msgLen;
pSql->cmd.msgType = TSDB_MSG_TYPE_DNODE_QUERY; pSql->cmd.msgType = TSDB_MSG_TYPE_QUERY;
assert(msgLen + minMsgSize() <= size); assert(msgLen + minMsgSize() <= size);
memmove(pSql->cmd.payload, pStart, pSql->cmd.payloadLen - tsRpcHeadSize);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -3337,9 +3337,9 @@ int tscGetMetricMeta(SSqlObj *pSql, int32_t clauseIndex) { ...@@ -3337,9 +3337,9 @@ int tscGetMetricMeta(SSqlObj *pSql, int32_t clauseIndex) {
} }
void tscInitMsgs() { void tscInitMsgs() {
tscBuildMsg[TSDB_SQL_SELECT] = tscBuildQueryMsg;// tscBuildMsg[TSDB_SQL_SELECT] = tscBuildQueryMsg;
tscBuildMsg[TSDB_SQL_INSERT] = tscBuildSubmitMsg;// tscBuildMsg[TSDB_SQL_INSERT] = tscBuildSubmitMsg;
tscBuildMsg[TSDB_SQL_FETCH] = tscBuildRetrieveMsg;// tscBuildMsg[TSDB_SQL_FETCH] = tscBuildRetrieveMsg;
tscBuildMsg[TSDB_SQL_CREATE_DB] = tscBuildCreateDbMsg; tscBuildMsg[TSDB_SQL_CREATE_DB] = tscBuildCreateDbMsg;
tscBuildMsg[TSDB_SQL_CREATE_USER] = tscBuildUserMsg; tscBuildMsg[TSDB_SQL_CREATE_USER] = tscBuildUserMsg;
......
...@@ -853,7 +853,8 @@ char *taos_errstr(TAOS *taos) { ...@@ -853,7 +853,8 @@ char *taos_errstr(TAOS *taos) {
STscObj *pObj = (STscObj *)taos; STscObj *pObj = (STscObj *)taos;
uint8_t code; uint8_t code;
if (pObj == NULL || pObj->signature != pObj) return tstrerror(globalCode); if (pObj == NULL || pObj->signature != pObj)
return (char*)tstrerror(globalCode);
SSqlObj* pSql = pObj->pSql; SSqlObj* pSql = pObj->pSql;
...@@ -866,7 +867,7 @@ char *taos_errstr(TAOS *taos) { ...@@ -866,7 +867,7 @@ char *taos_errstr(TAOS *taos) {
if (hasAdditionalErrorInfo(code, &pSql->cmd)) { if (hasAdditionalErrorInfo(code, &pSql->cmd)) {
return pSql->cmd.payload; return pSql->cmd.payload;
} else { } else {
return tstrerror(code); return (char*)tstrerror(code);
} }
} }
......
...@@ -118,7 +118,7 @@ bool tscQueryOnMetric(SSqlCmd* pCmd) { ...@@ -118,7 +118,7 @@ bool tscQueryOnMetric(SSqlCmd* pCmd) {
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
return ((pQueryInfo->type & TSDB_QUERY_TYPE_STABLE_QUERY) == TSDB_QUERY_TYPE_STABLE_QUERY) && return ((pQueryInfo->type & TSDB_QUERY_TYPE_STABLE_QUERY) == TSDB_QUERY_TYPE_STABLE_QUERY) &&
(pCmd->msgType == TSDB_MSG_TYPE_DNODE_QUERY); (pCmd->msgType == TSDB_MSG_TYPE_QUERY);
} }
bool tscQueryMetricTags(SQueryInfo* pQueryInfo) { bool tscQueryMetricTags(SQueryInfo* pQueryInfo) {
......
...@@ -30,8 +30,8 @@ void dnodeProcessMsgFromMgmt(int8_t msgType, void *pCont, int32_t contLen, void ...@@ -30,8 +30,8 @@ void dnodeProcessMsgFromMgmt(int8_t msgType, void *pCont, int32_t contLen, void
void dnodeSendMsgToMnode(int8_t msgType, void *pCont, int32_t contLen); void dnodeSendMsgToMnode(int8_t msgType, void *pCont, int32_t contLen);
void dnodeSendRspToMnode(void *pConn, int8_t msgType, int32_t code, void *pCont, int32_t contLen); void dnodeSendRspToMnode(void *pConn, int8_t msgType, int32_t code, void *pCont, int32_t contLen);
void dnodeSendVpeerCfgMsg(int32_t vnode); void dnodeSendVnodeCfgMsg(int32_t vnode);
void dnodeSendMeterCfgMsg(int32_t vnode, int32_t sid); void dnodeSendTableCfgMsg(int32_t vnode, int32_t sid);
......
...@@ -49,12 +49,12 @@ int32_t dnodeDropTable(SDRemoveTableMsg *pTable); ...@@ -49,12 +49,12 @@ int32_t dnodeDropTable(SDRemoveTableMsg *pTable);
* Create stream * Create stream
* if stream already exist, update it * if stream already exist, update it
*/ */
int32_t dnodeCreateStream(SAlterStreamMsg *stream); int32_t dnodeCreateStream(SDAlterStreamMsg *pStream);
/* /*
* Remove all child tables of supertable from local repository * Remove all child tables of supertable from local repository
*/ */
int32_t dnodeDropSuperTable(uint64_t stableUid); int32_t dnodeDropSuperTable(SDRemoveSuperTableMsg *pStable);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -121,23 +121,6 @@ void dnodeProcessMsgFromMgmt(int8_t msgType, void *pCont, int32_t contLen, void ...@@ -121,23 +121,6 @@ void dnodeProcessMsgFromMgmt(int8_t msgType, void *pCont, int32_t contLen, void
//rpcFreeCont(pCont); //rpcFreeCont(pCont);
} }
void dnodeProcessTableCfgRsp(void *pCont, int32_t contLen, int8_t msgType, void *pConn) {
int32_t code = htonl(*((int32_t *) pCont));
if (code == TSDB_CODE_SUCCESS) {
SDCreateTableMsg *table = (SDCreateTableMsg *) (pCont + sizeof(int32_t));
dnodeCreateTable(table);
} else if (code == TSDB_CODE_INVALID_TABLE_ID) {
SDRemoveTableMsg *pTable = (SDRemoveTableMsg *) (pCont + sizeof(int32_t));
pTable->sid = htonl(pTable->sid);
pTable->uid = htobe64(pTable->uid);
dError("table:%s, sid:%d table is not configured, remove it", pTable->tableId, pTable->sid);
dnodeDropTable(pTable);
} else {
dError("code:%d invalid message", code);
}
}
void dnodeProcessCreateTableRequest(void *pCont, int32_t contLen, int8_t msgType, void *pConn) { void dnodeProcessCreateTableRequest(void *pCont, int32_t contLen, int8_t msgType, void *pConn) {
SDCreateTableMsg *pTable = pCont; SDCreateTableMsg *pTable = pCont;
pTable->numOfColumns = htons(pTable->numOfColumns); pTable->numOfColumns = htons(pTable->numOfColumns);
...@@ -170,8 +153,14 @@ void dnodeProcessCreateTableRequest(void *pCont, int32_t contLen, int8_t msgType ...@@ -170,8 +153,14 @@ void dnodeProcessCreateTableRequest(void *pCont, int32_t contLen, int8_t msgType
} }
void dnodeProcessAlterStreamRequest(void *pCont, int32_t contLen, int8_t msgType, void *pConn) { void dnodeProcessAlterStreamRequest(void *pCont, int32_t contLen, int8_t msgType, void *pConn) {
SAlterStreamMsg *stream = (SAlterStreamMsg *) pCont; SDAlterStreamMsg *pStream = pCont;
int32_t code = dnodeCreateStream(stream); pStream->uid = htobe64(pStream->uid);
pStream->stime = htobe64(pStream->stime);
pStream->vnode = htonl(pStream->vnode);
pStream->sid = htonl(pStream->sid);
pStream->status = htonl(pStream->status);
int32_t code = dnodeCreateStream(pStream);
dnodeSendRspToMnode(pConn, msgType + 1, code, NULL, 0); dnodeSendRspToMnode(pConn, msgType + 1, code, NULL, 0);
} }
...@@ -206,8 +195,26 @@ void dnodeProcessVPeerCfgRsp(void *pCont, int32_t contLen, int8_t msgType, void ...@@ -206,8 +195,26 @@ void dnodeProcessVPeerCfgRsp(void *pCont, int32_t contLen, int8_t msgType, void
} }
} }
void dnodeProcessTableCfgRsp(void *pCont, int32_t contLen, int8_t msgType, void *pConn) {
int32_t code = htonl(*((int32_t *) pCont));
if (code == TSDB_CODE_SUCCESS) {
SDCreateTableMsg *table = (SDCreateTableMsg *) (pCont + sizeof(int32_t));
dnodeCreateTable(table);
} else if (code == TSDB_CODE_INVALID_TABLE_ID) {
SDRemoveTableMsg *pTable = (SDRemoveTableMsg *) (pCont + sizeof(int32_t));
pTable->sid = htonl(pTable->sid);
pTable->uid = htobe64(pTable->uid);
dError("table:%s, sid:%d table is not configured, remove it", pTable->tableId, pTable->sid);
dnodeDropTable(pTable);
} else {
dError("code:%d invalid message", code);
}
}
void dnodeProcessCreateVnodeRequest(void *pCont, int32_t contLen, int8_t msgType, void *pConn) { void dnodeProcessCreateVnodeRequest(void *pCont, int32_t contLen, int8_t msgType, void *pConn) {
SCreateVnodeMsg *pVnode = (SCreateVnodeMsg *) pCont; SCreateVnodeMsg *pVnode = (SCreateVnodeMsg *) pCont;
int32_t code = dnodeCreateVnode(pVnode); int32_t code = dnodeCreateVnode(pVnode);
dnodeSendRspToMnode(pConn, msgType + 1, code, NULL, 0); dnodeSendRspToMnode(pConn, msgType + 1, code, NULL, 0);
} }
...@@ -215,17 +222,19 @@ void dnodeProcessCreateVnodeRequest(void *pCont, int32_t contLen, int8_t msgType ...@@ -215,17 +222,19 @@ void dnodeProcessCreateVnodeRequest(void *pCont, int32_t contLen, int8_t msgType
void dnodeProcessFreeVnodeRequest(void *pCont, int32_t contLen, int8_t msgType, void *pConn) { void dnodeProcessFreeVnodeRequest(void *pCont, int32_t contLen, int8_t msgType, void *pConn) {
SFreeVnodeMsg *pVnode = (SFreeVnodeMsg *) pCont; SFreeVnodeMsg *pVnode = (SFreeVnodeMsg *) pCont;
int32_t vnode = htonl(pVnode->vnode); int32_t vnode = htonl(pVnode->vnode);
int32_t code = dnodeDropVnode(vnode); int32_t code = dnodeDropVnode(vnode);
dnodeSendRspToMnode(pConn, msgType + 1, code, NULL, 0); dnodeSendRspToMnode(pConn, msgType + 1, code, NULL, 0);
} }
void dnodeProcessDnodeCfgRequest(void *pCont, int32_t contLen, int8_t msgType, void *pConn) { void dnodeProcessDnodeCfgRequest(void *pCont, int32_t contLen, int8_t msgType, void *pConn) {
SCfgDnodeMsg *pCfg = (SCfgDnodeMsg *)pCont; SCfgDnodeMsg *pCfg = (SCfgDnodeMsg *)pCont;
int32_t code = tsCfgDynamicOptions(pCfg->config); int32_t code = tsCfgDynamicOptions(pCfg->config);
dnodeSendRspToMnode(pConn, msgType + 1, code, NULL, 0); dnodeSendRspToMnode(pConn, msgType + 1, code, NULL, 0);
} }
void dnodeSendVpeerCfgMsg(int32_t vnode) { void dnodeSendVnodeCfgMsg(int32_t vnode) {
SVpeerCfgMsg *cfg = (SVpeerCfgMsg *) rpcMallocCont(sizeof(SVpeerCfgMsg)); SVpeerCfgMsg *cfg = (SVpeerCfgMsg *) rpcMallocCont(sizeof(SVpeerCfgMsg));
if (cfg == NULL) { if (cfg == NULL) {
return; return;
...@@ -235,7 +244,7 @@ void dnodeSendVpeerCfgMsg(int32_t vnode) { ...@@ -235,7 +244,7 @@ void dnodeSendVpeerCfgMsg(int32_t vnode) {
dnodeSendMsgToMnode(TSDB_MSG_TYPE_VNODE_CFG, cfg, sizeof(SVpeerCfgMsg)); dnodeSendMsgToMnode(TSDB_MSG_TYPE_VNODE_CFG, cfg, sizeof(SVpeerCfgMsg));
} }
void dnodeSendMeterCfgMsg(int32_t vnode, int32_t sid) { void dnodeSendTableCfgMsg(int32_t vnode, int32_t sid) {
STableCfgMsg *cfg = (STableCfgMsg *) rpcMallocCont(sizeof(STableCfgMsg)); STableCfgMsg *cfg = (STableCfgMsg *) rpcMallocCont(sizeof(STableCfgMsg));
if (cfg == NULL) { if (cfg == NULL) {
return; return;
...@@ -248,8 +257,8 @@ void dnodeSendMeterCfgMsg(int32_t vnode, int32_t sid) { ...@@ -248,8 +257,8 @@ void dnodeSendMeterCfgMsg(int32_t vnode, int32_t sid) {
void dnodeInitProcessShellMsg() { void dnodeInitProcessShellMsg() {
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_DNODE_CREATE_TABLE] = dnodeProcessCreateTableRequest; dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_DNODE_CREATE_TABLE] = dnodeProcessCreateTableRequest;
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_DNODE_REMOVE_TABLE] = dnodeProcessRemoveTableRequest; dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_DNODE_REMOVE_TABLE] = dnodeProcessRemoveTableRequest;
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_DNODE_CREATE_VNODE] = dnodeProcessCreateVnodeRequest; dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_CREATE_VNODE] = dnodeProcessCreateVnodeRequest;
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_DNODE_FREE_VNODE] = dnodeProcessFreeVnodeRequest; dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_FREE_VNODE] = dnodeProcessFreeVnodeRequest;
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_DNODE_CFG] = dnodeProcessDnodeCfgRequest; dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_DNODE_CFG] = dnodeProcessDnodeCfgRequest;
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_ALTER_STREAM] = dnodeProcessAlterStreamRequest; dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_ALTER_STREAM] = dnodeProcessAlterStreamRequest;
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_VNODE_CFG_RSP] = dnodeProcessVPeerCfgRsp; dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_VNODE_CFG_RSP] = dnodeProcessVPeerCfgRsp;
......
...@@ -40,34 +40,32 @@ static void *tsDnodeShellServer = NULL; ...@@ -40,34 +40,32 @@ static void *tsDnodeShellServer = NULL;
static int32_t tsDnodeQueryReqNum = 0; static int32_t tsDnodeQueryReqNum = 0;
static int32_t tsDnodeSubmitReqNum = 0; static int32_t tsDnodeSubmitReqNum = 0;
void* dnodeProcessMsgFromShell(int8_t msgType, void *pCont, int32_t contLen, void *handle, int32_t index) { void dnodeProcessMsgFromShell(char msgType, void *pCont, int contLen, void *handle, int32_t code) {
assert(handle != NULL); assert(handle != NULL);
if (pCont == NULL || contLen == 0) { if (pCont == NULL || contLen == 0) {
dnodeFreeQInfo(handle); dnodeFreeQInfo(handle);
dTrace("conn:%p, free query info", handle); dTrace("conn:%p, free query info", handle);
return NULL; return;
} }
if (dnodeGetRunStatus() != TSDB_DNODE_RUN_STATUS_RUNING) { if (dnodeGetRunStatus() != TSDB_DNODE_RUN_STATUS_RUNING) {
rpcSendResponse(handle, TSDB_CODE_NOT_READY, 0, 0); rpcSendResponse(handle, TSDB_CODE_NOT_READY, 0, 0);
dTrace("conn:%p, query msg is ignored since dnode not running", handle); dTrace("conn:%p, query msg is ignored since dnode not running", handle);
return NULL; return;
} }
dTrace("conn:%p, msg:%s is received", handle, taosMsg[msgType]); dTrace("conn:%p, msg:%s is received", handle, taosMsg[(int8_t)msgType]);
if (msgType == TSDB_MSG_TYPE_DNODE_QUERY) { if (msgType == TSDB_MSG_TYPE_QUERY) {
dnodeProcessQueryRequest(pCont, contLen, handle); dnodeProcessQueryRequest(pCont, contLen, handle);
} else if (msgType == TSDB_MSG_TYPE_RETRIEVE) { } else if (msgType == TSDB_MSG_TYPE_RETRIEVE) {
dnodeProcessRetrieveRequest(pCont, contLen, handle); dnodeProcessRetrieveRequest(pCont, contLen, handle);
} else if (msgType == TSDB_MSG_TYPE_DNODE_SUBMIT) { } else if (msgType == TSDB_MSG_TYPE_SUBMIT) {
dnodeProcessShellSubmitRequest(pCont, contLen, handle); dnodeProcessShellSubmitRequest(pCont, contLen, handle);
} else { } else {
dError("conn:%p, msg:%s is not processed", handle, taosMsg[msgType]); dError("conn:%p, msg:%s is not processed", handle, taosMsg[(int8_t)msgType]);
} }
return NULL;
} }
int32_t dnodeInitShell() { int32_t dnodeInitShell() {
...@@ -187,9 +185,9 @@ void dnodeProcessShellSubmitRequestCb(SShellSubmitRspMsg *result, void *pConn) { ...@@ -187,9 +185,9 @@ void dnodeProcessShellSubmitRequestCb(SShellSubmitRspMsg *result, void *pConn) {
for (int i = 0; i < submitRsp->numOfFailedBlocks; ++i) { for (int i = 0; i < submitRsp->numOfFailedBlocks; ++i) {
SShellSubmitRspBlock *block = &submitRsp->failedBlocks[i]; SShellSubmitRspBlock *block = &submitRsp->failedBlocks[i];
if (block->code == TSDB_CODE_NOT_ACTIVE_VNODE || block->code == TSDB_CODE_INVALID_VNODE_ID) { if (block->code == TSDB_CODE_NOT_ACTIVE_VNODE || block->code == TSDB_CODE_INVALID_VNODE_ID) {
dnodeSendVpeerCfgMsg(block->vnode); dnodeSendVnodeCfgMsg(block->vnode);
} else if (block->code == TSDB_CODE_INVALID_TABLE_ID || block->code == TSDB_CODE_NOT_ACTIVE_TABLE) { } else if (block->code == TSDB_CODE_INVALID_TABLE_ID || block->code == TSDB_CODE_NOT_ACTIVE_TABLE) {
dnodeSendMeterCfgMsg(block->vnode, block->sid); dnodeSendTableCfgMsg(block->vnode, block->sid);
} }
block->index = htonl(block->index); block->index = htonl(block->index);
block->vnode = htonl(block->vnode); block->vnode = htonl(block->vnode);
......
...@@ -22,6 +22,8 @@ ...@@ -22,6 +22,8 @@
#include "dnodeVnodeMgmt.h" #include "dnodeVnodeMgmt.h"
void dnodeWriteData(SShellSubmitMsg *pSubmit, void *pConn, void (*callback)(SShellSubmitRspMsg *rsp, void *pConn)) { void dnodeWriteData(SShellSubmitMsg *pSubmit, void *pConn, void (*callback)(SShellSubmitRspMsg *rsp, void *pConn)) {
dTrace("submit msg is disposed, affectrows:1");
SShellSubmitRspMsg result = {0}; SShellSubmitRspMsg result = {0};
int32_t numOfSid = htonl(pSubmit->numOfSid); int32_t numOfSid = htonl(pSubmit->numOfSid);
...@@ -31,7 +33,11 @@ void dnodeWriteData(SShellSubmitMsg *pSubmit, void *pConn, void (*callback)(SShe ...@@ -31,7 +33,11 @@ void dnodeWriteData(SShellSubmitMsg *pSubmit, void *pConn, void (*callback)(SShe
callback(&result, pConn); callback(&result, pConn);
} }
//TODO: submit implementation result.code = 0;
result.numOfRows = 1;
result.affectedRows = 1;
result.numOfFailedBlocks = 0;
callback(&result, pConn);
} }
int32_t dnodeCreateTable(SDCreateTableMsg *pTable) { int32_t dnodeCreateTable(SDCreateTableMsg *pTable) {
...@@ -65,7 +71,6 @@ int32_t dnodeCreateTable(SDCreateTableMsg *pTable) { ...@@ -65,7 +71,6 @@ int32_t dnodeCreateTable(SDCreateTableMsg *pTable) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
/* /*
* Remove table from local repository * Remove table from local repository
*/ */
...@@ -78,24 +83,16 @@ int32_t dnodeDropTable(SDRemoveTableMsg *pTable) { ...@@ -78,24 +83,16 @@ int32_t dnodeDropTable(SDRemoveTableMsg *pTable) {
* Create stream * Create stream
* if stream already exist, update it * if stream already exist, update it
*/ */
int32_t dnodeCreateStream(SAlterStreamMsg *stream) { int32_t dnodeCreateStream(SDAlterStreamMsg *pStream) {
int32_t vnode = htonl(stream->vnode); dPrint("stream:%s, is created, ", pStream->tableId);
int32_t sid = htonl(stream->sid); return TSDB_CODE_SUCCESS;
uint64_t uid = htobe64(stream->uid);
if (!dnodeCheckTableExist(vnode, sid, uid)) {
return TSDB_CODE_INVALID_TABLE;
}
//TODO create or remove stream
return 0;
} }
/* /*
* Remove all child tables of supertable from local repository * Remove all child tables of supertable from local repository
*/ */
int32_t dnodeDropSuperTable(uint64_t stableUid) { int32_t dnodeDropSuperTable(SDRemoveSuperTableMsg *pStable) {
dPrint("stable:%s, is removed", pStable->tableId);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
此差异已折叠。
...@@ -31,20 +31,20 @@ extern "C" { ...@@ -31,20 +31,20 @@ extern "C" {
// message type // message type
#define TSDB_MSG_TYPE_REG 1 #define TSDB_MSG_TYPE_REG 1
#define TSDB_MSG_TYPE_REG_RSP 2 #define TSDB_MSG_TYPE_REG_RSP 2
#define TSDB_MSG_TYPE_DNODE_SUBMIT 3 #define TSDB_MSG_TYPE_SUBMIT 3
#define TSDB_MSG_TYPE_DNODE_SUBMIT_RSP 4 #define TSDB_MSG_TYPE_SUBMIT_RSP 4
#define TSDB_MSG_TYPE_DNODE_QUERY 5 #define TSDB_MSG_TYPE_QUERY 5
#define TSDB_MSG_TYPE_DNODE_QUERY_RSP 6 #define TSDB_MSG_TYPE_QUERY_RSP 6
#define TSDB_MSG_TYPE_RETRIEVE 7 #define TSDB_MSG_TYPE_RETRIEVE 7
#define TSDB_MSG_TYPE_RETRIEVE_RSP 8 #define TSDB_MSG_TYPE_RETRIEVE_RSP 8
#define TSDB_MSG_TYPE_DNODE_CREATE_TABLE 9 #define TSDB_MSG_TYPE_DNODE_CREATE_TABLE 9
#define TSDB_MSG_TYPE_DNODE_CREATE_TABLE_RSP 10 #define TSDB_MSG_TYPE_DNODE_CREATE_TABLE_RSP 10
#define TSDB_MSG_TYPE_DNODE_REMOVE_TABLE 11 #define TSDB_MSG_TYPE_DNODE_REMOVE_TABLE 11
#define TSDB_MSG_TYPE_DNODE_REMOVE_TABLE_RSP 12 #define TSDB_MSG_TYPE_DNODE_REMOVE_TABLE_RSP 12
#define TSDB_MSG_TYPE_DNODE_CREATE_VNODE 13 #define TSDB_MSG_TYPE_CREATE_VNODE 13
#define TSDB_MSG_TYPE_DNODE_VPEERS_RSP 14 #define TSDB_MSG_TYPE_CREATE_VNODE_RSP 14
#define TSDB_MSG_TYPE_DNODE_FREE_VNODE 15 #define TSDB_MSG_TYPE_FREE_VNODE 15
#define TSDB_MSG_TYPE_DNODE_FREE_VNODE_RSP 16 #define TSDB_MSG_TYPE_FREE_VNODE_RSP 16
#define TSDB_MSG_TYPE_DNODE_CFG 17 #define TSDB_MSG_TYPE_DNODE_CFG 17
#define TSDB_MSG_TYPE_DNODE_CFG_RSP 18 #define TSDB_MSG_TYPE_DNODE_CFG_RSP 18
#define TSDB_MSG_TYPE_DNODE_ALTER_STREAM 19 #define TSDB_MSG_TYPE_DNODE_ALTER_STREAM 19
...@@ -195,13 +195,13 @@ typedef struct { ...@@ -195,13 +195,13 @@ typedef struct {
int32_t sid; int32_t sid;
int32_t sversion; int32_t sversion;
uint64_t uid; uint64_t uid;
short numOfRows; int16_t numOfRows;
char payLoad[]; char payLoad[];
} SShellSubmitBlock; } SShellSubmitBlock;
typedef struct { typedef struct {
int8_t import; int16_t import;
int8_t reserved[3]; int16_t vnode;
int32_t numOfSid; /* total number of sid */ int32_t numOfSid; /* total number of sid */
char blks[]; /* numOfSid blocks, each blocks for one table */ char blks[]; /* numOfSid blocks, each blocks for one table */
} SShellSubmitMsg; } SShellSubmitMsg;
...@@ -219,7 +219,7 @@ typedef struct { ...@@ -219,7 +219,7 @@ typedef struct {
int32_t affectedRows; // number of records actually written int32_t affectedRows; // number of records actually written
int32_t failedRows; // number of failed records (exclude duplicate records) int32_t failedRows; // number of failed records (exclude duplicate records)
int32_t numOfFailedBlocks; int32_t numOfFailedBlocks;
SShellSubmitRspBlock *failedBlocks; SShellSubmitRspBlock failedBlocks[];
} SShellSubmitRspMsg; } SShellSubmitRspMsg;
typedef struct SSchema { typedef struct SSchema {
...@@ -337,6 +337,7 @@ typedef struct { ...@@ -337,6 +337,7 @@ typedef struct {
typedef struct { typedef struct {
char tableId[TSDB_TABLE_ID_LEN + 1]; char tableId[TSDB_TABLE_ID_LEN + 1];
int64_t uid;
} SDRemoveSuperTableMsg; } SDRemoveSuperTableMsg;
typedef struct { typedef struct {
...@@ -697,6 +698,7 @@ typedef struct STableMeta { ...@@ -697,6 +698,7 @@ typedef struct STableMeta {
int16_t numOfColumns; int16_t numOfColumns;
int16_t rowSize; // used locally, calculated in client int16_t rowSize; // used locally, calculated in client
int16_t sversion; int16_t sversion;
int8_t numOfVpeers;
SVPeerDesc vpeerDesc[TSDB_VNODES_SUPPORT]; SVPeerDesc vpeerDesc[TSDB_VNODES_SUPPORT];
int32_t sid; int32_t sid;
int32_t vgid; int32_t vgid;
...@@ -802,7 +804,8 @@ typedef struct { ...@@ -802,7 +804,8 @@ typedef struct {
uint64_t uid; uint64_t uid;
uint64_t stime; // stream starting time uint64_t stime; // stream starting time
int32_t status; int32_t status;
} SAlterStreamMsg; char tableId[TSDB_TABLE_ID_LEN + 1];
} SDAlterStreamMsg;
#pragma pack(pop) #pragma pack(pop)
......
...@@ -86,7 +86,7 @@ int main(int argc, char* argv[]) { ...@@ -86,7 +86,7 @@ int main(int argc, char* argv[]) {
{ {
printf("=== this a test for debug usage\n"); printf("=== this a test for debug usage\n");
void *taos = taos_connect(NULL, "root", "taosdata", NULL, 0); void *taos = taos_connect(NULL, "root", "taosdata", NULL, 0);
taos_query(taos, "create table d1.c2 using d1.st2 tags(1)"); taos_query(taos, "insert into d1.t14 values(now, 1)");
while (1) { while (1) {
sleep(1000); sleep(1000);
} }
......
...@@ -359,7 +359,7 @@ int32_t mgmtCreateChildTable(SCreateTableMsg *pCreate, int32_t contLen, SVgObj * ...@@ -359,7 +359,7 @@ int32_t mgmtCreateChildTable(SCreateTableMsg *pCreate, int32_t contLen, SVgObj *
return TSDB_CODE_SERV_OUT_OF_MEMORY; return TSDB_CODE_SERV_OUT_OF_MEMORY;
} }
*pTableOut = pTable; *pTableOut = (STableInfo *) pTable;
mTrace("table:%s, create table in vgroup, vgroup:%d sid:%d vnode:%d uid:%" PRIu64 , mTrace("table:%s, create table in vgroup, vgroup:%d sid:%d vnode:%d uid:%" PRIu64 ,
pTable->tableId, pVgroup->vgId, sid, pVgroup->vnodeGid[0].vnode, pTable->uid); pTable->tableId, pVgroup->vgId, sid, pVgroup->vnodeGid[0].vnode, pTable->uid);
...@@ -467,6 +467,7 @@ int32_t mgmtGetChildTableMeta(SDbObj *pDb, SChildTableObj *pTable, STableMeta *p ...@@ -467,6 +467,7 @@ int32_t mgmtGetChildTableMeta(SDbObj *pDb, SChildTableObj *pTable, STableMeta *p
pMeta->numOfColumns = htons(pTable->superTable->numOfColumns); pMeta->numOfColumns = htons(pTable->superTable->numOfColumns);
pMeta->tableType = pTable->type; pMeta->tableType = pTable->type;
pMeta->contLen = sizeof(STableMeta) + mgmtSetSchemaFromSuperTable(pMeta->schema, pTable->superTable); pMeta->contLen = sizeof(STableMeta) + mgmtSetSchemaFromSuperTable(pMeta->schema, pTable->superTable);
strcpy(pMeta->tableId, pTable->tableId);
SVgObj *pVgroup = mgmtGetVgroup(pTable->vgId); SVgObj *pVgroup = mgmtGetVgroup(pTable->vgId);
if (pVgroup == NULL) { if (pVgroup == NULL) {
...@@ -481,6 +482,7 @@ int32_t mgmtGetChildTableMeta(SDbObj *pDb, SChildTableObj *pTable, STableMeta *p ...@@ -481,6 +482,7 @@ int32_t mgmtGetChildTableMeta(SDbObj *pDb, SChildTableObj *pTable, STableMeta *p
pMeta->vpeerDesc[i].vnode = htonl(pVgroup->vnodeGid[i].vnode); pMeta->vpeerDesc[i].vnode = htonl(pVgroup->vnodeGid[i].vnode);
} }
} }
pMeta->numOfVpeers = pVgroup->numOfVnodes;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -106,8 +106,9 @@ static void mgmtProcessTableCfgMsg(int8_t msgType, int8_t *pCont, int32_t contLe ...@@ -106,8 +106,9 @@ static void mgmtProcessTableCfgMsg(int8_t msgType, int8_t *pCont, int32_t contLe
mgmtSendRspToDnode(thandle, msgType + 1, TSDB_CODE_SUCCESS, NULL, 0); mgmtSendRspToDnode(thandle, msgType + 1, TSDB_CODE_SUCCESS, NULL, 0);
//TODO
SRpcIpSet ipSet = mgmtGetIpSetFromIp(pCfg->dnode); SRpcIpSet ipSet = mgmtGetIpSetFromIp(pCfg->dnode);
mgmtSendCreateTableMsg(pTable, &ipSet, NULL); mgmtSendCreateTableMsg(NULL, &ipSet, NULL);
} }
static void mgmtProcessVnodeCfgMsg(int8_t msgType, int8_t *pCont, int32_t contLen, void *pConn) { static void mgmtProcessVnodeCfgMsg(int8_t msgType, int8_t *pCont, int32_t contLen, void *pConn) {
...@@ -216,7 +217,7 @@ void mgmtSendCreateVnodeMsg(SVgObj *pVgroup, int32_t vnode, SRpcIpSet *ipSet, vo ...@@ -216,7 +217,7 @@ void mgmtSendCreateVnodeMsg(SVgObj *pVgroup, int32_t vnode, SRpcIpSet *ipSet, vo
mTrace("vgroup:%d, send create vnode:%d msg, ahandle:%p", pVgroup->vgId, vnode, ahandle); mTrace("vgroup:%d, send create vnode:%d msg, ahandle:%p", pVgroup->vgId, vnode, ahandle);
SCreateVnodeMsg *pVpeer = mgmtBuildVpeersMsg(pVgroup, vnode); SCreateVnodeMsg *pVpeer = mgmtBuildVpeersMsg(pVgroup, vnode);
if (pVpeer != NULL) { if (pVpeer != NULL) {
mgmtSendMsgToDnode(ipSet, TSDB_MSG_TYPE_DNODE_CREATE_VNODE, pVpeer, sizeof(SCreateVnodeMsg), ahandle); mgmtSendMsgToDnode(ipSet, TSDB_MSG_TYPE_CREATE_VNODE, pVpeer, sizeof(SCreateVnodeMsg), ahandle);
} }
} }
...@@ -226,7 +227,7 @@ void mgmtProcessMsgFromDnode(char msgType, void *pCont, int32_t contLen, void *p ...@@ -226,7 +227,7 @@ void mgmtProcessMsgFromDnode(char msgType, void *pCont, int32_t contLen, void *p
return; return;
} }
mTrace("msg:%d:%s is received from dnode, pConn:%p", msgType, taosMsg[msgType], pConn); mTrace("msg:%d:%s is received from dnode, pConn:%p", msgType, taosMsg[(int8_t)msgType], pConn);
if (msgType == TSDB_MSG_TYPE_TABLE_CFG) { if (msgType == TSDB_MSG_TYPE_TABLE_CFG) {
mgmtProcessTableCfgMsg(msgType, pCont, contLen, pConn); mgmtProcessTableCfgMsg(msgType, pCont, contLen, pConn);
...@@ -236,9 +237,9 @@ void mgmtProcessMsgFromDnode(char msgType, void *pCont, int32_t contLen, void *p ...@@ -236,9 +237,9 @@ void mgmtProcessMsgFromDnode(char msgType, void *pCont, int32_t contLen, void *p
mgmtProcessCreateTableRsp(msgType, pCont, contLen, pConn, code); mgmtProcessCreateTableRsp(msgType, pCont, contLen, pConn, code);
} else if (msgType == TSDB_MSG_TYPE_DNODE_REMOVE_TABLE_RSP) { } else if (msgType == TSDB_MSG_TYPE_DNODE_REMOVE_TABLE_RSP) {
mgmtProcessRemoveTableRsp(msgType, pCont, contLen, pConn, code); mgmtProcessRemoveTableRsp(msgType, pCont, contLen, pConn, code);
} else if (msgType == TSDB_MSG_TYPE_DNODE_VPEERS_RSP) { } else if (msgType == TSDB_MSG_TYPE_CREATE_VNODE_RSP) {
mgmtProcessCreateVnodeRsp(msgType, pCont, contLen, pConn, code); mgmtProcessCreateVnodeRsp(msgType, pCont, contLen, pConn, code);
} else if (msgType == TSDB_MSG_TYPE_DNODE_FREE_VNODE_RSP) { } else if (msgType == TSDB_MSG_TYPE_FREE_VNODE_RSP) {
mgmtProcessFreeVnodeRsp(msgType, pCont, contLen, pConn, code); mgmtProcessFreeVnodeRsp(msgType, pCont, contLen, pConn, code);
} else if (msgType == TSDB_MSG_TYPE_DNODE_CFG_RSP) { } else if (msgType == TSDB_MSG_TYPE_DNODE_CFG_RSP) {
} else if (msgType == TSDB_MSG_TYPE_ALTER_STREAM_RSP) { } else if (msgType == TSDB_MSG_TYPE_ALTER_STREAM_RSP) {
...@@ -261,7 +262,7 @@ void mgmtSendOneFreeVnodeMsg(int32_t vnode, SRpcIpSet *ipSet, void *ahandle) { ...@@ -261,7 +262,7 @@ void mgmtSendOneFreeVnodeMsg(int32_t vnode, SRpcIpSet *ipSet, void *ahandle) {
SFreeVnodeMsg *pFreeVnode = rpcMallocCont(sizeof(SFreeVnodeMsg)); SFreeVnodeMsg *pFreeVnode = rpcMallocCont(sizeof(SFreeVnodeMsg));
if (pFreeVnode != NULL) { if (pFreeVnode != NULL) {
pFreeVnode->vnode = htonl(vnode); pFreeVnode->vnode = htonl(vnode);
mgmtSendMsgToDnode(ipSet, TSDB_MSG_TYPE_DNODE_FREE_VNODE, pFreeVnode, sizeof(SFreeVnodeMsg), ahandle); mgmtSendMsgToDnode(ipSet, TSDB_MSG_TYPE_FREE_VNODE, pFreeVnode, sizeof(SFreeVnodeMsg), ahandle);
} }
} }
......
...@@ -395,7 +395,7 @@ int32_t mgmtCreateNormalTable(SCreateTableMsg *pCreate, int32_t contLen, SVgObj ...@@ -395,7 +395,7 @@ int32_t mgmtCreateNormalTable(SCreateTableMsg *pCreate, int32_t contLen, SVgObj
return TSDB_CODE_SERV_OUT_OF_MEMORY; return TSDB_CODE_SERV_OUT_OF_MEMORY;
} }
*pTableOut = pTable; *pTableOut = (STableInfo *) pTable;
mTrace("table:%s, create table in vgroup, vgroup:%d sid:%d vnode:%d uid:%" PRIu64 , mTrace("table:%s, create table in vgroup, vgroup:%d sid:%d vnode:%d uid:%" PRIu64 ,
pTable->tableId, pVgroup->vgId, sid, pVgroup->vnodeGid[0].vnode, pTable->uid); pTable->tableId, pVgroup->vgId, sid, pVgroup->vnodeGid[0].vnode, pTable->uid);
...@@ -564,6 +564,7 @@ int32_t mgmtGetNormalTableMeta(SDbObj *pDb, SNormalTableObj *pTable, STableMeta ...@@ -564,6 +564,7 @@ int32_t mgmtGetNormalTableMeta(SDbObj *pDb, SNormalTableObj *pTable, STableMeta
pMeta->vpeerDesc[i].vnode = htonl(pVgroup->vnodeGid[i].vnode); pMeta->vpeerDesc[i].vnode = htonl(pVgroup->vnodeGid[i].vnode);
} }
} }
pMeta->numOfVpeers = pVgroup->numOfVnodes;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
......
...@@ -631,6 +631,7 @@ int32_t mgmtGetSuperTableMeta(SDbObj *pDb, SSuperTableObj *pTable, STableMeta *p ...@@ -631,6 +631,7 @@ int32_t mgmtGetSuperTableMeta(SDbObj *pDb, SSuperTableObj *pTable, STableMeta *p
pMeta->numOfColumns = htons(pTable->numOfColumns); pMeta->numOfColumns = htons(pTable->numOfColumns);
pMeta->tableType = pTable->type; pMeta->tableType = pTable->type;
pMeta->contLen = sizeof(STableMeta) + mgmtSetSchemaFromSuperTable(pMeta->schema, pTable); pMeta->contLen = sizeof(STableMeta) + mgmtSetSchemaFromSuperTable(pMeta->schema, pTable);
strcpy(pMeta->tableId, pTable->tableId);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
......
...@@ -355,7 +355,7 @@ void rpcSendRequest(void *shandle, SRpcIpSet *pIpSet, char type, void *pCont, in ...@@ -355,7 +355,7 @@ void rpcSendRequest(void *shandle, SRpcIpSet *pIpSet, char type, void *pCont, in
// connection type is application specific. // connection type is application specific.
// for TDengine, all the query, show commands shall have TCP connection // for TDengine, all the query, show commands shall have TCP connection
if (type == TSDB_MSG_TYPE_DNODE_QUERY || type == TSDB_MSG_TYPE_RETRIEVE || if (type == TSDB_MSG_TYPE_QUERY || type == TSDB_MSG_TYPE_RETRIEVE ||
type == TSDB_MSG_TYPE_STABLE_META || type == TSDB_MSG_TYPE_MULTI_TABLE_META || type == TSDB_MSG_TYPE_STABLE_META || type == TSDB_MSG_TYPE_MULTI_TABLE_META ||
type == TSDB_MSG_TYPE_SHOW ) type == TSDB_MSG_TYPE_SHOW )
pContext->connType = RPC_CONN_TCPC; pContext->connType = RPC_CONN_TCPC;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册