streamData.c 7.3 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

16
#include "streamInc.h"
L
Liu Jicong 已提交
17

L
Liu Jicong 已提交
18 19
int32_t streamDispatchReqToData(const SStreamDispatchReq* pReq, SStreamDataBlock* pData) {
  int32_t blockNum = pReq->blockNum;
20
  SArray* pArray = taosArrayInit_s(sizeof(SSDataBlock), blockNum);
L
Liu Jicong 已提交
21 22 23 24 25 26 27 28
  if (pArray == NULL) {
    return -1;
  }

  ASSERT(pReq->blockNum == taosArrayGetSize(pReq->data));
  ASSERT(pReq->blockNum == taosArrayGetSize(pReq->dataLen));

  for (int32_t i = 0; i < blockNum; i++) {
5
54liuyao 已提交
29
    SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*) taosArrayGetP(pReq->data, i);
L
Liu Jicong 已提交
30
    SSDataBlock*       pDataBlock = taosArrayGet(pArray, i);
31
    blockDecode(pDataBlock, pRetrieve->data);
L
Liu Jicong 已提交
32
    // TODO: refactor
33 34
    pDataBlock->info.window.skey = be64toh(pRetrieve->skey);
    pDataBlock->info.window.ekey = be64toh(pRetrieve->ekey);
35
    pDataBlock->info.version = be64toh(pRetrieve->version);
36
    pDataBlock->info.watermark = be64toh(pRetrieve->watermark);
37
    memcpy(pDataBlock->info.parTbName, pRetrieve->parTbName, TSDB_TABLE_NAME_LEN);
38

L
Liu Jicong 已提交
39
    pDataBlock->info.type = pRetrieve->streamBlockType;
L
Liu Jicong 已提交
40 41 42 43 44 45 46 47 48 49
    pDataBlock->info.childId = pReq->upstreamChildId;
  }
  pData->blocks = pArray;
  return 0;
}

int32_t streamRetrieveReqToData(const SStreamRetrieveReq* pReq, SStreamDataBlock* pData) {
  SArray* pArray = taosArrayInit(1, sizeof(SSDataBlock));
  if (pArray == NULL) {
    return -1;
L
Liu Jicong 已提交
50
  }
51

H
Haojun Liao 已提交
52
  taosArrayPush(pArray, &(SSDataBlock){0});
L
Liu Jicong 已提交
53
  SRetrieveTableRsp* pRetrieve = pReq->pRetrieve;
54
  SSDataBlock*       pDataBlock = taosArrayGet(pArray, 0);
55
  blockDecode(pDataBlock, pRetrieve->data);
56

57 58 59
  // TODO: refactor
  pDataBlock->info.window.skey = be64toh(pRetrieve->skey);
  pDataBlock->info.window.ekey = be64toh(pRetrieve->ekey);
60
  pDataBlock->info.version = be64toh(pRetrieve->version);
61 62 63

  pDataBlock->info.type = pRetrieve->streamBlockType;

L
Liu Jicong 已提交
64
  pData->reqId = pReq->reqId;
L
Liu Jicong 已提交
65
  pData->blocks = pArray;
L
Liu Jicong 已提交
66

L
Liu Jicong 已提交
67 68 69
  return 0;
}

70
SStreamDataSubmit2* streamDataSubmitNew(SPackedData submit, int32_t type) {
H
Haojun Liao 已提交
71
  SStreamDataSubmit2* pDataSubmit = (SStreamDataSubmit2*)taosAllocateQitem(sizeof(SStreamDataSubmit2), DEF_QITEM, submit.msgLen);
72 73 74 75
  if (pDataSubmit == NULL) {
    return NULL;
  }

L
Liu Jicong 已提交
76
  pDataSubmit->dataRef = (int32_t*)taosMemoryMalloc(sizeof(int32_t));
77 78 79 80 81
  if (pDataSubmit->dataRef == NULL) {
    taosFreeQitem(pDataSubmit);
    return NULL;
  }

L
Liu Jicong 已提交
82
  pDataSubmit->submit = submit;
83
  *pDataSubmit->dataRef = 1;   // initialize the reference count to be 1
84
  pDataSubmit->type = type;
85

L
Liu Jicong 已提交
86 87 88
  return pDataSubmit;
}

89 90 91 92 93 94 95 96 97 98
void streamDataSubmitDestroy(SStreamDataSubmit2* pDataSubmit) {
  int32_t ref = atomic_sub_fetch_32(pDataSubmit->dataRef, 1);
  ASSERT(ref >= 0 && pDataSubmit->type == STREAM_INPUT__DATA_SUBMIT);

  if (ref == 0) {
    taosMemoryFree(pDataSubmit->submit.msgStr);
    taosMemoryFree(pDataSubmit->dataRef);
  }
}

L
Liu Jicong 已提交
99
SStreamMergedSubmit2* streamMergedSubmitNew() {
100
  SStreamMergedSubmit2* pMerged = (SStreamMergedSubmit2*)taosAllocateQitem(sizeof(SStreamMergedSubmit2), DEF_QITEM, 0);
101 102 103
  if (pMerged == NULL) {
    return NULL;
  }
104

L
Liu Jicong 已提交
105
  pMerged->submits = taosArrayInit(0, sizeof(SPackedData));
106
  pMerged->dataRefs = taosArrayInit(0, sizeof(void*));
107 108 109 110 111 112 113 114

  if (pMerged->dataRefs == NULL || pMerged->submits == NULL) {
    taosArrayDestroy(pMerged->submits);
    taosArrayDestroy(pMerged->dataRefs);
    taosFreeQitem(pMerged);
    return NULL;
  }

J
jiacy-jcy 已提交
115
  pMerged->type = STREAM_INPUT__MERGED_SUBMIT;
116 117 118
  return pMerged;
}

L
Liu Jicong 已提交
119
int32_t streamMergeSubmit(SStreamMergedSubmit2* pMerged, SStreamDataSubmit2* pSubmit) {
120
  taosArrayPush(pMerged->dataRefs, &pSubmit->dataRef);
L
Liu Jicong 已提交
121
  taosArrayPush(pMerged->submits, &pSubmit->submit);
122 123 124 125
  pMerged->ver = pSubmit->ver;
  return 0;
}

L
Liu Jicong 已提交
126
static FORCE_INLINE void streamDataSubmitRefInc(SStreamDataSubmit2* pDataSubmit) {
L
Liu Jicong 已提交
127 128 129
  atomic_add_fetch_32(pDataSubmit->dataRef, 1);
}

130
SStreamDataSubmit2* streamSubmitBlockClone(SStreamDataSubmit2* pSubmit) {
131 132 133 134 135 136
  int32_t len = 0;
  if (pSubmit->type == STREAM_INPUT__DATA_SUBMIT) {
    len = pSubmit->submit.msgLen;
  }

  SStreamDataSubmit2* pSubmitClone = taosAllocateQitem(sizeof(SStreamDataSubmit2), DEF_QITEM, len);
L
Liu Jicong 已提交
137 138 139
  if (pSubmitClone == NULL) {
    return NULL;
  }
140

L
Liu Jicong 已提交
141
  streamDataSubmitRefInc(pSubmit);
L
Liu Jicong 已提交
142
  memcpy(pSubmitClone, pSubmit, sizeof(SStreamDataSubmit2));
L
Liu Jicong 已提交
143 144
  return pSubmitClone;
}
L
Liu Jicong 已提交
145

146 147
SStreamQueueItem* streamMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* pElem) {
  if (dst->type == STREAM_INPUT__DATA_BLOCK && pElem->type == STREAM_INPUT__DATA_BLOCK) {
L
Liu Jicong 已提交
148
    SStreamDataBlock* pBlock = (SStreamDataBlock*)dst;
149
    SStreamDataBlock* pBlockSrc = (SStreamDataBlock*)pElem;
L
Liu Jicong 已提交
150
    taosArrayAddAll(pBlock->blocks, pBlockSrc->blocks);
151
    taosArrayDestroy(pBlockSrc->blocks);
152
    taosFreeQitem(pElem);
J
jiacy-jcy 已提交
153
    return dst;
154
  } else if (dst->type == STREAM_INPUT__MERGED_SUBMIT && pElem->type == STREAM_INPUT__DATA_SUBMIT) {
L
Liu Jicong 已提交
155
    SStreamMergedSubmit2* pMerged = (SStreamMergedSubmit2*)dst;
156
    SStreamDataSubmit2*   pBlockSrc = (SStreamDataSubmit2*)pElem;
157
    streamMergeSubmit(pMerged, pBlockSrc);
158
    taosFreeQitem(pElem);
J
jiacy-jcy 已提交
159
    return dst;
160
  } else if (dst->type == STREAM_INPUT__DATA_SUBMIT && pElem->type == STREAM_INPUT__DATA_SUBMIT) {
L
Liu Jicong 已提交
161
    SStreamMergedSubmit2* pMerged = streamMergedSubmitNew();
162 163
    // todo handle error

L
Liu Jicong 已提交
164
    streamMergeSubmit(pMerged, (SStreamDataSubmit2*)dst);
165
    streamMergeSubmit(pMerged, (SStreamDataSubmit2*)pElem);
166
    taosFreeQitem(dst);
167
    taosFreeQitem(pElem);
J
jiacy-jcy 已提交
168
    return (SStreamQueueItem*)pMerged;
L
Liu Jicong 已提交
169
  } else {
J
jiacy-jcy 已提交
170
    return NULL;
L
Liu Jicong 已提交
171 172 173 174 175
  }
}

void streamFreeQitem(SStreamQueueItem* data) {
  int8_t type = data->type;
L
Liu Jicong 已提交
176
  if (type == STREAM_INPUT__GET_RES) {
L
Liu Jicong 已提交
177 178 179
    blockDataDestroy(((SStreamTrigger*)data)->pBlock);
    taosFreeQitem(data);
  } else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__DATA_RETRIEVE) {
180
    taosArrayDestroyEx(((SStreamDataBlock*)data)->blocks, (FDelete)blockDataFreeRes);
L
Liu Jicong 已提交
181 182
    taosFreeQitem(data);
  } else if (type == STREAM_INPUT__DATA_SUBMIT) {
183
    streamDataSubmitDestroy((SStreamDataSubmit2*)data);
L
Liu Jicong 已提交
184
    taosFreeQitem(data);
185
  } else if (type == STREAM_INPUT__MERGED_SUBMIT) {
L
Liu Jicong 已提交
186 187
    SStreamMergedSubmit2* pMerge = (SStreamMergedSubmit2*)data;
    int32_t               sz = taosArrayGetSize(pMerge->submits);
188
    for (int32_t i = 0; i < sz; i++) {
189 190 191 192
      int32_t* pRef = taosArrayGetP(pMerge->dataRefs, i);
      int32_t  ref = atomic_sub_fetch_32(pRef, 1);
      ASSERT(ref >= 0);
      if (ref == 0) {
L
Liu Jicong 已提交
193
        SPackedData* pSubmit = (SPackedData*)taosArrayGet(pMerge->submits, i);
L
Liu Jicong 已提交
194
        taosMemoryFree(pSubmit->msgStr);
195
        taosMemoryFree(pRef);
196 197
      }
    }
L
Liu Jicong 已提交
198
    taosArrayDestroy(pMerge->submits);
199 200
    taosArrayDestroy(pMerge->dataRefs);
    taosFreeQitem(pMerge);
L
Liu Jicong 已提交
201 202 203 204 205 206 207 208 209 210
  } else if (type == STREAM_INPUT__REF_DATA_BLOCK) {
    SStreamRefDataBlock* pRefBlock = (SStreamRefDataBlock*)data;

    int32_t ref = atomic_sub_fetch_32(pRefBlock->dataRef, 1);
    ASSERT(ref >= 0);
    if (ref == 0) {
      blockDataDestroy(pRefBlock->pBlock);
      taosMemoryFree(pRefBlock->dataRef);
    }
    taosFreeQitem(pRefBlock);
L
Liu Jicong 已提交
211 212
  }
}