streamData.c 6.5 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

16
#include "streamInc.h"
L
Liu Jicong 已提交
17

L
Liu Jicong 已提交
18 19 20 21 22 23 24 25 26 27 28 29
/*
 * Build the block array of a stream data item from a dispatch request.
 *
 * An array of pReq->blockNum SSDataBlock entries is allocated; the i-th
 * SRetrieveTableRsp stored in pReq->data is decoded into the i-th entry, and
 * the finished array is stored in pData->blocks.
 *
 * Returns 0 on success, or -1 when the block array cannot be allocated (in
 * which case pData is not modified).
 */
int32_t streamDispatchReqToData(const SStreamDispatchReq* pReq, SStreamDataBlock* pData) {
  int32_t numOfBlocks = pReq->blockNum;

  SArray* pBlocks = taosArrayInit(numOfBlocks, sizeof(SSDataBlock));
  if (NULL == pBlocks) return -1;
  taosArraySetSize(pBlocks, numOfBlocks);

  // Both per-block arrays of the request must agree with the announced count.
  ASSERT(pReq->blockNum == taosArrayGetSize(pReq->data));
  ASSERT(pReq->blockNum == taosArrayGetSize(pReq->dataLen));

  int32_t idx = 0;
  while (idx < numOfBlocks) {
    SRetrieveTableRsp* pRsp = taosArrayGetP(pReq->data, idx);
    SSDataBlock*       pBlock = taosArrayGet(pBlocks, idx);

    // Column/row counts are run through htonl() before being handed to the decoder.
    uint32_t numOfCols = htonl(pRsp->numOfCols);
    uint32_t numOfRows = htonl(pRsp->numOfRows);
    blockDecode(pBlock, numOfCols, numOfRows, pRsp->data);

    // TODO: refactor
    pBlock->info.type = pRsp->streamBlockType;
    pBlock->info.childId = pReq->upstreamChildId;
    pBlock->info.window.skey = be64toh(pRsp->skey);
    pBlock->info.window.ekey = be64toh(pRsp->ekey);

    idx++;
  }

  pData->blocks = pBlocks;
  return 0;
}

/*
 * Build the block array of a stream data item from a retrieve request.
 *
 * A one-element SSDataBlock array is allocated, pReq->pRetrieve is decoded
 * into that element, and the array together with pReq->reqId is stored in
 * pData.
 *
 * Returns 0 on success, or -1 when the block array cannot be allocated (in
 * which case pData is not modified).
 */
int32_t streamRetrieveReqToData(const SStreamRetrieveReq* pReq, SStreamDataBlock* pData) {
  SArray* pBlocks = taosArrayInit(1, sizeof(SSDataBlock));
  if (NULL == pBlocks) return -1;
  taosArraySetSize(pBlocks, 1);

  SRetrieveTableRsp* pRsp = pReq->pRetrieve;
  SSDataBlock*       pBlock = taosArrayGet(pBlocks, 0);

  // Column/row counts are run through htonl() before being handed to the decoder.
  uint32_t numOfCols = htonl(pRsp->numOfCols);
  uint32_t numOfRows = htonl(pRsp->numOfRows);
  blockDecode(pBlock, numOfCols, numOfRows, pRsp->data);

  // TODO: refactor
  pBlock->info.type = pRsp->streamBlockType;
  pBlock->info.window.skey = be64toh(pRsp->skey);
  pBlock->info.window.ekey = be64toh(pRsp->ekey);

  pData->blocks = pBlocks;
  pData->reqId = pReq->reqId;
  return 0;
}

L
Liu Jicong 已提交
66 67 68 69 70 71 72 73 74 75 76 77 78 79
/*
 * Allocate a STREAM_INPUT__DATA_SUBMIT queue item that refers to pReq.
 *
 * The item gets a separately allocated reference counter (dataRef)
 * initialised to 1; pReq itself is stored by pointer, not copied.
 *
 * Returns the new item, or NULL if either allocation fails (the queue item is
 * released again when only the counter allocation fails).
 */
SStreamDataSubmit* streamDataSubmitNew(SSubmitReq* pReq) {
  SStreamDataSubmit* pItem = (SStreamDataSubmit*)taosAllocateQitem(sizeof(SStreamDataSubmit), DEF_QITEM);
  if (pItem == NULL) {
    return NULL;
  }

  pItem->dataRef = (int32_t*)taosMemoryMalloc(sizeof(int32_t));
  if (pItem->dataRef == NULL) {
    // No counter, no item: give the queue item back and report failure.
    taosFreeQitem(pItem);
    return NULL;
  }

  *pItem->dataRef = 1;
  pItem->data = pReq;
  pItem->type = STREAM_INPUT__DATA_SUBMIT;
  return pItem;
}

80 81 82 83
SStreamMergedSubmit* streamMergedSubmitNew() {
  SStreamMergedSubmit* pMerged = (SStreamMergedSubmit*)taosAllocateQitem(sizeof(SStreamMergedSubmit), DEF_QITEM);
  if (pMerged == NULL) return NULL;
  pMerged->reqs = taosArrayInit(0, sizeof(void*));
84
  pMerged->dataRefs = taosArrayInit(0, sizeof(int32_t*));
85
  if (pMerged->dataRefs == NULL || pMerged->reqs == NULL) goto FAIL;
J
jiacy-jcy 已提交
86
  pMerged->type = STREAM_INPUT__MERGED_SUBMIT;
87 88 89 90 91 92 93 94 95 96
  return pMerged;
FAIL:
  if (pMerged->reqs) taosArrayDestroy(pMerged->reqs);
  if (pMerged->dataRefs) taosArrayDestroy(pMerged->dataRefs);
  taosFreeQitem(pMerged);
  return NULL;
}

int32_t streamMergeSubmit(SStreamMergedSubmit* pMerged, SStreamDataSubmit* pSubmit) {
  taosArrayPush(pMerged->dataRefs, pSubmit->dataRef);
97
  taosArrayPush(pMerged->reqs, &pSubmit->data);
98 99 100 101
  pMerged->ver = pSubmit->ver;
  return 0;
}

L
Liu Jicong 已提交
102 103 104 105 106 107 108 109 110 111 112 113 114
static FORCE_INLINE void streamDataSubmitRefInc(SStreamDataSubmit* pDataSubmit) {
  atomic_add_fetch_32(pDataSubmit->dataRef, 1);
}

SStreamDataSubmit* streamSubmitRefClone(SStreamDataSubmit* pSubmit) {
  SStreamDataSubmit* pSubmitClone = taosAllocateQitem(sizeof(SStreamDataSubmit), DEF_QITEM);
  if (pSubmitClone == NULL) {
    return NULL;
  }
  streamDataSubmitRefInc(pSubmit);
  memcpy(pSubmitClone, pSubmit, sizeof(SStreamDataSubmit));
  return pSubmitClone;
}
L
Liu Jicong 已提交
115 116 117 118 119 120 121 122 123

void streamDataSubmitRefDec(SStreamDataSubmit* pDataSubmit) {
  int32_t ref = atomic_sub_fetch_32(pDataSubmit->dataRef, 1);
  ASSERT(ref >= 0);
  if (ref == 0) {
    taosMemoryFree(pDataSubmit->data);
    taosMemoryFree(pDataSubmit->dataRef);
  }
}
L
Liu Jicong 已提交
124

J
jiacy-jcy 已提交
125
SStreamQueueItem* streamAppendQueueItem(SStreamQueueItem* dst, SStreamQueueItem* elem) {
L
Liu Jicong 已提交
126
  ASSERT(elem);
127
  if (dst->type == STREAM_INPUT__DATA_BLOCK && elem->type == STREAM_INPUT__DATA_BLOCK) {
L
Liu Jicong 已提交
128 129 130
    SStreamDataBlock* pBlock = (SStreamDataBlock*)dst;
    SStreamDataBlock* pBlockSrc = (SStreamDataBlock*)elem;
    taosArrayAddAll(pBlock->blocks, pBlockSrc->blocks);
131 132
    taosArrayDestroy(pBlockSrc->blocks);
    taosFreeQitem(elem);
J
jiacy-jcy 已提交
133
    return dst;
134 135 136 137 138
  } else if (dst->type == STREAM_INPUT__MERGED_SUBMIT && elem->type == STREAM_INPUT__DATA_SUBMIT) {
    SStreamMergedSubmit* pMerged = (SStreamMergedSubmit*)dst;
    SStreamDataSubmit*   pBlockSrc = (SStreamDataSubmit*)elem;
    streamMergeSubmit(pMerged, pBlockSrc);
    taosFreeQitem(elem);
J
jiacy-jcy 已提交
139
    return dst;
140 141 142 143 144 145 146
  } else if (dst->type == STREAM_INPUT__DATA_SUBMIT && elem->type == STREAM_INPUT__DATA_SUBMIT) {
    SStreamMergedSubmit* pMerged = streamMergedSubmitNew();
    ASSERT(pMerged);
    streamMergeSubmit(pMerged, (SStreamDataSubmit*)dst);
    streamMergeSubmit(pMerged, (SStreamDataSubmit*)elem);
    taosFreeQitem(dst);
    taosFreeQitem(elem);
J
jiacy-jcy 已提交
147
    return (SStreamQueueItem*)pMerged;
L
Liu Jicong 已提交
148
  } else {
J
jiacy-jcy 已提交
149
    return NULL;
L
Liu Jicong 已提交
150 151 152 153 154
  }
}

void streamFreeQitem(SStreamQueueItem* data) {
  int8_t type = data->type;
L
Liu Jicong 已提交
155
  if (type == STREAM_INPUT__GET_RES) {
L
Liu Jicong 已提交
156 157 158
    blockDataDestroy(((SStreamTrigger*)data)->pBlock);
    taosFreeQitem(data);
  } else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__DATA_RETRIEVE) {
159
    taosArrayDestroyEx(((SStreamDataBlock*)data)->blocks, (FDelete)blockDataFreeRes);
L
Liu Jicong 已提交
160 161 162 163
    taosFreeQitem(data);
  } else if (type == STREAM_INPUT__DATA_SUBMIT) {
    streamDataSubmitRefDec((SStreamDataSubmit*)data);
    taosFreeQitem(data);
164 165 166 167 168 169 170
  } else if (type == STREAM_INPUT__MERGED_SUBMIT) {
    SStreamMergedSubmit* pMerge = (SStreamMergedSubmit*)data;
    int32_t              sz = taosArrayGetSize(pMerge->reqs);
    for (int32_t i = 0; i < sz; i++) {
      int32_t* ref = taosArrayGet(pMerge->dataRefs, i);
      (*ref)--;
      if (*ref == 0) {
171
        void* data = taosArrayGetP(pMerge->reqs, i);
172 173 174 175 176 177 178
        taosMemoryFree(data);
        taosMemoryFree(ref);
      }
    }
    taosArrayDestroy(pMerge->reqs);
    taosArrayDestroy(pMerge->dataRefs);
    taosFreeQitem(pMerge);
L
Liu Jicong 已提交
179 180
  }
}