diff --git a/src/cq/src/cqMain.c b/src/cq/src/cqMain.c index 9efa517ac33b2d241dbe605c2386378203099a75..8b10860ef0db023cb0d3ef9534d0ce76fcc7aed7 100644 --- a/src/cq/src/cqMain.c +++ b/src/cq/src/cqMain.c @@ -39,16 +39,16 @@ #define cTrace(...) { if (cqDebugFlag & DEBUG_TRACE) { taosPrintLog("CQ ", cqDebugFlag, __VA_ARGS__); }} typedef struct { - int vgId; + int32_t vgId; char user[TSDB_USER_LEN]; char pass[TSDB_PASSWORD_LEN]; char db[TSDB_DB_NAME_LEN]; FCqWrite cqWrite; void *ahandle; - int num; // number of continuous streams + int32_t num; // number of continuous streams struct SCqObj *pHead; void *dbConn; - int master; + int32_t master; void *tmrCtrl; pthread_mutex_t mutex; } SCqContext; @@ -57,7 +57,7 @@ typedef struct SCqObj { tmr_h tmrId; uint64_t uid; int32_t tid; // table ID - int rowSize; // bytes of a row + int32_t rowSize; // bytes of a row char * sqlStr; // SQL string STSchema * pSchema; // pointer to schema array void * pStream; @@ -175,7 +175,7 @@ void cqStop(void *handle) { pthread_mutex_unlock(&pContext->mutex); } -void *cqCreate(void *handle, uint64_t uid, int tid, char *sqlStr, STSchema *pSchema) { +void *cqCreate(void *handle, uint64_t uid, int32_t tid, char *sqlStr, STSchema *pSchema) { SCqContext *pContext = handle; SCqObj *pObj = calloc(sizeof(SCqObj), 1); @@ -237,7 +237,7 @@ void cqDrop(void *handle) { pthread_mutex_unlock(&pContext->mutex); } -static void doCreateStream(void *param, TAOS_RES *result, int code) { +static void doCreateStream(void *param, TAOS_RES *result, int32_t code) { SCqObj* pObj = (SCqObj*)param; SCqContext* pContext = pObj->pContext; SSqlObj* pSql = (SSqlObj*)result; @@ -288,7 +288,7 @@ static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row) { cDebug("vgId:%d, id:%d CQ:%s stream result is ready", pContext->vgId, pObj->tid, pObj->sqlStr); - int size = sizeof(SWalHead) + sizeof(SSubmitMsg) + sizeof(SSubmitBlk) + TD_DATA_ROW_HEAD_SIZE + pObj->rowSize; + int32_t size = sizeof(SWalHead) + sizeof(SSubmitMsg) + sizeof(SSubmitBlk) + TD_DATA_ROW_HEAD_SIZE + pObj->rowSize; char *buffer = calloc(size, 1); SWalHead *pHead = (SWalHead *)buffer; diff --git a/src/inc/tcq.h b/src/inc/tcq.h index 4a23695a1a851d861d23da6142c6bd909fa73d7c..0f8a7a6a9e6e93f34424a489aa10b879190673ef 100644 --- a/src/inc/tcq.h +++ b/src/inc/tcq.h @@ -24,7 +24,7 @@ extern "C" { typedef int32_t (*FCqWrite)(void *ahandle, void *pHead, int32_t qtype, void *pMsg); typedef struct { - int vgId; + int32_t vgId; char user[TSDB_USER_LEN]; char pass[TSDB_PASSWORD_LEN]; char db[TSDB_DB_NAME_LEN]; @@ -42,12 +42,12 @@ void cqStart(void *handle); void cqStop(void *handle); // cqCreate is called by TSDB to start an instance of CQ -void *cqCreate(void *handle, uint64_t uid, int sid, char *sqlStr, STSchema *pSchema); +void *cqCreate(void *handle, uint64_t uid, int32_t sid, char *sqlStr, STSchema *pSchema); // cqDrop is called by TSDB to stop an instance of CQ, handle is the return value of cqCreate void cqDrop(void *handle); -extern int cqDebugFlag; +extern int32_t cqDebugFlag; #ifdef __cplusplus diff --git a/src/inc/twal.h b/src/inc/twal.h index 05be90775fba352ae861384d61a14fb62529b0cb..a9e0ddd6b7ffd0708e4bd1c83f31b826e3bec674 100644 --- a/src/inc/twal.h +++ b/src/inc/twal.h @@ -32,7 +32,8 @@ typedef enum { typedef struct { int8_t msgType; - int8_t reserved[3]; + int8_t sver; + int8_t reserved[2]; int32_t len; uint64_t version; uint32_t signature;