tstream.c 13.1 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "tstream.h"
#include "executor.h"

L
Liu Jicong 已提交
19 20 21 22 23 24 25 26 27 28 29 30
static int32_t streamBuildDispatchMsg(SStreamTask* pTask, SArray* data, SRpcMsg* pMsg, SEpSet** ppEpSet) {
  SStreamTaskExecReq req = {
      .streamId = pTask->streamId,
      .data = data,
  };

  int32_t tlen = sizeof(SMsgHead) + tEncodeSStreamTaskExecReq(NULL, &req);
  void*   buf = rpcMallocCont(tlen);

  if (buf == NULL) {
    return -1;
  }
L
Liu Jicong 已提交
31

L
Liu Jicong 已提交
32 33 34
  if (pTask->dispatchType == TASK_DISPATCH__INPLACE) {
    ((SMsgHead*)buf)->vgId = 0;
    req.taskId = pTask->inplaceDispatcher.taskId;
L
Liu Jicong 已提交
35

L
Liu Jicong 已提交
36 37 38 39
  } else if (pTask->dispatchType == TASK_DISPATCH__FIXED) {
    ((SMsgHead*)buf)->vgId = htonl(pTask->fixedEpDispatcher.nodeId);
    *ppEpSet = &pTask->fixedEpDispatcher.epSet;
    req.taskId = pTask->fixedEpDispatcher.taskId;
L
Liu Jicong 已提交
40

L
Liu Jicong 已提交
41
  } else if (pTask->dispatchType == TASK_DISPATCH__SHUFFLE) {
L
fix  
Liu Jicong 已提交
42
    // TODO use general name rule of schemaless
L
Liu Jicong 已提交
43
    char ctbName[TSDB_TABLE_FNAME_LEN + 22];
L
Liu Jicong 已提交
44 45 46 47 48 49 50 51 52 53 54 55 56
    // all groupId must be the same in an array
    SSDataBlock* pBlock = taosArrayGet(data, 0);
    sprintf(ctbName, "%s:%ld", pTask->shuffleDispatcher.stbFullName, pBlock->info.groupId);

    // TODO: get hash function by hashMethod

    // get groupId, compute hash value
    uint32_t hashValue = MurmurHash3_32(ctbName, strlen(ctbName));
    //
    // get node
    // TODO: optimize search process
    SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
    int32_t sz = taosArrayGetSize(vgInfo);
L
Liu Jicong 已提交
57
    int32_t nodeId = 0;
L
Liu Jicong 已提交
58 59 60 61
    for (int32_t i = 0; i < sz; i++) {
      SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i);
      if (hashValue >= pVgInfo->hashBegin && hashValue <= pVgInfo->hashEnd) {
        nodeId = pVgInfo->vgId;
L
Liu Jicong 已提交
62
        req.taskId = pVgInfo->taskId;
L
Liu Jicong 已提交
63 64 65 66 67 68 69 70 71 72 73 74 75 76 77
        *ppEpSet = &pVgInfo->epSet;
        break;
      }
    }
    ASSERT(nodeId != 0);
    ((SMsgHead*)buf)->vgId = htonl(nodeId);
  }

  void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
  tEncodeSStreamTaskExecReq(&abuf, &req);

  pMsg->pCont = buf;
  pMsg->contLen = tlen;
  pMsg->code = 0;
  pMsg->msgType = pTask->dispatchMsgType;
dengyihao's avatar
dengyihao 已提交
78
  pMsg->noResp = 1;
L
Liu Jicong 已提交
79 80 81 82 83 84 85 86 87

  return 0;
}

/*
 * Send one dispatch message per entry of `data`.
 *
 * @param data hash whose values are SArray* (array of SSDataBlock, one per
 *             group); the arrays are only read here and stay owned by the caller.
 * @return 0 when every entry was dispatched; -1 if building a message failed
 *         (remaining entries are not sent).
 */
static int32_t streamShuffleDispatch(SStreamTask* pTask, SMsgCb* pMsgCb, SHashObj* data) {
  void* pIter = NULL;
  while ((pIter = taosHashIterate(data, pIter)) != NULL) {
    SArray* pData = *(SArray**)pIter;

    SRpcMsg dispatchMsg = {0};
    SEpSet* pEpSet = NULL;
    if (streamBuildDispatchMsg(pTask, pData, &dispatchMsg, &pEpSet) < 0) {
      ASSERT(0);
      // leaving the iteration early: release the iterator's hold on the hash
      taosHashCancelIterate(data, pIter);
      return -1;
    }
    tmsgSendReq(pMsgCb, pEpSet, &dispatchMsg);
  }
  return 0;
}

L
Liu Jicong 已提交
100 101 102 103 104 105
int32_t streamExecTask(SStreamTask* pTask, SMsgCb* pMsgCb, const void* input, int32_t inputType, int32_t workId) {
  SArray* pRes = NULL;
  // source
  if (inputType == STREAM_DATA_TYPE_SUBMIT_BLOCK && pTask->sourceType != TASK_SOURCE__SCAN) return 0;

  // exec
L
Liu Jicong 已提交
106
  if (pTask->execType != TASK_EXEC__NONE) {
L
Liu Jicong 已提交
107 108 109
    ASSERT(workId < pTask->exec.numOfRunners);
    void* exec = pTask->exec.runners[workId].executor;
    pRes = taosArrayInit(0, sizeof(SSDataBlock));
L
Liu Jicong 已提交
110 111 112
    if (pRes == NULL) {
      return -1;
    }
L
Liu Jicong 已提交
113 114 115 116 117 118 119 120 121 122 123 124 125 126 127
    if (inputType == STREAM_DATA_TYPE_SUBMIT_BLOCK) {
      qSetStreamInput(exec, input, inputType);
      while (1) {
        SSDataBlock* output;
        uint64_t     ts;
        if (qExecTask(exec, &output, &ts) < 0) {
          ASSERT(false);
        }
        if (output == NULL) {
          break;
        }
        taosArrayPush(pRes, output);
      }
    } else if (inputType == STREAM_DATA_TYPE_SSDATA_BLOCK) {
      const SArray* blocks = (const SArray*)input;
L
Liu Jicong 已提交
128 129 130 131 132 133 134 135 136 137
      /*int32_t       sz = taosArrayGetSize(blocks);*/
      /*for (int32_t i = 0; i < sz; i++) {*/
      /*SSDataBlock* pBlock = taosArrayGet(blocks, i);*/
      /*qSetStreamInput(exec, pBlock, inputType);*/
      qSetMultiStreamInput(exec, blocks->pData, blocks->size, STREAM_DATA_TYPE_SSDATA_BLOCK);
      while (1) {
        SSDataBlock* output;
        uint64_t     ts;
        if (qExecTask(exec, &output, &ts) < 0) {
          ASSERT(false);
L
Liu Jicong 已提交
138
        }
L
Liu Jicong 已提交
139 140 141 142
        if (output == NULL) {
          break;
        }
        taosArrayPush(pRes, output);
L
Liu Jicong 已提交
143
      }
L
Liu Jicong 已提交
144
      /*}*/
L
Liu Jicong 已提交
145 146 147 148 149 150 151 152 153 154
    } else {
      ASSERT(0);
    }
  } else {
    ASSERT(inputType == STREAM_DATA_TYPE_SSDATA_BLOCK);
    pRes = (SArray*)input;
  }

  // sink
  if (pTask->sinkType == TASK_SINK__TABLE) {
L
Liu Jicong 已提交
155
    /*blockDebugShowData(pRes);*/
L
Liu Jicong 已提交
156
    ASSERT(pTask->tbSink.pTSchema);
L
Liu Jicong 已提交
157
    SSubmitReq* pReq = tdBlockToSubmit(pRes, pTask->tbSink.pTSchema, false, pTask->tbSink.stbUid);
L
Liu Jicong 已提交
158
    tPrintFixedSchemaSubmitReq(pReq, pTask->tbSink.pTSchema);
L
Liu Jicong 已提交
159
  } else if (pTask->sinkType == TASK_SINK__SMA) {
L
Liu Jicong 已提交
160
    pTask->smaSink.smaSink(pTask->ahandle, pTask->smaSink.smaId, pRes);
L
Liu Jicong 已提交
161 162 163 164 165 166 167 168 169
    //
  } else if (pTask->sinkType == TASK_SINK__FETCH) {
    //
  } else {
    ASSERT(pTask->sinkType == TASK_SINK__NONE);
  }

  // dispatch

L
Liu Jicong 已提交
170 171 172 173
  if (pTask->dispatchType == TASK_DISPATCH__INPLACE) {
    SRpcMsg dispatchMsg = {0};
    if (streamBuildDispatchMsg(pTask, pRes, &dispatchMsg, NULL) < 0) {
      ASSERT(0);
L
Liu Jicong 已提交
174 175
      return -1;
    }
L
Liu Jicong 已提交
176 177 178 179 180 181 182 183 184

    int32_t qType;
    if (pTask->dispatchMsgType == TDMT_VND_TASK_PIPE_EXEC || pTask->dispatchMsgType == TDMT_SND_TASK_PIPE_EXEC) {
      qType = FETCH_QUEUE;
    } else if (pTask->dispatchMsgType == TDMT_VND_TASK_MERGE_EXEC ||
               pTask->dispatchMsgType == TDMT_SND_TASK_MERGE_EXEC) {
      qType = MERGE_QUEUE;
    } else if (pTask->dispatchMsgType == TDMT_VND_TASK_WRITE_EXEC) {
      qType = WRITE_QUEUE;
L
Liu Jicong 已提交
185 186 187
    } else {
      ASSERT(0);
    }
L
Liu Jicong 已提交
188 189 190
    tmsgPutToQueue(pMsgCb, qType, &dispatchMsg);

  } else if (pTask->dispatchType == TASK_DISPATCH__FIXED) {
L
Liu Jicong 已提交
191 192 193 194
    SRpcMsg dispatchMsg = {0};
    SEpSet* pEpSet = NULL;
    if (streamBuildDispatchMsg(pTask, pRes, &dispatchMsg, &pEpSet) < 0) {
      ASSERT(0);
L
Liu Jicong 已提交
195 196 197 198 199 200
      return -1;
    }

    tmsgSendReq(pMsgCb, pEpSet, &dispatchMsg);

  } else if (pTask->dispatchType == TASK_DISPATCH__SHUFFLE) {
L
Liu Jicong 已提交
201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222
    SHashObj* pShuffleRes = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
    if (pShuffleRes == NULL) {
      return -1;
    }

    int32_t sz = taosArrayGetSize(pRes);
    for (int32_t i = 0; i < sz; i++) {
      SSDataBlock* pDataBlock = taosArrayGet(pRes, i);
      SArray*      pArray = taosHashGet(pShuffleRes, &pDataBlock->info.groupId, sizeof(int64_t));
      if (pArray == NULL) {
        pArray = taosArrayInit(0, sizeof(SSDataBlock));
        if (pArray == NULL) {
          return -1;
        }
        taosHashPut(pShuffleRes, &pDataBlock->info.groupId, sizeof(int64_t), &pArray, sizeof(void*));
      }
      taosArrayPush(pArray, pDataBlock);
    }

    if (streamShuffleDispatch(pTask, pMsgCb, pShuffleRes) < 0) {
      return -1;
    }
L
Liu Jicong 已提交
223 224 225

  } else {
    ASSERT(pTask->dispatchType == TASK_DISPATCH__NONE);
L
Liu Jicong 已提交
226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245
  }
  return 0;
}

/*
 * Encode an SStreamTaskExecReq as streamId (fixed i64), taskId (fixed i32),
 * then the data blocks, in that order.
 * Callers pass buf == NULL to measure the encoded size without writing.
 * Returns the total number of bytes accounted for by the three encoders.
 */
int32_t tEncodeSStreamTaskExecReq(void** buf, const SStreamTaskExecReq* pReq) {
  // separate statements keep the write order fixed (each encoder moves *buf)
  int32_t encodedLen = taosEncodeFixedI64(buf, pReq->streamId);
  encodedLen += taosEncodeFixedI32(buf, pReq->taskId);
  encodedLen += tEncodeDataBlocks(buf, pReq->data);
  return encodedLen;
}

/*
 * Decode an SStreamTaskExecReq written by tEncodeSStreamTaskExecReq:
 * streamId, taskId, then the data blocks into pReq->data.
 * Returns the position just past the decoded request.
 */
void* tDecodeSStreamTaskExecReq(const void* buf, SStreamTaskExecReq* pReq) {
  const void* cursor = buf;
  cursor = taosDecodeFixedI64(cursor, &pReq->streamId);
  cursor = taosDecodeFixedI32(cursor, &pReq->taskId);
  cursor = tDecodeDataBlocks(cursor, &pReq->data);
  return (void*)cursor;
}

/*
 * Release pReq->data with taosArrayDestroy (no per-element destructor is run).
 * NOTE(review): blocks produced by tDecodeDataBlocks may own buffers that this
 * does not free — confirm.
 */
void tFreeSStreamTaskExecReq(SStreamTaskExecReq* pReq) {
  SArray* blocks = pReq->data;
  taosArrayDestroy(blocks);
}
L
Liu Jicong 已提交
246 247

SStreamTask* tNewSStreamTask(int64_t streamId) {
wafwerar's avatar
wafwerar 已提交
248
  SStreamTask* pTask = (SStreamTask*)taosMemoryCalloc(1, sizeof(SStreamTask));
L
Liu Jicong 已提交
249 250 251 252 253 254 255 256 257 258
  if (pTask == NULL) {
    return NULL;
  }
  pTask->taskId = tGenIdPI32();
  pTask->streamId = streamId;
  pTask->status = STREAM_TASK_STATUS__RUNNING;
  /*pTask->qmsg = NULL;*/
  return pTask;
}

H
Hongze Cheng 已提交
259
int32_t tEncodeSStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) {
L
Liu Jicong 已提交
260 261 262 263 264 265 266 267 268 269
  /*if (tStartEncode(pEncoder) < 0) return -1;*/
  if (tEncodeI64(pEncoder, pTask->streamId) < 0) return -1;
  if (tEncodeI32(pEncoder, pTask->taskId) < 0) return -1;
  if (tEncodeI8(pEncoder, pTask->status) < 0) return -1;
  if (tEncodeI8(pEncoder, pTask->sourceType) < 0) return -1;
  if (tEncodeI8(pEncoder, pTask->execType) < 0) return -1;
  if (tEncodeI8(pEncoder, pTask->sinkType) < 0) return -1;
  if (tEncodeI8(pEncoder, pTask->dispatchType) < 0) return -1;
  if (tEncodeI16(pEncoder, pTask->dispatchMsgType) < 0) return -1;

L
Liu Jicong 已提交
270 271 272 273
  if (tEncodeI32(pEncoder, pTask->nodeId) < 0) return -1;
  if (tEncodeSEpSet(pEncoder, &pTask->epSet) < 0) return -1;

  if (pTask->execType != TASK_EXEC__NONE) {
L
Liu Jicong 已提交
274 275 276 277
    if (tEncodeI8(pEncoder, pTask->exec.parallelizable) < 0) return -1;
    if (tEncodeCStr(pEncoder, pTask->exec.qmsg) < 0) return -1;
  }

L
Liu Jicong 已提交
278
  if (pTask->sinkType == TASK_SINK__TABLE) {
L
Liu Jicong 已提交
279
    if (tEncodeI64(pEncoder, pTask->tbSink.stbUid) < 0) return -1;
L
Liu Jicong 已提交
280
    if (tEncodeSSchemaWrapper(pEncoder, pTask->tbSink.pSchemaWrapper) < 0) return -1;
L
Liu Jicong 已提交
281
  } else if (pTask->sinkType == TASK_SINK__SMA) {
L
Liu Jicong 已提交
282
    if (tEncodeI64(pEncoder, pTask->smaSink.smaId) < 0) return -1;
L
Liu Jicong 已提交
283 284 285 286
  } else if (pTask->sinkType == TASK_SINK__FETCH) {
    if (tEncodeI8(pEncoder, pTask->fetchSink.reserved) < 0) return -1;
  } else {
    ASSERT(pTask->sinkType == TASK_SINK__NONE);
L
Liu Jicong 已提交
287 288 289
  }

  if (pTask->dispatchType == TASK_DISPATCH__INPLACE) {
L
Liu Jicong 已提交
290
    if (tEncodeI32(pEncoder, pTask->inplaceDispatcher.taskId) < 0) return -1;
L
Liu Jicong 已提交
291
  } else if (pTask->dispatchType == TASK_DISPATCH__FIXED) {
L
Liu Jicong 已提交
292
    if (tEncodeI32(pEncoder, pTask->fixedEpDispatcher.taskId) < 0) return -1;
L
Liu Jicong 已提交
293 294 295
    if (tEncodeI32(pEncoder, pTask->fixedEpDispatcher.nodeId) < 0) return -1;
    if (tEncodeSEpSet(pEncoder, &pTask->fixedEpDispatcher.epSet) < 0) return -1;
  } else if (pTask->dispatchType == TASK_DISPATCH__SHUFFLE) {
L
Liu Jicong 已提交
296 297
    if (tSerializeSUseDbRspImp(pEncoder, &pTask->shuffleDispatcher.dbInfo) < 0) return -1;
    /*if (tEncodeI8(pEncoder, pTask->shuffleDispatcher.hashMethod) < 0) return -1;*/
L
Liu Jicong 已提交
298 299 300 301 302 303
  }

  /*tEndEncode(pEncoder);*/
  return pEncoder->pos;
}

H
Hongze Cheng 已提交
304
int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
L
Liu Jicong 已提交
305 306 307 308 309 310 311 312 313 314
  /*if (tStartDecode(pDecoder) < 0) return -1;*/
  if (tDecodeI64(pDecoder, &pTask->streamId) < 0) return -1;
  if (tDecodeI32(pDecoder, &pTask->taskId) < 0) return -1;
  if (tDecodeI8(pDecoder, &pTask->status) < 0) return -1;
  if (tDecodeI8(pDecoder, &pTask->sourceType) < 0) return -1;
  if (tDecodeI8(pDecoder, &pTask->execType) < 0) return -1;
  if (tDecodeI8(pDecoder, &pTask->sinkType) < 0) return -1;
  if (tDecodeI8(pDecoder, &pTask->dispatchType) < 0) return -1;
  if (tDecodeI16(pDecoder, &pTask->dispatchMsgType) < 0) return -1;

L
Liu Jicong 已提交
315 316 317 318
  if (tDecodeI32(pDecoder, &pTask->nodeId) < 0) return -1;
  if (tDecodeSEpSet(pDecoder, &pTask->epSet) < 0) return -1;

  if (pTask->execType != TASK_EXEC__NONE) {
L
Liu Jicong 已提交
319 320 321 322
    if (tDecodeI8(pDecoder, &pTask->exec.parallelizable) < 0) return -1;
    if (tDecodeCStrAlloc(pDecoder, &pTask->exec.qmsg) < 0) return -1;
  }

L
Liu Jicong 已提交
323
  if (pTask->sinkType == TASK_SINK__TABLE) {
L
Liu Jicong 已提交
324
    if (tDecodeI64(pDecoder, &pTask->tbSink.stbUid) < 0) return -1;
L
Liu Jicong 已提交
325 326 327
    pTask->tbSink.pSchemaWrapper = taosMemoryCalloc(1, sizeof(SSchemaWrapper));
    if (pTask->tbSink.pSchemaWrapper == NULL) return -1;
    if (tDecodeSSchemaWrapper(pDecoder, pTask->tbSink.pSchemaWrapper) < 0) return -1;
L
Liu Jicong 已提交
328
  } else if (pTask->sinkType == TASK_SINK__SMA) {
L
Liu Jicong 已提交
329
    if (tDecodeI64(pDecoder, &pTask->smaSink.smaId) < 0) return -1;
L
Liu Jicong 已提交
330 331 332 333
  } else if (pTask->sinkType == TASK_SINK__FETCH) {
    if (tDecodeI8(pDecoder, &pTask->fetchSink.reserved) < 0) return -1;
  } else {
    ASSERT(pTask->sinkType == TASK_SINK__NONE);
L
Liu Jicong 已提交
334 335 336
  }

  if (pTask->dispatchType == TASK_DISPATCH__INPLACE) {
L
Liu Jicong 已提交
337
    if (tDecodeI32(pDecoder, &pTask->inplaceDispatcher.taskId) < 0) return -1;
L
Liu Jicong 已提交
338
  } else if (pTask->dispatchType == TASK_DISPATCH__FIXED) {
L
Liu Jicong 已提交
339
    if (tDecodeI32(pDecoder, &pTask->fixedEpDispatcher.taskId) < 0) return -1;
L
Liu Jicong 已提交
340 341 342
    if (tDecodeI32(pDecoder, &pTask->fixedEpDispatcher.nodeId) < 0) return -1;
    if (tDecodeSEpSet(pDecoder, &pTask->fixedEpDispatcher.epSet) < 0) return -1;
  } else if (pTask->dispatchType == TASK_DISPATCH__SHUFFLE) {
L
Liu Jicong 已提交
343 344
    /*if (tDecodeI8(pDecoder, &pTask->shuffleDispatcher.hashMethod) < 0) return -1;*/
    if (tDeserializeSUseDbRspImp(pDecoder, &pTask->shuffleDispatcher.dbInfo) < 0) return -1;
L
Liu Jicong 已提交
345 346 347 348 349 350 351 352
  }

  /*tEndDecode(pDecoder);*/
  return 0;
}

/*
 * Release a stream task.
 * TODO: not implemented yet — this currently frees nothing (neither the task
 * itself nor any buffers it points to).
 */
void tFreeSStreamTask(SStreamTask* pTask) {
  (void)pTask;
}

/*
 * Disabled draft of an SCoder-based codec for SStreamTaskExecReq. It uses the
 * same names as the buffer-based tEncode/tDecode/tFreeSStreamTaskExecReq
 * defined earlier in this file, so it cannot be enabled alongside them.
 * NOTE(review): it would not compile as written — tDecodeSStreamTaskExecReq
 * returns pEncoder->size, but its parameter is named pDecoder. Consider
 * deleting this block.
 */
#if 0
int32_t tEncodeSStreamTaskExecReq(SCoder* pEncoder, const SStreamTaskExecReq* pReq) {
  if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1;
  if (tEncodeI32(pEncoder, pReq->taskId) < 0) return -1;
  /*if (tEncodeDataBlocks(buf, pReq->streamId) < 0) return -1;*/
  return pEncoder->size;
}
int32_t tDecodeSStreamTaskExecReq(SCoder* pDecoder, SStreamTaskExecReq* pReq) {
  return pEncoder->size;
}
void    tFreeSStreamTaskExecReq(SStreamTaskExecReq* pReq) {
  taosArrayDestroyEx(pReq->data, tDeleteSSDataBlock);
}
#endif