cqMain.c 6.8 KB
Newer Older
J
draft  
jtao1735 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
J
jtao1735 已提交
17 18 19 20 21

#include <stdlib.h>
#include <string.h>
#include <pthread.h>
#include "taosdef.h"
J
draft  
jtao1735 已提交
22
#include "taosmsg.h"
J
jtao1735 已提交
23
#include "tglobal.h"
J
jtao1735 已提交
24 25 26 27 28
#include "tlog.h"
#include "twal.h"
#include "tcq.h"
#include "taos.h"

J
jtao1735 已提交
29 30 31 32
/*
 * Logging helpers for the CQ module.
 *
 * cError/cWarn/cTrace call taosPrintLog only when the matching DEBUG_* bit is
 * set in cqDebugFlag; cPrint calls taosPrintLog unconditionally with a flag
 * value of 255.
 *
 * Each macro is wrapped in do { ... } while (0) so that it expands to exactly
 * one statement: it can be used as the body of an unbraced if/else without
 * capturing a following "else", and the trailing ';' at the call site does not
 * produce a stray empty statement.
 */
#define cError(...) do { if (cqDebugFlag & DEBUG_ERROR) { taosPrintLog("ERROR CQ  ", cqDebugFlag, __VA_ARGS__); } } while (0)
#define cWarn(...)  do { if (cqDebugFlag & DEBUG_WARN)  { taosPrintLog("WARN CQ  ", cqDebugFlag, __VA_ARGS__); } } while (0)
#define cTrace(...) do { if (cqDebugFlag & DEBUG_TRACE) { taosPrintLog("CQ  ", cqDebugFlag, __VA_ARGS__); } } while (0)
#define cPrint(...) do { taosPrintLog("CQ  ", 255, __VA_ARGS__); } while (0)
J
jtao1735 已提交
33 34 35 36 37 38 39 40 41 42

/*
 * Continuous-query context created by cqOpen() and released by cqClose().
 * It carries the login credentials, the write callback and the list of CQ
 * objects registered through cqCreate().
 */
typedef struct {
  int             vgId;                     // copied from SCqCfg.vgId; used as a prefix in log lines
  char            user[TSDB_USER_LEN];      // user name passed to taos_connect
  char            pass[TSDB_PASSWORD_LEN];  // password passed to taos_connect
  FCqWrite        cqWrite;                  // callback invoked with each constructed submit message
  void           *ahandle;                  // first argument handed to cqWrite
  int             num;                      // number of continuous streams
  struct SCqObj  *pHead;                    // head of the doubly linked list of CQ objects
  void           *dbConn;                   // connection from taos_connect; NULL while not connected
  int             master;                   // set to 1 by cqStart, reset to 0 by cqStop
  pthread_mutex_t mutex;                    // taken around list and stream modifications
} SCqContext;

typedef struct SCqObj {
J
jtao1735 已提交
48
  int      tid;      // table ID
J
jtao1735 已提交
49 50 51 52 53
  int      rowSize;  // bytes of a row 
  char    *sqlStr;   // SQL string
  int      columns;  // number of columns
  SSchema *pSchema;  // pointer to schema array
  void    *pStream;
J
jtao1735 已提交
54
  struct SCqObj *prev; 
J
jtao1735 已提交
55 56 57 58 59 60 61
  struct SCqObj *next; 
  SCqContext *pContext;
} SCqObj;

int cqDebugFlag = 135;

static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row); 
62
static void cqCreateStream(SCqContext *pContext, SCqObj *pObj);
J
jtao1735 已提交
63 64 65 66 67 68 69 70 71 72 73

void *cqOpen(void *ahandle, const SCqCfg *pCfg) {
  
  SCqContext *pContext = calloc(sizeof(SCqContext), 1);
  if (pContext == NULL) return NULL;

  strcpy(pContext->user, pCfg->user);
  strcpy(pContext->pass, pCfg->pass);
  pContext->vgId = pCfg->vgId;
  pContext->cqWrite = pCfg->cqWrite;
  pContext->ahandle = ahandle;
74
  tscEmbedded = 1;
J
jtao1735 已提交
75 76

  pthread_mutex_init(&pContext->mutex, NULL);
J
draft  
jtao1735 已提交
77

J
jtao1735 已提交
78
  cTrace("vgId:%d, CQ is opened", pContext->vgId);
J
draft  
jtao1735 已提交
79

J
jtao1735 已提交
80 81
  return pContext;
}
J
draft  
jtao1735 已提交
82

J
jtao1735 已提交
83 84
void cqClose(void *handle) {
  SCqContext *pContext = handle;
J
draft  
jtao1735 已提交
85

J
jtao1735 已提交
86 87
  // stop all CQs
  cqStop(pContext);
J
draft  
jtao1735 已提交
88

J
jtao1735 已提交
89
  // free all resources
90 91
  pthread_mutex_lock(&pContext->mutex);

J
jtao1735 已提交
92 93 94 95 96 97 98
  SCqObj *pObj = pContext->pHead;
  while (pObj) {
    SCqObj *pTemp = pObj;
    pObj = pObj->next;
    free(pTemp);
  } 
  
99 100
  pthread_mutex_unlock(&pContext->mutex);

J
jtao1735 已提交
101 102 103 104
  pthread_mutex_destroy(&pContext->mutex);

  cTrace("vgId:%d, CQ is closed", pContext->vgId);
  free(pContext);
J
draft  
jtao1735 已提交
105 106
}

J
jtao1735 已提交
107 108 109
void cqStart(void *handle) {
  SCqContext *pContext = handle;
  cTrace("vgId:%d, start all CQs", pContext->vgId);
110
  if (pContext->dbConn || pContext->master) return;
J
jtao1735 已提交
111 112 113

  pthread_mutex_lock(&pContext->mutex);

114
  pContext->master = 1;
J
jtao1735 已提交
115 116 117

  SCqObj *pObj = pContext->pHead;
  while (pObj) {
118
    cqCreateStream(pContext, pObj);
J
jtao1735 已提交
119
    pObj = pObj->next;
J
draft  
jtao1735 已提交
120
  }
J
jtao1735 已提交
121 122

  pthread_mutex_unlock(&pContext->mutex);
J
draft  
jtao1735 已提交
123 124
}

J
jtao1735 已提交
125 126 127
void cqStop(void *handle) {
  SCqContext *pContext = handle;
  cTrace("vgId:%d, stop all CQs", pContext->vgId);
128
  if (pContext->dbConn == NULL || pContext->master == 0) return;
J
draft  
jtao1735 已提交
129

J
jtao1735 已提交
130
  pthread_mutex_lock(&pContext->mutex);
J
draft  
jtao1735 已提交
131

132
  pContext->master = 0;
J
jtao1735 已提交
133 134
  SCqObj *pObj = pContext->pHead;
  while (pObj) {
J
jtao1735 已提交
135 136 137 138 139
    if (pObj->pStream) {
      taos_close_stream(pObj->pStream);
      pObj->pStream = NULL;
      cTrace("vgId:%d, id:%d CQ:%s is closed", pContext->vgId, pObj->tid, pObj->sqlStr);
    }
J
draft  
jtao1735 已提交
140

J
jtao1735 已提交
141
    pObj = pObj->next;
J
draft  
jtao1735 已提交
142
  }
J
jtao1735 已提交
143 144 145 146 147

  if (pContext->dbConn) taos_close(pContext->dbConn);
  pContext->dbConn = NULL;

  pthread_mutex_unlock(&pContext->mutex);
J
draft  
jtao1735 已提交
148 149
}

J
jtao1735 已提交
150
/*
 * Register a new continuous query with the context and try to open its stream.
 *
 * handle:  context returned by cqOpen().
 * tid:     table ID stored in the object.
 * sqlStr:  SQL text; a private heap copy is made.
 * pSchema: array of `columns` SSchema entries; a private heap copy is made.
 * columns: number of entries in pSchema.
 *
 * Returns the new CQ object (to be passed to cqDrop()), or NULL if any
 * allocation fails; on failure nothing is added to the list.
 */
void *cqCreate(void *handle, int tid, char *sqlStr, SSchema *pSchema, int columns) {
  SCqContext *pContext = handle;

  SCqObj *pObj = calloc(1, sizeof(SCqObj));
  if (pObj == NULL) return NULL;

  pObj->tid = tid;
  pObj->columns = columns;
  // cqDrop and cqProcessStreamRes reach the context through this back-pointer
  pObj->pContext = pContext;

  size_t sqlLen = strlen(sqlStr) + 1;
  pObj->sqlStr = malloc(sqlLen);
  if (pObj->sqlStr == NULL) {
    free(pObj);
    return NULL;
  }
  memcpy(pObj->sqlStr, sqlStr, sqlLen);

  // With no columns pSchema stays NULL (from calloc), so malloc(0) returning
  // NULL cannot be mistaken for an allocation failure.
  if (columns > 0) {
    size_t size = sizeof(SSchema) * (size_t)columns;
    pObj->pSchema = malloc(size);
    if (pObj->pSchema == NULL) {
      free(pObj->sqlStr);
      free(pObj);
      return NULL;
    }
    memcpy(pObj->pSchema, pSchema, size);
  }

  cTrace("vgId:%d, id:%d CQ:%s is created", pContext->vgId, pObj->tid, pObj->sqlStr);

  pthread_mutex_lock(&pContext->mutex);

  // insert at the head of the doubly linked list (pObj->prev is NULL from calloc)
  pObj->next = pContext->pHead;
  if (pContext->pHead) pContext->pHead->prev = pObj;
  pContext->pHead = pObj;

  cqCreateStream(pContext, pObj);

  pthread_mutex_unlock(&pContext->mutex);

  return pObj;
}

J
jtao1735 已提交
181 182 183
void cqDrop(void *handle) {
  SCqObj *pObj = handle;
  SCqContext *pContext = pObj->pContext;
J
jtao1735 已提交
184 185 186

  pthread_mutex_lock(&pContext->mutex);

J
jtao1735 已提交
187 188 189 190 191
  if (pObj->prev) {
    pObj->prev->next = pObj->next;
  } else {
    pContext->pHead = pObj->next;
  }
J
jtao1735 已提交
192

J
jtao1735 已提交
193 194
  if (pObj->next) {
    pObj->next->prev = pObj->prev;
J
draft  
jtao1735 已提交
195 196
  }

J
jtao1735 已提交
197 198 199
  // free the resources associated
  if (pObj->pStream) taos_close_stream(pObj->pStream);
  pObj->pStream = NULL;
J
draft  
jtao1735 已提交
200

J
jtao1735 已提交
201 202
  cTrace("vgId:%d, id:%d CQ:%s is dropped", pContext->vgId, pObj->tid, pObj->sqlStr); 
  free(pObj);
J
draft  
jtao1735 已提交
203

J
jtao1735 已提交
204
  pthread_mutex_lock(&pContext->mutex);
J
draft  
jtao1735 已提交
205 206
}

207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226
/*
 * Open the stream for one CQ object, connecting to the database first if the
 * context has no connection yet.
 *
 * The connection is made to "localhost" with the context's user and pass. The
 * stream is opened with cqProcessStreamRes as the result callback and pObj as
 * its parameter. On success pObj->pStream is set and pContext->num is
 * incremented; on any failure an error is logged and pObj->pStream is left or
 * set to NULL.
 *
 * Both callers in this file (cqStart, cqCreate) hold pContext->mutex.
 */
static void cqCreateStream(SCqContext *pContext, SCqObj *pObj) {

  if (pContext->dbConn == NULL) {
    pContext->dbConn = taos_connect("localhost", pContext->user, pContext->pass, NULL, 0);
    if (pContext->dbConn == NULL) {
      cError("vgId:%d, failed to connect to TDengine(%s)", pContext->vgId, tstrerror(terrno));
      // bail out only when the connection could not be made; after a
      // successful connect fall through and open the stream for this object
      return;
    }
  }

  int64_t lastKey = 0;
  pObj->pStream = taos_open_stream(pContext->dbConn, pObj->sqlStr, cqProcessStreamRes, lastKey, pObj, NULL);
  if (pObj->pStream) {
    pContext->num++;
    cTrace("vgId:%d, id:%d CQ:%s is openned", pContext->vgId, pObj->tid, pObj->sqlStr);
  } else {
    cError("vgId:%d, id:%d CQ:%s, failed to open", pContext->vgId, pObj->tid, pObj->sqlStr);
  }
}

J
jtao1735 已提交
227 228 229 230
/*
 * Stream result callback registered through taos_open_stream().
 *
 * param: the SCqObj the stream was opened for.
 * tres, row: result handle and row supplied by the stream; not read yet (the
 *            message body is still marked "to do" below).
 *
 * Builds one buffer laid out as SWalHead | SSubmitMsg | SSubmitBlk | rowSize
 * bytes, fills in the message type, length, block count and table ID, and
 * hands it to the context's cqWrite callback with TAOS_QTYPE_CQ. Returns
 * without writing if the object has no open stream or the buffer cannot be
 * allocated.
 */
static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row) {
  SCqObj     *pObj = (SCqObj *)param;
  SCqContext *pContext = pObj->pContext;
  if (pObj->pStream == NULL) return;

  cTrace("vgId:%d, id:%d CQ:%s stream result is ready", pContext->vgId, pObj->tid, pObj->sqlStr);

  // construct data
  int size = sizeof(SWalHead) + sizeof(SSubmitMsg) + sizeof(SSubmitBlk) + pObj->rowSize;
  char *buffer = calloc(1, size);
  if (buffer == NULL) {
    // the header fields below would otherwise be written through a NULL pointer
    cError("vgId:%d, id:%d CQ:%s, failed to allocate result buffer", pContext->vgId, pObj->tid, pObj->sqlStr);
    return;
  }

  SWalHead   *pHead = (SWalHead *)buffer;
  pHead->msgType = TSDB_MSG_TYPE_SUBMIT;
  pHead->len = size - sizeof(SWalHead);  // length of everything after the WAL head

  SSubmitMsg *pSubmit = (SSubmitMsg *) (buffer + sizeof(SWalHead));
  // to do: fill in the SSubmitMsg structure
  pSubmit->numOfBlocks = 1;

  SSubmitBlk *pBlk = (SSubmitBlk *) (buffer + sizeof(SWalHead) + sizeof(SSubmitMsg));
  // to do: fill in the SSubmitBlk structure
  pBlk->tid = pObj->tid;

  // write into vnode write queue
  // NOTE(review): buffer is not released here. Confirm whether cqWrite takes
  // ownership of it or copies it; if it copies, this leaks one buffer per result.
  pContext->cqWrite(pContext->ahandle, pHead, TAOS_QTYPE_CQ);
}