cqMain.c 13.4 KB
Newer Older
J
draft  
jtao1735 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
J
jtao1735 已提交
17

H
TD-354  
Hongze Cheng 已提交
18 19
#include <errno.h>
#include <pthread.h>
J
jtao1735 已提交
20 21
#include <stdlib.h>
#include <string.h>
H
TD-354  
Hongze Cheng 已提交
22 23

#include "taos.h"
24
#include "tsclient.h"
J
jtao1735 已提交
25
#include "taosdef.h"
J
draft  
jtao1735 已提交
26
#include "taosmsg.h"
B
Bomin Zhang 已提交
27
#include "ttimer.h"
H
TD-354  
Hongze Cheng 已提交
28 29
#include "tcq.h"
#include "tdataformat.h"
J
jtao1735 已提交
30
#include "tglobal.h"
J
jtao1735 已提交
31 32 33
#include "tlog.h"
#include "twal.h"

S
TD-1520  
Shengliang Guan 已提交
34 35 36 37 38
#define cFatal(...) { if (cqDebugFlag & DEBUG_FATAL) { taosPrintLog("CQ  FATAL ", 255, __VA_ARGS__); }}
#define cError(...) { if (cqDebugFlag & DEBUG_ERROR) { taosPrintLog("CQ  ERROR ", 255, __VA_ARGS__); }}
#define cWarn(...)  { if (cqDebugFlag & DEBUG_WARN)  { taosPrintLog("CQ  WARN ", 255, __VA_ARGS__); }}
#define cInfo(...)  { if (cqDebugFlag & DEBUG_INFO)  { taosPrintLog("CQ  ", 255, __VA_ARGS__); }}
#define cDebug(...) { if (cqDebugFlag & DEBUG_DEBUG) { taosPrintLog("CQ  ", cqDebugFlag, __VA_ARGS__); }}
S
Shengliang Guan 已提交
39
#define cTrace(...) { if (cqDebugFlag & DEBUG_TRACE) { taosPrintLog("CQ  ", cqDebugFlag, __VA_ARGS__); }}
J
jtao1735 已提交
40 41

typedef struct {
S
Shengliang Guan 已提交
42
  int32_t  vgId;
S
TD-2283  
Shengliang Guan 已提交
43 44
  int32_t  master;
  int32_t  num;      // number of continuous streams
J
jtao1735 已提交
45
  char     user[TSDB_USER_LEN];
46
  char     pass[TSDB_KEY_LEN];
B
Bomin Zhang 已提交
47
  char     db[TSDB_DB_NAME_LEN];
J
jtao1735 已提交
48 49 50
  FCqWrite cqWrite;
  struct SCqObj *pHead;
  void    *dbConn;
B
Bomin Zhang 已提交
51
  void    *tmrCtrl;
J
jtao1735 已提交
52
  pthread_mutex_t mutex;
D
fix bug  
dapan1121 已提交
53 54
  int32_t delete;
  int32_t cqObjNum;
J
jtao1735 已提交
55 56 57
} SCqContext;

typedef struct SCqObj {
B
Bomin Zhang 已提交
58
  tmr_h          tmrId;
D
fix bug  
dapan1121 已提交
59
  int64_t        rid;
B
Bomin Zhang 已提交
60 61
  uint64_t       uid;
  int32_t        tid;      // table ID
S
Shengliang Guan 已提交
62
  int32_t        rowSize;  // bytes of a row
63
  char *         dstTable;
H
TD-354  
Hongze Cheng 已提交
64 65 66 67 68 69
  char *         sqlStr;   // SQL string
  STSchema *     pSchema;  // pointer to schema array
  void *         pStream;
  struct SCqObj *prev;
  struct SCqObj *next;
  SCqContext *   pContext;
J
jtao1735 已提交
70 71 72
} SCqObj;

static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row); 
73
static void cqCreateStream(SCqContext *pContext, SCqObj *pObj);
J
jtao1735 已提交
74

D
fix bug  
dapan1121 已提交
75
int32_t    cqObjRef = -1;
D
fix bug  
dapan1121 已提交
76
int32_t    cqVnodeNum = 0;
D
fix bug  
dapan1121 已提交
77

D
fix bug  
dapan1121 已提交
78 79
void cqRmFromList(SCqObj *pObj) {
  //LOCK in caller
D
fix bug  
dapan1121 已提交
80 81 82 83 84 85 86 87 88 89 90 91 92

  SCqContext *pContext = pObj->pContext;

  if (pObj->prev) {
    pObj->prev->next = pObj->next;
  } else {
    pContext->pHead = pObj->next;
  }

  if (pObj->next) {
    pObj->next->prev = pObj->prev;
  }

D
fix bug  
dapan1121 已提交
93 94
}

D
fix bug  
dapan1121 已提交
95 96 97 98 99 100
static void freeSCqContext(void *handle) {
  if (handle == NULL) {
    return;
  }
  SCqContext *pContext = handle;
  pthread_mutex_destroy(&pContext->mutex);
H
Haojun Liao 已提交
101

D
fix bug  
dapan1121 已提交
102 103 104 105 106 107 108
  taosTmrCleanUp(pContext->tmrCtrl);
  pContext->tmrCtrl = NULL;
  cDebug("vgId:%d, CQ is closed", pContext->vgId);
  free(pContext);
}


D
fix bug  
dapan1121 已提交
109 110 111
void cqFree(void *handle) {
  if (tsEnableStream == 0) {
    return;
D
fix bug  
dapan1121 已提交
112
  }
D
fix bug  
dapan1121 已提交
113 114 115
  SCqObj *pObj = handle;
  SCqContext *pContext = pObj->pContext;
  int32_t delete = 0;
D
fix bug  
dapan1121 已提交
116

D
fix bug  
dapan1121 已提交
117 118
  pthread_mutex_lock(&pContext->mutex);
  
D
fix bug  
dapan1121 已提交
119 120 121 122 123 124 125 126 127 128 129 130 131 132 133
  // free the resources associated
  if (pObj->pStream) {
    taos_close_stream(pObj->pStream);
    pObj->pStream = NULL;
  } else {
    taosTmrStop(pObj->tmrId);
    pObj->tmrId = 0;
  }

  cInfo("vgId:%d, id:%d CQ:%s is dropped", pContext->vgId, pObj->tid, pObj->sqlStr); 
  tdFreeSchema(pObj->pSchema);
  free(pObj->dstTable);
  free(pObj->sqlStr);
  free(pObj);

D
fix bug  
dapan1121 已提交
134 135 136 137 138 139
  pContext->cqObjNum--;

  if (pContext->cqObjNum <= 0 && pContext->delete) {
    delete = 1;
  }

D
fix bug  
dapan1121 已提交
140 141
  pthread_mutex_unlock(&pContext->mutex);

D
fix bug  
dapan1121 已提交
142
  if (delete) {
D
fix bug  
dapan1121 已提交
143
    freeSCqContext(pContext);
D
fix bug  
dapan1121 已提交
144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159
  }
}


void cqCreateRef() {
  int32_t ref = atomic_load_32(&cqObjRef);
  if (ref == -1) {
    ref = taosOpenRef(4096, cqFree);

    if (atomic_val_compare_exchange_32(&cqObjRef, -1, ref) != -1) {
      taosCloseRef(ref);
    }  
  }
}


J
jtao1735 已提交
160
void *cqOpen(void *ahandle, const SCqCfg *pCfg) {
161 162 163
  if (tsEnableStream == 0) {
    return NULL;
  }
J
jtao1735 已提交
164
  SCqContext *pContext = calloc(sizeof(SCqContext), 1);
165 166 167 168
  if (pContext == NULL) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    return NULL;
  }
J
jtao1735 已提交
169

D
fix bug  
dapan1121 已提交
170 171
  atomic_add_fetch_32(&cqVnodeNum, 1);
  
D
fix bug  
dapan1121 已提交
172 173
  cqCreateRef();

B
Bomin Zhang 已提交
174 175
  pContext->tmrCtrl = taosTmrInit(0, 0, 0, "CQ");

B
Bomin Zhang 已提交
176 177
  tstrncpy(pContext->user, pCfg->user, sizeof(pContext->user));
  tstrncpy(pContext->pass, pCfg->pass, sizeof(pContext->pass));
B
Bomin Zhang 已提交
178 179 180 181 182 183 184
  const char* db = pCfg->db;
  for (const char* p = db; *p != 0; p++) {
    if (*p == '.') {
      db = p + 1;
      break;
    }
  }
B
Bomin Zhang 已提交
185
  tstrncpy(pContext->db, db, sizeof(pContext->db));
J
jtao1735 已提交
186 187
  pContext->vgId = pCfg->vgId;
  pContext->cqWrite = pCfg->cqWrite;
188
  tscEmbedded = 1;
J
jtao1735 已提交
189 190

  pthread_mutex_init(&pContext->mutex, NULL);
J
draft  
jtao1735 已提交
191

D
fix bug  
dapan1121 已提交
192

Z
zyyang 已提交
193
  cDebug("vgId:%d, CQ is opened", pContext->vgId);
J
draft  
jtao1735 已提交
194

J
jtao1735 已提交
195 196
  return pContext;
}
J
draft  
jtao1735 已提交
197

H
Haojun Liao 已提交
198

J
jtao1735 已提交
199
void cqClose(void *handle) {
200 201 202
  if (tsEnableStream == 0) {
    return;
  }
J
jtao1735 已提交
203
  SCqContext *pContext = handle;
204
  if (handle == NULL) return;
J
draft  
jtao1735 已提交
205

D
fix bug  
dapan1121 已提交
206
  pContext->delete = 1;
D
fix bug  
dapan1121 已提交
207 208
  int32_t hasCq = 0;
  int32_t existLoop = 0;
H
Haojun Liao 已提交
209

J
jtao1735 已提交
210 211
  // stop all CQs
  cqStop(pContext);
J
draft  
jtao1735 已提交
212

D
fix bug  
dapan1121 已提交
213
  int64_t rid = 0;
214

D
fix bug  
dapan1121 已提交
215 216 217 218 219 220
  while (1) {
    pthread_mutex_lock(&pContext->mutex);

    SCqObj *pObj = pContext->pHead;
    if (pObj) {
      cqRmFromList(pObj);
J
jtao1735 已提交
221

D
fix bug  
dapan1121 已提交
222
      rid = pObj->rid;
D
fix bug  
dapan1121 已提交
223 224 225 226 227 228

      hasCq = 1;

      if (pContext->pHead == NULL) {
        existLoop = 1;
      }
H
Haojun Liao 已提交
229
    } else {
D
fix bug  
dapan1121 已提交
230 231 232 233 234 235
      pthread_mutex_unlock(&pContext->mutex);
      break;
    }
    
    pthread_mutex_unlock(&pContext->mutex);
    
D
fix bug  
dapan1121 已提交
236
    taosRemoveRef(cqObjRef, rid);
D
fix bug  
dapan1121 已提交
237 238 239 240

    if (existLoop) {
      break;
    }
D
fix bug  
dapan1121 已提交
241
  }
242

D
fix bug  
dapan1121 已提交
243 244 245
  if (hasCq == 0) {
    freeSCqContext(pContext);
  }
D
fix bug  
dapan1121 已提交
246 247 248 249 250 251 252

  int32_t remainn = atomic_sub_fetch_32(&cqVnodeNum, 1);
  if (remainn <= 0) {
    int32_t ref = cqObjRef;
    cqObjRef = -1;    
    taosCloseRef(ref);
  }
J
draft  
jtao1735 已提交
253 254
}

J
jtao1735 已提交
255
void cqStart(void *handle) {
256 257 258
  if (tsEnableStream == 0) {
    return;
  }
J
jtao1735 已提交
259
  SCqContext *pContext = handle;
260
  if (pContext->dbConn || pContext->master) return;
J
jtao1735 已提交
261

Z
zyyang 已提交
262
  cDebug("vgId:%d, start all CQs", pContext->vgId);
J
jtao1735 已提交
263 264
  pthread_mutex_lock(&pContext->mutex);

265
  pContext->master = 1;
J
jtao1735 已提交
266 267 268

  SCqObj *pObj = pContext->pHead;
  while (pObj) {
269
    cqCreateStream(pContext, pObj);
J
jtao1735 已提交
270
    pObj = pObj->next;
J
draft  
jtao1735 已提交
271
  }
J
jtao1735 已提交
272 273

  pthread_mutex_unlock(&pContext->mutex);
J
draft  
jtao1735 已提交
274 275
}

J
jtao1735 已提交
276
void cqStop(void *handle) {
277 278 279
  if (tsEnableStream == 0) {
    return;
  }
H
Haojun Liao 已提交
280

J
jtao1735 已提交
281
  SCqContext *pContext = handle;
S
TD-2321  
Shengliang Guan 已提交
282
  cDebug("vgId:%d, stop all CQs", pContext->vgId);
283
  if (pContext->dbConn == NULL || pContext->master == 0) return;
J
draft  
jtao1735 已提交
284

J
jtao1735 已提交
285
  pthread_mutex_lock(&pContext->mutex);
J
draft  
jtao1735 已提交
286

287
  pContext->master = 0;
J
jtao1735 已提交
288 289
  SCqObj *pObj = pContext->pHead;
  while (pObj) {
J
jtao1735 已提交
290 291 292
    if (pObj->pStream) {
      taos_close_stream(pObj->pStream);
      pObj->pStream = NULL;
S
TD-1520  
Shengliang Guan 已提交
293
      cInfo("vgId:%d, id:%d CQ:%s is closed", pContext->vgId, pObj->tid, pObj->sqlStr);
B
Bomin Zhang 已提交
294 295 296
    } else {
      taosTmrStop(pObj->tmrId);
      pObj->tmrId = 0;
J
jtao1735 已提交
297
    }
J
jtao1735 已提交
298
    pObj = pObj->next;
J
draft  
jtao1735 已提交
299
  }
J
jtao1735 已提交
300 301 302 303 304

  if (pContext->dbConn) taos_close(pContext->dbConn);
  pContext->dbConn = NULL;

  pthread_mutex_unlock(&pContext->mutex);
J
draft  
jtao1735 已提交
305 306
}

R
fix bug  
root 已提交
307
void *cqCreate(void *handle, uint64_t uid, int32_t sid, const char* dstTable, char *sqlStr, STSchema *pSchema, int start) {
308 309 310
  if (tsEnableStream == 0) {
    return NULL;
  }
J
jtao1735 已提交
311
  SCqContext *pContext = handle;
D
fix bug  
dapan1121 已提交
312
  int64_t rid = 0;
313 314 315 316 317 318 319 320 321 322 323 324 325

  pthread_mutex_lock(&pContext->mutex);

  SCqObj *pObj = pContext->pHead;
  while (pObj) {
    if (pObj->uid == uid) {
      rid = pObj->rid;
      pthread_mutex_unlock(&pContext->mutex);
      return (void *)rid;
    }
    
    pObj = pObj->next;
  }
D
fix bug  
dapan1121 已提交
326
  
327 328 329
  pthread_mutex_unlock(&pContext->mutex);
  
  pObj = calloc(sizeof(SCqObj), 1);
J
jtao1735 已提交
330
  if (pObj == NULL) return NULL;
J
draft  
jtao1735 已提交
331

B
Bomin Zhang 已提交
332
  pObj->uid = uid;
333 334 335 336 337
  pObj->tid = sid;
  if (dstTable != NULL) {
    pObj->dstTable = strdup(dstTable);
  }
  pObj->sqlStr = strdup(sqlStr);
J
draft  
jtao1735 已提交
338

H
TD-354  
Hongze Cheng 已提交
339
  pObj->pSchema = tdDupSchema(pSchema);
B
Bomin Zhang 已提交
340
  pObj->rowSize = schemaTLen(pSchema);
J
draft  
jtao1735 已提交
341

S
TD-1520  
Shengliang Guan 已提交
342
  cInfo("vgId:%d, id:%d CQ:%s is created", pContext->vgId, pObj->tid, pObj->sqlStr);
J
draft  
jtao1735 已提交
343

J
jtao1735 已提交
344
  pthread_mutex_lock(&pContext->mutex);
J
draft  
jtao1735 已提交
345

J
jtao1735 已提交
346
  pObj->next = pContext->pHead;
J
jtao1735 已提交
347
  if (pContext->pHead) pContext->pHead->prev = pObj;
J
jtao1735 已提交
348
  pContext->pHead = pObj;
J
draft  
jtao1735 已提交
349

D
fix bug  
dapan1121 已提交
350 351
  pContext->cqObjNum++;

D
fix bug  
dapan1121 已提交
352 353
  pObj->rid = taosAddRef(cqObjRef, pObj);

D
dapan1121 已提交
354
  if(start && pContext->master) {
R
fix bug  
root 已提交
355
    cqCreateStream(pContext, pObj);
D
dapan1121 已提交
356 357
  } else {
    pObj->pContext = pContext;
R
fix bug  
root 已提交
358
  }
J
draft  
jtao1735 已提交
359

D
fix bug  
dapan1121 已提交
360 361
  rid = pObj->rid;

J
jtao1735 已提交
362
  pthread_mutex_unlock(&pContext->mutex);
J
jtao1735 已提交
363

D
fix bug  
dapan1121 已提交
364

D
fix bug  
dapan1121 已提交
365
  return (void *)rid;
J
jtao1735 已提交
366 367
}

J
jtao1735 已提交
368
void cqDrop(void *handle) {
369 370 371
  if (tsEnableStream == 0) {
    return;
  }
J
jtao1735 已提交
372

D
fix bug  
dapan1121 已提交
373 374 375 376 377 378 379
  SCqObj* pObj = (SCqObj*)taosAcquireRef(cqObjRef, (int64_t)handle);
  if (pObj == NULL) {
    return;
  }
  
  SCqContext *pContext = pObj->pContext;
  
J
jtao1735 已提交
380 381
  pthread_mutex_lock(&pContext->mutex);

D
fix bug  
dapan1121 已提交
382 383
  cqRmFromList(pObj);
  
J
jtao1735 已提交
384
  // free the resources associated
B
Bomin Zhang 已提交
385 386 387 388 389 390 391
  if (pObj->pStream) {
    taos_close_stream(pObj->pStream);
    pObj->pStream = NULL;
  } else {
    taosTmrStop(pObj->tmrId);
    pObj->tmrId = 0;
  }
J
draft  
jtao1735 已提交
392

B
Bomin Zhang 已提交
393
  pthread_mutex_unlock(&pContext->mutex);
D
fix bug  
dapan1121 已提交
394

D
fix bug  
dapan1121 已提交
395 396
  taosRemoveRef(cqObjRef, (int64_t)handle);
  taosReleaseRef(cqObjRef, (int64_t)handle);
J
draft  
jtao1735 已提交
397 398
}

S
Shengliang Guan 已提交
399
static void doCreateStream(void *param, TAOS_RES *result, int32_t code) {
D
fix bug  
dapan1121 已提交
400 401 402 403
  SCqObj* pObj = (SCqObj*)taosAcquireRef(cqObjRef, (int64_t)param);
  if (pObj == NULL) {
    return;
  }
404

405
  SCqContext* pContext = pObj->pContext;
406 407 408 409 410
  SSqlObj* pSql = (SSqlObj*)result;  
  if (code == TSDB_CODE_SUCCESS) {
    if (atomic_val_compare_exchange_ptr(&(pContext->dbConn), NULL, pSql->pTscObj) != NULL) {
      taos_close(pSql->pTscObj);
    }
411
  }
412
  
413
  pthread_mutex_lock(&pContext->mutex);
414
  cqCreateStream(pContext, pObj);
415
  pthread_mutex_unlock(&pContext->mutex);
D
fix bug  
dapan1121 已提交
416 417

  taosReleaseRef(cqObjRef, (int64_t)param);
418 419
}

B
Bomin Zhang 已提交
420
static void cqProcessCreateTimer(void *param, void *tmrId) {
D
fix bug  
dapan1121 已提交
421 422 423 424 425
  SCqObj* pObj = (SCqObj*)taosAcquireRef(cqObjRef, (int64_t)param);
  if (pObj == NULL) {
    return;
  }
  
B
Bomin Zhang 已提交
426 427
  SCqContext* pContext = pObj->pContext;

428
  if (pContext->dbConn == NULL) {
S
TD-1520  
Shengliang Guan 已提交
429
    cDebug("vgId:%d, try connect to TDengine", pContext->vgId);
430 431
    taos_connect_a(NULL, pContext->user, pContext->pass, pContext->db, 0, doCreateStream, param, NULL);
  } else {
432
    pthread_mutex_lock(&pContext->mutex);
433
    cqCreateStream(pContext, pObj);
434
    pthread_mutex_unlock(&pContext->mutex);
435
  }
D
fix bug  
dapan1121 已提交
436 437

  taosReleaseRef(cqObjRef, (int64_t)param);
B
Bomin Zhang 已提交
438
}
439

440 441 442 443
// inner implement in tscStream.c
TAOS_STREAM *taos_open_stream_withname(TAOS *taos, const char* desName, const char *sqlstr, void (*fp)(void *param, TAOS_RES *, TAOS_ROW row),
                              int64_t stime, void *param, void (*callback)(void *));

B
Bomin Zhang 已提交
444
static void cqCreateStream(SCqContext *pContext, SCqObj *pObj) {
B
Bomin Zhang 已提交
445
  pObj->pContext = pContext;
B
Bomin Zhang 已提交
446 447

  if (pContext->dbConn == NULL) {
S
TD-1520  
Shengliang Guan 已提交
448
    cDebug("vgId:%d, create dbConn after 1000 ms", pContext->vgId);
D
fix bug  
dapan1121 已提交
449
    pObj->tmrId = taosTmrStart(cqProcessCreateTimer, 1000, (void *)pObj->rid, pContext->tmrCtrl);
B
Bomin Zhang 已提交
450 451
    return;
  }
452

B
Bomin Zhang 已提交
453 454
  pObj->tmrId = 0;

455
  if (pObj->pStream == NULL) {
456
    pObj->pStream = taos_open_stream_withname(pContext->dbConn, pObj->dstTable, pObj->sqlStr, cqProcessStreamRes, INT64_MIN, (void *)pObj->rid, NULL);
457 458

    // TODO the pObj->pStream may be released if error happens
459 460
    if (pObj->pStream) {
      pContext->num++;
S
TD-1843  
Shengliang Guan 已提交
461
      cDebug("vgId:%d, id:%d CQ:%s is opened", pContext->vgId, pObj->tid, pObj->sqlStr);
462 463 464
    } else {
      cError("vgId:%d, id:%d CQ:%s, failed to open", pContext->vgId, pObj->tid, pObj->sqlStr);
    }
465 466 467
  }
}

J
jtao1735 已提交
468
static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row) {
D
fix bug  
dapan1121 已提交
469 470 471 472 473
  SCqObj* pObj = (SCqObj*)taosAcquireRef(cqObjRef, (int64_t)param);
  if (pObj == NULL) {
    return;
  }
  
B
Bomin Zhang 已提交
474
  if (tres == NULL && row == NULL) {
475 476
    taos_close_stream(pObj->pStream);

B
Bomin Zhang 已提交
477
    pObj->pStream = NULL;
D
fix bug  
dapan1121 已提交
478 479 480

    taosReleaseRef(cqObjRef, (int64_t)param);

B
Bomin Zhang 已提交
481 482
    return;
  }
483

J
jtao1735 已提交
484
  SCqContext *pContext = pObj->pContext;
B
Bomin Zhang 已提交
485
  STSchema   *pSchema = pObj->pSchema;
D
fix bug  
dapan1121 已提交
486 487 488 489 490
  if (pObj->pStream == NULL) {    
    taosReleaseRef(cqObjRef, (int64_t)param);
    return;
  }
  
S
TD-1520  
Shengliang Guan 已提交
491
  cDebug("vgId:%d, id:%d CQ:%s stream result is ready", pContext->vgId, pObj->tid, pObj->sqlStr);
J
jtao1735 已提交
492

S
Shengliang Guan 已提交
493
  int32_t size = sizeof(SWalHead) + sizeof(SSubmitMsg) + sizeof(SSubmitBlk) + TD_DATA_ROW_HEAD_SIZE + pObj->rowSize;
J
jtao1735 已提交
494 495 496
  char *buffer = calloc(size, 1);

  SWalHead   *pHead = (SWalHead *)buffer;
B
Bomin Zhang 已提交
497 498
  SSubmitMsg *pMsg = (SSubmitMsg *) (buffer + sizeof(SWalHead));
  SSubmitBlk *pBlk = (SSubmitBlk *) (buffer + sizeof(SWalHead) + sizeof(SSubmitMsg));
J
draft  
jtao1735 已提交
499

B
Bomin Zhang 已提交
500
  SDataRow trow = (SDataRow)pBlk->data;
501
  tdInitDataRow(trow, pSchema);
J
jtao1735 已提交
502

B
Bomin Zhang 已提交
503
  for (int32_t i = 0; i < pSchema->numOfCols; i++) {
504
    STColumn *c = pSchema->columns + i;
505 506 507
    void* val = row[i];
    if (val == NULL) {
      val = getNullValue(c->type);
B
Bomin Zhang 已提交
508
    } else if (c->type == TSDB_DATA_TYPE_BINARY) {
509
      val = ((char*)val) - sizeof(VarDataLenT);
B
Bomin Zhang 已提交
510 511
    } else if (c->type == TSDB_DATA_TYPE_NCHAR) {
      char buf[TSDB_MAX_NCHAR_LEN];
512
      int32_t len = taos_fetch_lengths(tres)[i];
B
Bomin Zhang 已提交
513
      taosMbsToUcs4(val, len, buf, sizeof(buf), &len);
S
TD-1207  
Shengliang Guan 已提交
514
      memcpy((char *)val + sizeof(VarDataLenT), buf, len);
B
Bomin Zhang 已提交
515
      varDataLen(val) = len;
516 517
    }
    tdAppendColVal(trow, val, c->type, c->bytes, c->offset);
B
Bomin Zhang 已提交
518
  }
H
Haojun Liao 已提交
519 520
  pBlk->dataLen = htonl(dataRowLen(trow));
  pBlk->schemaLen = 0;
B
Bomin Zhang 已提交
521 522 523 524 525 526 527

  pBlk->uid = htobe64(pObj->uid);
  pBlk->tid = htonl(pObj->tid);
  pBlk->numOfRows = htons(1);
  pBlk->sversion = htonl(pSchema->version);
  pBlk->padding = 0;

B
Bomin Zhang 已提交
528 529
  pHead->len = sizeof(SSubmitMsg) + sizeof(SSubmitBlk) + dataRowLen(trow);

B
Bomin Zhang 已提交
530
  pMsg->header.vgId = htonl(pContext->vgId);
B
Bomin Zhang 已提交
531
  pMsg->header.contLen = htonl(pHead->len);
B
Bomin Zhang 已提交
532 533 534 535 536
  pMsg->length = pMsg->header.contLen;
  pMsg->numOfBlocks = htonl(1);

  pHead->msgType = TSDB_MSG_TYPE_SUBMIT;
  pHead->version = 0;
J
jtao1735 已提交
537 538

  // write into vnode write queue
S
TD-2283  
Shengliang Guan 已提交
539
  pContext->cqWrite(pContext->vgId, pHead, TAOS_QTYPE_CQ, NULL);
B
Bomin Zhang 已提交
540
  free(buffer);
D
fix bug  
dapan1121 已提交
541 542
  
  taosReleaseRef(cqObjRef, (int64_t)param);
J
jtao1735 已提交
543 544
}