mndConsumer.c 9.6 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
#include "mndConsumer.h"
#include "mndDb.h"
#include "mndDnode.h"
#include "mndMnode.h"
#include "mndShow.h"
#include "mndStb.h"
#include "mndTopic.h"
#include "mndTrans.h"
#include "mndUser.h"
#include "mndVgroup.h"
L
Liu Jicong 已提交
27
#include "tcompare.h"
L
Liu Jicong 已提交
28 29 30 31 32
#include "tname.h"

#define MND_CONSUMER_VER_NUMBER 1
#define MND_CONSUMER_RESERVE_SIZE 64

L
Liu Jicong 已提交
33 34 35 36 37 38 39
static int32_t mndConsumerActionInsert(SSdb *pSdb, SMqConsumerObj *pConsumer);
static int32_t mndConsumerActionDelete(SSdb *pSdb, SMqConsumerObj *pConsumer);
static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pConsumer, SMqConsumerObj *pNewConsumer);
static int32_t mndProcessConsumerMetaMsg(SMnodeMsg *pMsg);
static int32_t mndGetConsumerMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaRsp *pMeta);
static int32_t mndRetrieveConsumer(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows);
static void    mndCancelGetNextConsumer(SMnode *pMnode, void *pIter);
L
Liu Jicong 已提交
40 41 42

/**
 * Register the consumer table with the mnode's SDB.
 *
 * Fills an SSdbTable descriptor for SDB_CONSUMER (64-bit integer key plus the
 * encode/decode/insert/update/delete callbacks of this file) and hands it to
 * sdbSetTable().
 *
 * @param pMnode mnode whose pSdb receives the table descriptor
 * @return whatever sdbSetTable() returns
 */
int32_t mndInitConsumer(SMnode *pMnode) {
  SSdbTable consumerTable = {0};

  consumerTable.sdbType = SDB_CONSUMER;
  consumerTable.keyType = SDB_KEY_INT64;
  consumerTable.encodeFp = (SdbEncodeFp)mndConsumerActionEncode;
  consumerTable.decodeFp = (SdbDecodeFp)mndConsumerActionDecode;
  consumerTable.insertFp = (SdbInsertFp)mndConsumerActionInsert;
  consumerTable.updateFp = (SdbUpdateFp)mndConsumerActionUpdate;
  consumerTable.deleteFp = (SdbDeleteFp)mndConsumerActionDelete;

  return sdbSetTable(pMnode->pSdb, consumerTable);
}

/**
 * Cleanup hook for the consumer module. The body is empty: no work is
 * performed.
 *
 * @param pMnode unused
 */
void mndCleanupConsumer(SMnode *pMnode) {
  (void)pMnode;
}

L
Liu Jicong 已提交
55 56 57
/**
 * Serialize a consumer object into an SDB raw record.
 *
 * Record layout, in write order: an int32 payload length, the payload
 * produced by tEncodeSMqConsumerObj(), then MND_CONSUMER_RESERVE_SIZE
 * reserved bytes.
 *
 * @param pConsumer consumer object to encode
 * @return the raw record obtained from sdbAllocRaw(), or NULL on failure
 *         (terrno then holds the error code)
 */
SSdbRaw *mndConsumerActionEncode(SMqConsumerObj *pConsumer) {
  /* Everything the cleanup label touches is initialised up front so that an
   * early goto never sees an indeterminate pointer. */
  SSdbRaw *pRaw = NULL;
  void    *buf = NULL;
  int32_t  dataPos = 0;

  terrno = TSDB_CODE_OUT_OF_MEMORY;

  /* First pass with a NULL destination is used only to obtain the length. */
  int32_t tlen = tEncodeSMqConsumerObj(NULL, pConsumer);
  int32_t size = sizeof(int32_t) + tlen + MND_CONSUMER_RESERVE_SIZE;

  pRaw = sdbAllocRaw(SDB_CONSUMER, MND_CONSUMER_VER_NUMBER, size);
  if (pRaw == NULL) goto CM_ENCODE_OVER;

  buf = malloc(tlen);
  if (buf == NULL) goto CM_ENCODE_OVER;

  /* The encoder receives a cursor; buf keeps pointing at the start. */
  void *abuf = buf;
  tEncodeSMqConsumerObj(&abuf, pConsumer);

  SDB_SET_INT32(pRaw, dataPos, tlen, CM_ENCODE_OVER);
  SDB_SET_BINARY(pRaw, dataPos, buf, tlen, CM_ENCODE_OVER);
  SDB_SET_RESERVE(pRaw, dataPos, MND_CONSUMER_RESERVE_SIZE, CM_ENCODE_OVER);
  SDB_SET_DATALEN(pRaw, dataPos, CM_ENCODE_OVER);

  terrno = TSDB_CODE_SUCCESS;

CM_ENCODE_OVER:
  /* The scratch buffer is only an intermediate copy; release it on every
   * path (free(NULL) is a no-op). */
  free(buf);

  if (terrno != 0) {
    mError("consumer:%lld, failed to encode to raw:%p since %s", (long long)pConsumer->consumerId, pRaw, terrstr());
    sdbFreeRaw(pRaw);
    return NULL;
  }

  mTrace("consumer:%lld, encode to raw:%p, row:%p", (long long)pConsumer->consumerId, pRaw, pConsumer);
  return pRaw;
}

L
Liu Jicong 已提交
88
/**
 * Rebuild a consumer row from an SDB raw record.
 *
 * Reads, in order: an int32 payload length, the payload itself, and
 * MND_CONSUMER_RESERVE_SIZE reserved bytes, then decodes the payload into the
 * SMqConsumerObj stored in a freshly allocated row.
 *
 * @param pRaw raw record to decode
 * @return the row obtained from sdbAllocRow(), or NULL on failure (terrno
 *         then holds the error code; TSDB_CODE_SDB_INVALID_DATA_VER when the
 *         record's soft version is not MND_CONSUMER_VER_NUMBER)
 */
SSdbRow *mndConsumerActionDecode(SSdbRaw *pRaw) {
  /* Everything the cleanup label touches is initialised up front so that an
   * early goto never frees or dereferences an indeterminate pointer. */
  SSdbRow        *pRow = NULL;
  SMqConsumerObj *pConsumer = NULL;
  void           *buf = NULL;
  int32_t         dataPos = 0;
  int32_t         len = 0;
  int8_t          sver = 0;

  terrno = TSDB_CODE_OUT_OF_MEMORY;

  if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto CM_DECODE_OVER;

  if (sver != MND_CONSUMER_VER_NUMBER) {
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
    goto CM_DECODE_OVER;
  }

  pRow = sdbAllocRow(sizeof(SMqConsumerObj));
  if (pRow == NULL) goto CM_DECODE_OVER;

  pConsumer = sdbGetRowObj(pRow);
  if (pConsumer == NULL) goto CM_DECODE_OVER;

  SDB_GET_INT32(pRaw, dataPos, &len, CM_DECODE_OVER);

  buf = malloc(len);
  if (buf == NULL) goto CM_DECODE_OVER;

  SDB_GET_BINARY(pRaw, dataPos, buf, len, CM_DECODE_OVER);
  SDB_GET_RESERVE(pRaw, dataPos, MND_CONSUMER_RESERVE_SIZE, CM_DECODE_OVER);

  if (tDecodeSMqConsumerObj(buf, pConsumer) == NULL) {
    goto CM_DECODE_OVER;
  }

  terrno = TSDB_CODE_SUCCESS;

CM_DECODE_OVER:
  /* NOTE(review): assumes tDecodeSMqConsumerObj() copies what it needs out of
   * buf rather than keeping pointers into it - confirm against the decoder. */
  free(buf);

  if (terrno != TSDB_CODE_SUCCESS) {
    /* pConsumer is still NULL on the early-exit paths; -1 stands in for the
     * id there. On later paths the id may not have been decoded yet. */
    long long consumerId = (pConsumer != NULL) ? (long long)pConsumer->consumerId : -1LL;
    mError("consumer:%lld, failed to decode from raw:%p since %s", consumerId, pRaw, terrstr());
    tfree(pRow);
    return NULL;
  }

  return pRow;
}

L
Liu Jicong 已提交
129 130
/**
 * SDB insert callback for consumer rows; only emits a trace line.
 *
 * @param pSdb      unused
 * @param pConsumer consumer row being inserted
 * @return always 0
 */
static int32_t mndConsumerActionInsert(SSdb *pSdb, SMqConsumerObj *pConsumer) {
  /* The id is 64-bit; %lld with an explicit cast stays correct where long is 32-bit. */
  mTrace("consumer:%lld, perform insert action", (long long)pConsumer->consumerId);
  return 0;
}

L
Liu Jicong 已提交
134 135
/**
 * SDB delete callback for consumer rows; only emits a trace line.
 *
 * @param pSdb      unused
 * @param pConsumer consumer row being deleted
 * @return always 0
 */
static int32_t mndConsumerActionDelete(SSdb *pSdb, SMqConsumerObj *pConsumer) {
  /* The id is 64-bit; %lld with an explicit cast stays correct where long is 32-bit. */
  mTrace("consumer:%lld, perform delete action", (long long)pConsumer->consumerId);
  return 0;
}

L
Liu Jicong 已提交
139 140
/**
 * SDB update callback for consumer rows. Currently only emits a trace line;
 * nothing is copied from the new row into the old one.
 *
 * @param pSdb         unused
 * @param pOldConsumer row currently stored
 * @param pNewConsumer replacement row (not read yet)
 * @return always 0
 */
static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, SMqConsumerObj *pNewConsumer) {
  /* The id is 64-bit; %lld with an explicit cast stays correct where long is 32-bit. */
  mTrace("consumer:%lld, perform update action", (long long)pOldConsumer->consumerId);

  // TODO handle update
  /*taosWLockLatch(&pOldConsumer->lock);*/
  /*taosWUnLockLatch(&pOldConsumer->lock);*/

  return 0;
}

L
Liu Jicong 已提交
149
/**
 * Look up a consumer in the SDB by its 64-bit id.
 *
 * @param pMnode     mnode whose pSdb is searched
 * @param consumerId key of the consumer
 * @return the result of sdbAcquire() for SDB_CONSUMER; NULL when nothing was
 *         acquired. terrno is not assigned here on a miss (setting
 *         TSDB_CODE_MND_CONSUMER_NOT_EXIST is still disabled).
 */
SMqConsumerObj *mndAcquireConsumer(SMnode *pMnode, int64_t consumerId) {
  return sdbAcquire(pMnode->pSdb, SDB_CONSUMER, &consumerId);
}

L
Liu Jicong 已提交
158
/**
 * Hand a consumer object back to the SDB via sdbRelease().
 *
 * @param pMnode    mnode whose pSdb owns the object
 * @param pConsumer object to release
 */
void mndReleaseConsumer(SMnode *pMnode, SMqConsumerObj *pConsumer) {
  sdbRelease(pMnode->pSdb, pConsumer);
}

L
Liu Jicong 已提交
163
#if 0
L
Liu Jicong 已提交
164 165
/*
 * Build a table-meta response for the object named in the request.
 *
 * NOTE(review): this function sits inside an `#if 0` region and is not
 * compiled. It calls mndAcquireConsumer() with a name string, while the live
 * definition in this file takes an int64_t id, and it uses SConsumerObj
 * rather than SMqConsumerObj - it needs porting before being re-enabled.
 *
 * Returns 0 with pMsg->pCont / pMsg->contLen set to the response, or -1 with
 * terrno set.
 */
static int32_t mndProcessConsumerMetaMsg(SMnodeMsg *pMsg) {
  SMnode        *pMnode = pMsg->pMnode;
  STableInfoReq *pInfo = pMsg->rpcMsg.pCont;

  mDebug("consumer:%s, start to retrieve meta", pInfo->tableFname);

  SDbObj *pDb = mndAcquireDbByConsumer(pMnode, pInfo->tableFname);
  if (pDb == NULL) {
    terrno = TSDB_CODE_MND_DB_NOT_SELECTED;
    mError("consumer:%s, failed to retrieve meta since %s", pInfo->tableFname, terrstr());
    return -1;
  }

  SConsumerObj *pConsumer = mndAcquireConsumer(pMnode, pInfo->tableFname);
  if (pConsumer == NULL) {
    mndReleaseDb(pMnode, pDb);
    terrno = TSDB_CODE_MND_INVALID_CONSUMER;
    mError("consumer:%s, failed to get meta since %s", pInfo->tableFname, terrstr());
    return -1;
  }

  taosRLockLatch(&pConsumer->lock);

  /* Snapshot the counts while the object is locked and referenced; they are
   * still needed for the log line after the object has been released. */
  int32_t numOfColumns = pConsumer->numOfColumns;
  int32_t numOfTags = pConsumer->numOfTags;
  int32_t totalCols = numOfColumns + numOfTags;
  int32_t contLen = sizeof(STableMetaRsp) + totalCols * sizeof(SSchema);

  STableMetaRsp *pMeta = rpcMallocCont(contLen);
  if (pMeta == NULL) {
    taosRUnLockLatch(&pConsumer->lock);
    mndReleaseDb(pMnode, pDb);
    mndReleaseConsumer(pMnode, pConsumer);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    mError("consumer:%s, failed to get meta since %s", pInfo->tableFname, terrstr());
    return -1;
  }

  memcpy(pMeta->consumerFname, pConsumer->name, TSDB_TABLE_FNAME_LEN);
  pMeta->numOfTags = htonl(numOfTags);
  pMeta->numOfColumns = htonl(numOfColumns);
  pMeta->precision = pDb->cfg.precision;
  pMeta->tableType = TSDB_SUPER_TABLE;
  pMeta->update = pDb->cfg.update;
  pMeta->sversion = htonl(pConsumer->version);
  pMeta->tuid = htonl(pConsumer->uid);

  /* Multi-byte schema fields are converted with htonl() for the response. */
  for (int32_t i = 0; i < totalCols; ++i) {
    SSchema *pSchema = &pMeta->pSchema[i];
    SSchema *pSrcSchema = &pConsumer->pSchema[i];
    memcpy(pSchema->name, pSrcSchema->name, TSDB_COL_NAME_LEN);
    pSchema->type = pSrcSchema->type;
    pSchema->colId = htonl(pSrcSchema->colId);
    pSchema->bytes = htonl(pSrcSchema->bytes);
  }
  taosRUnLockLatch(&pConsumer->lock);
  mndReleaseDb(pMnode, pDb);
  mndReleaseConsumer(pMnode, pConsumer);

  pMsg->pCont = pMeta;
  pMsg->contLen = contLen;

  /* pConsumer must not be touched past this point: it has been released. */
  mDebug("consumer:%s, meta is retrieved, cols:%d tags:%d", pInfo->tableFname, numOfColumns, numOfTags);
  return 0;
}

/*
 * Count the rows in the SDB_CONSUMER table.
 *
 * The database named by dbName must exist (otherwise -1 is returned with
 * terrno = TSDB_CODE_MND_DB_NOT_SELECTED), but the iteration itself does not
 * filter rows by database: every consumer row is counted.
 *
 * On success stores the count in *pNumOfConsumers and returns 0.
 */
static int32_t mndGetNumOfConsumers(SMnode *pMnode, char *dbName, int32_t *pNumOfConsumers) {
  SSdb *pSdb = pMnode->pSdb;

  SDbObj *pDb = mndAcquireDb(pMnode, dbName);
  if (pDb == NULL) {
    terrno = TSDB_CODE_MND_DB_NOT_SELECTED;
    return -1;
  }

  int32_t numOfConsumers = 0;
  void   *pIter = NULL;
  while (1) {
    SMqConsumerObj *pConsumer = NULL;
    pIter = sdbFetch(pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer);
    if (pIter == NULL) break;

    numOfConsumers++;

    sdbRelease(pSdb, pConsumer);
  }

  /* Give back the reference taken by mndAcquireDb() above. */
  mndReleaseDb(pMnode, pDb);

  *pNumOfConsumers = numOfConsumers;
  return 0;
}

S
Shengliang Guan 已提交
252
/*
 * Describe the result set of the consumer "show" command.
 *
 * Fills pMeta->pSchema and the parallel pShow->bytes / pShow->offset arrays
 * for four columns (name, create_time, columns, tags), then records the
 * column count, row count, row size and table name.
 *
 * Returns 0 on success, -1 when mndGetNumOfConsumers() fails.
 */
static int32_t mndGetConsumerMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaRsp *pMeta) {
  SMnode *pMnode = pMsg->pMnode;
  SSdb   *pSdb = pMnode->pSdb;

  if (mndGetNumOfConsumers(pMnode, pShow->db, &pShow->numOfRows) != 0) {
    return -1;
  }

  /* One entry per output column, in display order. */
  const struct {
    const char *name;
    int32_t     type;
    int32_t     bytes;
  } columnDefs[] = {
      {"name", TSDB_DATA_TYPE_BINARY, TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE},
      {"create_time", TSDB_DATA_TYPE_TIMESTAMP, 8},
      {"columns", TSDB_DATA_TYPE_INT, 4},
      {"tags", TSDB_DATA_TYPE_INT, 4},
  };
  const int32_t numOfCols = (int32_t)(sizeof(columnDefs) / sizeof(columnDefs[0]));

  SSchema *pSchema = pMeta->pSchema;
  for (int32_t idx = 0; idx < numOfCols; ++idx) {
    pShow->bytes[idx] = columnDefs[idx].bytes;
    pSchema[idx].type = columnDefs[idx].type;
    strcpy(pSchema[idx].name, columnDefs[idx].name);
    pSchema[idx].bytes = htonl(pShow->bytes[idx]);

    /* Each column starts where the previous one ends. */
    if (idx == 0) {
      pShow->offset[idx] = 0;
    } else {
      pShow->offset[idx] = pShow->offset[idx - 1] + pShow->bytes[idx - 1];
    }
  }

  pMeta->numOfColumns = htonl(numOfCols);
  pShow->numOfColumns = numOfCols;

  /* This overwrites the count stored by mndGetNumOfConsumers() above. */
  pShow->numOfRows = sdbGetSize(pSdb, SDB_CONSUMER);
  pShow->rowSize = pShow->offset[numOfCols - 1] + pShow->bytes[numOfCols - 1];
  strcpy(pMeta->tbFname, mndShowStr(pShow->type));

  return 0;
}

/*
 * Abort an in-progress SDB iteration by passing the iterator to
 * sdbCancelFetch().
 */
static void mndCancelGetNextConsumer(SMnode *pMnode, void *pIter) {
  sdbCancelFetch(pMnode->pSdb, pIter);
}
L
Liu Jicong 已提交
306
#endif