streamData.c 7.8 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

16
#include "streamInt.h"
L
Liu Jicong 已提交
17

18 19 20 21 22 23 24 25 26
SStreamDataBlock* createStreamDataFromDispatchMsg(const SStreamDispatchReq* pReq, int32_t blockType, int32_t srcVg) {
  SStreamDataBlock* pData = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, pReq->totalLen);
  if (pData == NULL) {
    return NULL;
  }

  pData->type = blockType;
  pData->srcVgId = srcVg;

L
Liu Jicong 已提交
27
  int32_t blockNum = pReq->blockNum;
28
  SArray* pArray = taosArrayInit_s(sizeof(SSDataBlock), blockNum);
L
Liu Jicong 已提交
29
  if (pArray == NULL) {
30
    return NULL;
L
Liu Jicong 已提交
31 32
  }

33
  ASSERT((pReq->blockNum == taosArrayGetSize(pReq->data)) && (pReq->blockNum == taosArrayGetSize(pReq->dataLen)));
L
Liu Jicong 已提交
34 35

  for (int32_t i = 0; i < blockNum; i++) {
5
54liuyao 已提交
36
    SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*) taosArrayGetP(pReq->data, i);
L
Liu Jicong 已提交
37
    SSDataBlock*       pDataBlock = taosArrayGet(pArray, i);
38
    blockDecode(pDataBlock, pRetrieve->data);
39

L
Liu Jicong 已提交
40
    // TODO: refactor
41 42
    pDataBlock->info.window.skey = be64toh(pRetrieve->skey);
    pDataBlock->info.window.ekey = be64toh(pRetrieve->ekey);
43
    pDataBlock->info.version = be64toh(pRetrieve->version);
44
    pDataBlock->info.watermark = be64toh(pRetrieve->watermark);
45
    memcpy(pDataBlock->info.parTbName, pRetrieve->parTbName, TSDB_TABLE_NAME_LEN);
46

L
Liu Jicong 已提交
47
    pDataBlock->info.type = pRetrieve->streamBlockType;
L
Liu Jicong 已提交
48 49
    pDataBlock->info.childId = pReq->upstreamChildId;
  }
50

L
Liu Jicong 已提交
51
  pData->blocks = pArray;
52 53 54 55 56 57 58 59 60 61 62 63 64 65 66
  return pData;
}

SStreamDataBlock* createStreamBlockFromResults(SStreamQueueItem* pItem, SStreamTask* pTask, int64_t resultSize, SArray* pRes) {
  SStreamDataBlock* pStreamBlocks = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, resultSize);
  if (pStreamBlocks == NULL) {
    taosArrayClearEx(pRes, (FDelete)blockDataFreeRes);
    return NULL;
  }

  pStreamBlocks->type = STREAM_INPUT__DATA_BLOCK;
  pStreamBlocks->blocks = pRes;

  if (pItem->type == STREAM_INPUT__DATA_SUBMIT) {
    SStreamDataSubmit* pSubmit = (SStreamDataSubmit*)pItem;
67
    pStreamBlocks->childId = pTask->info.selfChildId;
68 69 70
    pStreamBlocks->sourceVer = pSubmit->ver;
  } else if (pItem->type == STREAM_INPUT__MERGED_SUBMIT) {
    SStreamMergedSubmit* pMerged = (SStreamMergedSubmit*)pItem;
71
    pStreamBlocks->childId = pTask->info.selfChildId;
72 73 74 75 76 77 78 79 80 81 82 83 84
    pStreamBlocks->sourceVer = pMerged->ver;
  }

  return pStreamBlocks;
}

void destroyStreamDataBlock(SStreamDataBlock* pBlock) {
  if (pBlock == NULL) {
    return;
  }

  taosArrayDestroyEx(pBlock->blocks, (FDelete)blockDataFreeRes);
  taosFreeQitem(pBlock);
L
Liu Jicong 已提交
85 86 87 88 89 90
}

int32_t streamRetrieveReqToData(const SStreamRetrieveReq* pReq, SStreamDataBlock* pData) {
  SArray* pArray = taosArrayInit(1, sizeof(SSDataBlock));
  if (pArray == NULL) {
    return -1;
L
Liu Jicong 已提交
91
  }
92

H
Haojun Liao 已提交
93
  taosArrayPush(pArray, &(SSDataBlock){0});
L
Liu Jicong 已提交
94
  SRetrieveTableRsp* pRetrieve = pReq->pRetrieve;
95
  SSDataBlock*       pDataBlock = taosArrayGet(pArray, 0);
96
  blockDecode(pDataBlock, pRetrieve->data);
97

98 99 100
  // TODO: refactor
  pDataBlock->info.window.skey = be64toh(pRetrieve->skey);
  pDataBlock->info.window.ekey = be64toh(pRetrieve->ekey);
101
  pDataBlock->info.version = be64toh(pRetrieve->version);
102 103 104

  pDataBlock->info.type = pRetrieve->streamBlockType;

L
Liu Jicong 已提交
105
  pData->reqId = pReq->reqId;
L
Liu Jicong 已提交
106
  pData->blocks = pArray;
L
Liu Jicong 已提交
107

L
Liu Jicong 已提交
108 109 110
  return 0;
}

111 112
SStreamDataSubmit* streamDataSubmitNew(SPackedData* pData, int32_t type) {
  SStreamDataSubmit* pDataSubmit = (SStreamDataSubmit*)taosAllocateQitem(sizeof(SStreamDataSubmit), DEF_QITEM, pData->msgLen);
113 114 115 116
  if (pDataSubmit == NULL) {
    return NULL;
  }

L
Liu Jicong 已提交
117
  pDataSubmit->dataRef = (int32_t*)taosMemoryMalloc(sizeof(int32_t));
118 119 120 121 122
  if (pDataSubmit->dataRef == NULL) {
    taosFreeQitem(pDataSubmit);
    return NULL;
  }

123
  pDataSubmit->submit = *pData;
124
  *pDataSubmit->dataRef = 1;   // initialize the reference count to be 1
125
  pDataSubmit->type = type;
126

L
Liu Jicong 已提交
127 128 129
  return pDataSubmit;
}

130
void streamDataSubmitDestroy(SStreamDataSubmit* pDataSubmit) {
131 132 133 134 135 136 137 138 139
  int32_t ref = atomic_sub_fetch_32(pDataSubmit->dataRef, 1);
  ASSERT(ref >= 0 && pDataSubmit->type == STREAM_INPUT__DATA_SUBMIT);

  if (ref == 0) {
    taosMemoryFree(pDataSubmit->submit.msgStr);
    taosMemoryFree(pDataSubmit->dataRef);
  }
}

140 141
SStreamMergedSubmit* streamMergedSubmitNew() {
  SStreamMergedSubmit* pMerged = (SStreamMergedSubmit*)taosAllocateQitem(sizeof(SStreamMergedSubmit), DEF_QITEM, 0);
142 143 144
  if (pMerged == NULL) {
    return NULL;
  }
145

L
Liu Jicong 已提交
146
  pMerged->submits = taosArrayInit(0, sizeof(SPackedData));
147
  pMerged->dataRefs = taosArrayInit(0, sizeof(void*));
148 149 150 151 152 153 154 155

  if (pMerged->dataRefs == NULL || pMerged->submits == NULL) {
    taosArrayDestroy(pMerged->submits);
    taosArrayDestroy(pMerged->dataRefs);
    taosFreeQitem(pMerged);
    return NULL;
  }

J
jiacy-jcy 已提交
156
  pMerged->type = STREAM_INPUT__MERGED_SUBMIT;
157 158 159
  return pMerged;
}

160
int32_t streamMergeSubmit(SStreamMergedSubmit* pMerged, SStreamDataSubmit* pSubmit) {
161
  taosArrayPush(pMerged->dataRefs, &pSubmit->dataRef);
L
Liu Jicong 已提交
162
  taosArrayPush(pMerged->submits, &pSubmit->submit);
163 164 165 166
  pMerged->ver = pSubmit->ver;
  return 0;
}

167 168
SStreamQueueItem* streamMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* pElem) {
  if (dst->type == STREAM_INPUT__DATA_BLOCK && pElem->type == STREAM_INPUT__DATA_BLOCK) {
L
Liu Jicong 已提交
169
    SStreamDataBlock* pBlock = (SStreamDataBlock*)dst;
170
    SStreamDataBlock* pBlockSrc = (SStreamDataBlock*)pElem;
L
Liu Jicong 已提交
171
    taosArrayAddAll(pBlock->blocks, pBlockSrc->blocks);
172
    taosArrayDestroy(pBlockSrc->blocks);
173
    taosFreeQitem(pElem);
J
jiacy-jcy 已提交
174
    return dst;
175
  } else if (dst->type == STREAM_INPUT__MERGED_SUBMIT && pElem->type == STREAM_INPUT__DATA_SUBMIT) {
176 177
    SStreamMergedSubmit* pMerged = (SStreamMergedSubmit*)dst;
    SStreamDataSubmit*   pBlockSrc = (SStreamDataSubmit*)pElem;
178
    streamMergeSubmit(pMerged, pBlockSrc);
179
    taosFreeQitem(pElem);
J
jiacy-jcy 已提交
180
    return dst;
181
  } else if (dst->type == STREAM_INPUT__DATA_SUBMIT && pElem->type == STREAM_INPUT__DATA_SUBMIT) {
182
    SStreamMergedSubmit* pMerged = streamMergedSubmitNew();
183 184
    // todo handle error

185 186
    streamMergeSubmit(pMerged, (SStreamDataSubmit*)dst);
    streamMergeSubmit(pMerged, (SStreamDataSubmit*)pElem);
187
    taosFreeQitem(dst);
188
    taosFreeQitem(pElem);
J
jiacy-jcy 已提交
189
    return (SStreamQueueItem*)pMerged;
L
Liu Jicong 已提交
190
  } else {
J
jiacy-jcy 已提交
191
    return NULL;
L
Liu Jicong 已提交
192 193 194 195 196
  }
}

void streamFreeQitem(SStreamQueueItem* data) {
  int8_t type = data->type;
L
Liu Jicong 已提交
197
  if (type == STREAM_INPUT__GET_RES) {
L
Liu Jicong 已提交
198 199 200
    blockDataDestroy(((SStreamTrigger*)data)->pBlock);
    taosFreeQitem(data);
  } else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__DATA_RETRIEVE) {
201
    taosArrayDestroyEx(((SStreamDataBlock*)data)->blocks, (FDelete)blockDataFreeRes);
L
Liu Jicong 已提交
202 203
    taosFreeQitem(data);
  } else if (type == STREAM_INPUT__DATA_SUBMIT) {
204
    streamDataSubmitDestroy((SStreamDataSubmit*)data);
L
Liu Jicong 已提交
205
    taosFreeQitem(data);
206
  } else if (type == STREAM_INPUT__MERGED_SUBMIT) {
207
    SStreamMergedSubmit* pMerge = (SStreamMergedSubmit*)data;
208 209

    int32_t sz = taosArrayGetSize(pMerge->submits);
210
    for (int32_t i = 0; i < sz; i++) {
211 212 213
      int32_t* pRef = taosArrayGetP(pMerge->dataRefs, i);
      int32_t  ref = atomic_sub_fetch_32(pRef, 1);
      ASSERT(ref >= 0);
214

215
      if (ref == 0) {
L
Liu Jicong 已提交
216
        SPackedData* pSubmit = (SPackedData*)taosArrayGet(pMerge->submits, i);
L
Liu Jicong 已提交
217
        taosMemoryFree(pSubmit->msgStr);
218
        taosMemoryFree(pRef);
219 220
      }
    }
L
Liu Jicong 已提交
221
    taosArrayDestroy(pMerge->submits);
222 223
    taosArrayDestroy(pMerge->dataRefs);
    taosFreeQitem(pMerge);
L
Liu Jicong 已提交
224 225
  } else if (type == STREAM_INPUT__REF_DATA_BLOCK) {
    SStreamRefDataBlock* pRefBlock = (SStreamRefDataBlock*)data;
H
Haojun Liao 已提交
226
    blockDataDestroy(pRefBlock->pBlock);
L
Liu Jicong 已提交
227
    taosFreeQitem(pRefBlock);
L
Liu Jicong 已提交
228 229
  }
}