cqMain.c 9.4 KB
Newer Older
J
draft  
jtao1735 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
J
jtao1735 已提交
17

H
TD-354  
Hongze Cheng 已提交
18 19
#include <errno.h>
#include <pthread.h>
J
jtao1735 已提交
20 21
#include <stdlib.h>
#include <string.h>
H
TD-354  
Hongze Cheng 已提交
22 23

#include "taos.h"
24
#include "tsclient.h"
J
jtao1735 已提交
25
#include "taosdef.h"
J
draft  
jtao1735 已提交
26
#include "taosmsg.h"
B
Bomin Zhang 已提交
27
#include "ttimer.h"
H
TD-354  
Hongze Cheng 已提交
28 29
#include "tcq.h"
#include "tdataformat.h"
J
jtao1735 已提交
30
#include "tglobal.h"
J
jtao1735 已提交
31 32 33
#include "tlog.h"
#include "twal.h"

S
TD-1520  
Shengliang Guan 已提交
34 35 36 37 38
#define cFatal(...) { if (cqDebugFlag & DEBUG_FATAL) { taosPrintLog("CQ  FATAL ", 255, __VA_ARGS__); }}
#define cError(...) { if (cqDebugFlag & DEBUG_ERROR) { taosPrintLog("CQ  ERROR ", 255, __VA_ARGS__); }}
#define cWarn(...)  { if (cqDebugFlag & DEBUG_WARN)  { taosPrintLog("CQ  WARN ", 255, __VA_ARGS__); }}
#define cInfo(...)  { if (cqDebugFlag & DEBUG_INFO)  { taosPrintLog("CQ  ", 255, __VA_ARGS__); }}
#define cDebug(...) { if (cqDebugFlag & DEBUG_DEBUG) { taosPrintLog("CQ  ", cqDebugFlag, __VA_ARGS__); }}
S
Shengliang Guan 已提交
39
#define cTrace(...) { if (cqDebugFlag & DEBUG_TRACE) { taosPrintLog("CQ  ", cqDebugFlag, __VA_ARGS__); }}
J
jtao1735 已提交
40 41 42 43 44

typedef struct {
  int      vgId;
  char     user[TSDB_USER_LEN];
  char     pass[TSDB_PASSWORD_LEN];
B
Bomin Zhang 已提交
45
  char     db[TSDB_DB_NAME_LEN];
J
jtao1735 已提交
46 47 48 49 50
  FCqWrite cqWrite;
  void    *ahandle;
  int      num;      // number of continuous streams
  struct SCqObj *pHead;
  void    *dbConn;
51
  int      master;
B
Bomin Zhang 已提交
52
  void    *tmrCtrl;
J
jtao1735 已提交
53 54 55 56
  pthread_mutex_t mutex;
} SCqContext;

typedef struct SCqObj {
B
Bomin Zhang 已提交
57
  tmr_h          tmrId;
B
Bomin Zhang 已提交
58 59
  uint64_t       uid;
  int32_t        tid;      // table ID
H
TD-354  
Hongze Cheng 已提交
60 61 62 63 64 65 66
  int            rowSize;  // bytes of a row
  char *         sqlStr;   // SQL string
  STSchema *     pSchema;  // pointer to schema array
  void *         pStream;
  struct SCqObj *prev;
  struct SCqObj *next;
  SCqContext *   pContext;
J
jtao1735 已提交
67 68 69 70 71
} SCqObj;

int cqDebugFlag = 135;

static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row); 
72
static void cqCreateStream(SCqContext *pContext, SCqObj *pObj);
J
jtao1735 已提交
73 74 75

void *cqOpen(void *ahandle, const SCqCfg *pCfg) {
  SCqContext *pContext = calloc(sizeof(SCqContext), 1);
76 77 78 79
  if (pContext == NULL) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    return NULL;
  }
J
jtao1735 已提交
80

B
Bomin Zhang 已提交
81 82
  pContext->tmrCtrl = taosTmrInit(0, 0, 0, "CQ");

B
Bomin Zhang 已提交
83 84
  tstrncpy(pContext->user, pCfg->user, sizeof(pContext->user));
  tstrncpy(pContext->pass, pCfg->pass, sizeof(pContext->pass));
B
Bomin Zhang 已提交
85 86 87 88 89 90 91
  const char* db = pCfg->db;
  for (const char* p = db; *p != 0; p++) {
    if (*p == '.') {
      db = p + 1;
      break;
    }
  }
B
Bomin Zhang 已提交
92
  tstrncpy(pContext->db, db, sizeof(pContext->db));
J
jtao1735 已提交
93 94 95
  pContext->vgId = pCfg->vgId;
  pContext->cqWrite = pCfg->cqWrite;
  pContext->ahandle = ahandle;
96
  tscEmbedded = 1;
J
jtao1735 已提交
97 98

  pthread_mutex_init(&pContext->mutex, NULL);
J
draft  
jtao1735 已提交
99

S
TD-1520  
Shengliang Guan 已提交
100
  cInfo("vgId:%d, CQ is opened", pContext->vgId);
J
draft  
jtao1735 已提交
101

J
jtao1735 已提交
102 103
  return pContext;
}
J
draft  
jtao1735 已提交
104

J
jtao1735 已提交
105 106
void cqClose(void *handle) {
  SCqContext *pContext = handle;
107
  if (handle == NULL) return;
J
draft  
jtao1735 已提交
108

J
jtao1735 已提交
109 110
  // stop all CQs
  cqStop(pContext);
J
draft  
jtao1735 已提交
111

J
jtao1735 已提交
112
  // free all resources
113 114
  pthread_mutex_lock(&pContext->mutex);

J
jtao1735 已提交
115 116 117 118
  SCqObj *pObj = pContext->pHead;
  while (pObj) {
    SCqObj *pTemp = pObj;
    pObj = pObj->next;
B
Bomin Zhang 已提交
119
    tdFreeSchema(pTemp->pSchema);
S
Shengliang Guan 已提交
120
    taosTFree(pTemp->sqlStr);
J
jtao1735 已提交
121 122 123
    free(pTemp);
  } 
  
124 125
  pthread_mutex_unlock(&pContext->mutex);

J
jtao1735 已提交
126 127
  pthread_mutex_destroy(&pContext->mutex);

B
Bomin Zhang 已提交
128 129 130
  taosTmrCleanUp(pContext->tmrCtrl);
  pContext->tmrCtrl = NULL;

S
TD-1520  
Shengliang Guan 已提交
131
  cInfo("vgId:%d, CQ is closed", pContext->vgId);
J
jtao1735 已提交
132
  free(pContext);
J
draft  
jtao1735 已提交
133 134
}

J
jtao1735 已提交
135 136
void cqStart(void *handle) {
  SCqContext *pContext = handle;
137
  if (pContext->dbConn || pContext->master) return;
J
jtao1735 已提交
138

S
TD-1520  
Shengliang Guan 已提交
139
  cInfo("vgId:%d, start all CQs", pContext->vgId);
J
jtao1735 已提交
140 141
  pthread_mutex_lock(&pContext->mutex);

142
  pContext->master = 1;
J
jtao1735 已提交
143 144 145

  SCqObj *pObj = pContext->pHead;
  while (pObj) {
146
    cqCreateStream(pContext, pObj);
J
jtao1735 已提交
147
    pObj = pObj->next;
J
draft  
jtao1735 已提交
148
  }
J
jtao1735 已提交
149 150

  pthread_mutex_unlock(&pContext->mutex);
J
draft  
jtao1735 已提交
151 152
}

J
jtao1735 已提交
153 154
void cqStop(void *handle) {
  SCqContext *pContext = handle;
S
TD-1520  
Shengliang Guan 已提交
155
  cInfo("vgId:%d, stop all CQs", pContext->vgId);
156
  if (pContext->dbConn == NULL || pContext->master == 0) return;
J
draft  
jtao1735 已提交
157

J
jtao1735 已提交
158
  pthread_mutex_lock(&pContext->mutex);
J
draft  
jtao1735 已提交
159

160
  pContext->master = 0;
J
jtao1735 已提交
161 162
  SCqObj *pObj = pContext->pHead;
  while (pObj) {
J
jtao1735 已提交
163 164 165
    if (pObj->pStream) {
      taos_close_stream(pObj->pStream);
      pObj->pStream = NULL;
S
TD-1520  
Shengliang Guan 已提交
166
      cInfo("vgId:%d, id:%d CQ:%s is closed", pContext->vgId, pObj->tid, pObj->sqlStr);
B
Bomin Zhang 已提交
167 168 169
    } else {
      taosTmrStop(pObj->tmrId);
      pObj->tmrId = 0;
J
jtao1735 已提交
170
    }
J
jtao1735 已提交
171
    pObj = pObj->next;
J
draft  
jtao1735 已提交
172
  }
J
jtao1735 已提交
173 174 175 176 177

  if (pContext->dbConn) taos_close(pContext->dbConn);
  pContext->dbConn = NULL;

  pthread_mutex_unlock(&pContext->mutex);
J
draft  
jtao1735 已提交
178 179
}

B
Bomin Zhang 已提交
180
void *cqCreate(void *handle, uint64_t uid, int tid, char *sqlStr, STSchema *pSchema) {
J
jtao1735 已提交
181
  SCqContext *pContext = handle;
J
draft  
jtao1735 已提交
182

J
jtao1735 已提交
183
  SCqObj *pObj = calloc(sizeof(SCqObj), 1);
J
jtao1735 已提交
184
  if (pObj == NULL) return NULL;
J
draft  
jtao1735 已提交
185

B
Bomin Zhang 已提交
186
  pObj->uid = uid;
J
jtao1735 已提交
187
  pObj->tid = tid;
J
jtao1735 已提交
188 189
  pObj->sqlStr = malloc(strlen(sqlStr)+1);
  strcpy(pObj->sqlStr, sqlStr);
J
draft  
jtao1735 已提交
190

H
TD-354  
Hongze Cheng 已提交
191
  pObj->pSchema = tdDupSchema(pSchema);
B
Bomin Zhang 已提交
192
  pObj->rowSize = schemaTLen(pSchema);
J
draft  
jtao1735 已提交
193

S
TD-1520  
Shengliang Guan 已提交
194
  cInfo("vgId:%d, id:%d CQ:%s is created", pContext->vgId, pObj->tid, pObj->sqlStr);
J
draft  
jtao1735 已提交
195

J
jtao1735 已提交
196
  pthread_mutex_lock(&pContext->mutex);
J
draft  
jtao1735 已提交
197

J
jtao1735 已提交
198
  pObj->next = pContext->pHead;
J
jtao1735 已提交
199
  if (pContext->pHead) pContext->pHead->prev = pObj;
J
jtao1735 已提交
200
  pContext->pHead = pObj;
J
draft  
jtao1735 已提交
201

202
  cqCreateStream(pContext, pObj);
J
draft  
jtao1735 已提交
203

J
jtao1735 已提交
204
  pthread_mutex_unlock(&pContext->mutex);
J
jtao1735 已提交
205 206

  return pObj;
J
jtao1735 已提交
207 208
}

J
jtao1735 已提交
209 210 211
void cqDrop(void *handle) {
  SCqObj *pObj = handle;
  SCqContext *pContext = pObj->pContext;
J
jtao1735 已提交
212 213 214

  pthread_mutex_lock(&pContext->mutex);

J
jtao1735 已提交
215 216 217 218 219
  if (pObj->prev) {
    pObj->prev->next = pObj->next;
  } else {
    pContext->pHead = pObj->next;
  }
J
jtao1735 已提交
220

J
jtao1735 已提交
221 222
  if (pObj->next) {
    pObj->next->prev = pObj->prev;
J
draft  
jtao1735 已提交
223 224
  }

J
jtao1735 已提交
225
  // free the resources associated
B
Bomin Zhang 已提交
226 227 228 229 230 231 232
  if (pObj->pStream) {
    taos_close_stream(pObj->pStream);
    pObj->pStream = NULL;
  } else {
    taosTmrStop(pObj->tmrId);
    pObj->tmrId = 0;
  }
J
draft  
jtao1735 已提交
233

S
TD-1520  
Shengliang Guan 已提交
234
  cInfo("vgId:%d, id:%d CQ:%s is dropped", pContext->vgId, pObj->tid, pObj->sqlStr); 
235 236
  tdFreeSchema(pObj->pSchema);
  free(pObj->sqlStr);
J
jtao1735 已提交
237
  free(pObj);
J
draft  
jtao1735 已提交
238

B
Bomin Zhang 已提交
239
  pthread_mutex_unlock(&pContext->mutex);
J
draft  
jtao1735 已提交
240 241
}

242 243 244 245 246 247 248 249
static void doCreateStream(void *param, TAOS_RES *result, int code) {
  SCqObj* pObj = (SCqObj*)param;
  SCqContext* pContext = pObj->pContext;
  SSqlObj* pSql = (SSqlObj*)result;
  pContext->dbConn = pSql->pTscObj;
  cqCreateStream(pContext, pObj);
}

B
Bomin Zhang 已提交
250 251 252 253
static void cqProcessCreateTimer(void *param, void *tmrId) {
  SCqObj* pObj = (SCqObj*)param;
  SCqContext* pContext = pObj->pContext;

254
  if (pContext->dbConn == NULL) {
255 256 257
    taos_connect_a(NULL, pContext->user, pContext->pass, pContext->db, 0, doCreateStream, param, NULL);
  } else {
    cqCreateStream(pContext, pObj);
258
  }
B
Bomin Zhang 已提交
259
}
260

B
Bomin Zhang 已提交
261
static void cqCreateStream(SCqContext *pContext, SCqObj *pObj) {
B
Bomin Zhang 已提交
262
  pObj->pContext = pContext;
B
Bomin Zhang 已提交
263 264 265 266 267 268 269 270

  if (pContext->dbConn == NULL) {
    pObj->tmrId = taosTmrStart(cqProcessCreateTimer, 1000, pObj, pContext->tmrCtrl);
    return;
  }
  pObj->tmrId = 0;

  pObj->pStream = taos_open_stream(pContext->dbConn, pObj->sqlStr, cqProcessStreamRes, 0, pObj, NULL);
271 272
  if (pObj->pStream) {
    pContext->num++;
S
TD-1520  
Shengliang Guan 已提交
273
    cInfo("vgId:%d, id:%d CQ:%s is openned", pContext->vgId, pObj->tid, pObj->sqlStr);
274 275 276 277 278
  } else {
    cError("vgId:%d, id:%d CQ:%s, failed to open", pContext->vgId, pObj->tid, pObj->sqlStr);
  }
}

J
jtao1735 已提交
279 280
static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row) {
  SCqObj     *pObj = (SCqObj *)param;
B
Bomin Zhang 已提交
281 282 283 284
  if (tres == NULL && row == NULL) {
    pObj->pStream = NULL;
    return;
  }
J
jtao1735 已提交
285
  SCqContext *pContext = pObj->pContext;
B
Bomin Zhang 已提交
286
  STSchema   *pSchema = pObj->pSchema;
J
jtao1735 已提交
287
  if (pObj->pStream == NULL) return;
J
draft  
jtao1735 已提交
288

S
TD-1520  
Shengliang Guan 已提交
289
  cDebug("vgId:%d, id:%d CQ:%s stream result is ready", pContext->vgId, pObj->tid, pObj->sqlStr);
J
jtao1735 已提交
290

291
  int size = sizeof(SWalHead) + sizeof(SSubmitMsg) + sizeof(SSubmitBlk) + TD_DATA_ROW_HEAD_SIZE + pObj->rowSize;
J
jtao1735 已提交
292 293 294
  char *buffer = calloc(size, 1);

  SWalHead   *pHead = (SWalHead *)buffer;
B
Bomin Zhang 已提交
295 296
  SSubmitMsg *pMsg = (SSubmitMsg *) (buffer + sizeof(SWalHead));
  SSubmitBlk *pBlk = (SSubmitBlk *) (buffer + sizeof(SWalHead) + sizeof(SSubmitMsg));
J
draft  
jtao1735 已提交
297

B
Bomin Zhang 已提交
298
  SDataRow trow = (SDataRow)pBlk->data;
299
  tdInitDataRow(trow, pSchema);
J
jtao1735 已提交
300

B
Bomin Zhang 已提交
301
  for (int32_t i = 0; i < pSchema->numOfCols; i++) {
302
    STColumn *c = pSchema->columns + i;
303 304 305
    void* val = row[i];
    if (val == NULL) {
      val = getNullValue(c->type);
B
Bomin Zhang 已提交
306
    } else if (c->type == TSDB_DATA_TYPE_BINARY) {
307
      val = ((char*)val) - sizeof(VarDataLenT);
B
Bomin Zhang 已提交
308 309 310 311 312 313
    } else if (c->type == TSDB_DATA_TYPE_NCHAR) {
      char buf[TSDB_MAX_NCHAR_LEN];
      size_t len = taos_fetch_lengths(tres)[i];
      taosMbsToUcs4(val, len, buf, sizeof(buf), &len);
      memcpy(val + sizeof(VarDataLenT), buf, len);
      varDataLen(val) = len;
314 315
    }
    tdAppendColVal(trow, val, c->type, c->bytes, c->offset);
B
Bomin Zhang 已提交
316
  }
H
Haojun Liao 已提交
317 318
  pBlk->dataLen = htonl(dataRowLen(trow));
  pBlk->schemaLen = 0;
B
Bomin Zhang 已提交
319 320 321 322 323 324 325

  pBlk->uid = htobe64(pObj->uid);
  pBlk->tid = htonl(pObj->tid);
  pBlk->numOfRows = htons(1);
  pBlk->sversion = htonl(pSchema->version);
  pBlk->padding = 0;

B
Bomin Zhang 已提交
326 327
  pHead->len = sizeof(SSubmitMsg) + sizeof(SSubmitBlk) + dataRowLen(trow);

B
Bomin Zhang 已提交
328
  pMsg->header.vgId = htonl(pContext->vgId);
B
Bomin Zhang 已提交
329
  pMsg->header.contLen = htonl(pHead->len);
B
Bomin Zhang 已提交
330 331 332 333 334
  pMsg->length = pMsg->header.contLen;
  pMsg->numOfBlocks = htonl(1);

  pHead->msgType = TSDB_MSG_TYPE_SUBMIT;
  pHead->version = 0;
J
jtao1735 已提交
335 336 337

  // write into vnode write queue
  pContext->cqWrite(pContext->ahandle, pHead, TAOS_QTYPE_CQ);
B
Bomin Zhang 已提交
338
  free(buffer);
J
jtao1735 已提交
339 340
}