diff --git a/src/inc/tsdb.h b/src/inc/tsdb.h index 444f653810a3d9a386d40ae1892f626f4e4e1b0b..4a954f7ea35576f8b1c7c33883f4eafd6596b5aa 100644 --- a/src/inc/tsdb.h +++ b/src/inc/tsdb.h @@ -107,8 +107,9 @@ int tsdbTableSetSName(STableCfg *config, char *sname, bool dup); int tsdbTableSetStreamSql(STableCfg *config, char *sql, bool dup); void tsdbClearTableCfg(STableCfg *config); -int32_t tsdbGetTableTagVal(TsdbRepoT *repo, STableId* id, int32_t colId, int16_t *type, int16_t *bytes, char **val); -char* tsdbGetTableName(TsdbRepoT *repo, const STableId* id, int16_t* bytes); +int32_t tsdbGetTableTagVal(TsdbRepoT *repo, STableId *id, int32_t colId, int16_t *type, int16_t *bytes, char **val); +char * tsdbGetTableName(TsdbRepoT *repo, const STableId *id, int16_t *bytes); +STableCfg *tsdbCreateTableCfgFromMsg(SMDCreateTableMsg *pMsg); int tsdbCreateTable(TsdbRepoT *repo, STableCfg *pCfg); int tsdbDropTable(TsdbRepoT *pRepo, STableId tableId); diff --git a/src/tsdb/src/tsdbMeta.c b/src/tsdb/src/tsdbMeta.c index f057dcb96e27209ca6a471487fbb845178a2cd35..8d70789b673209763028c2d820f2baaa6062dc31 100644 --- a/src/tsdb/src/tsdbMeta.c +++ b/src/tsdb/src/tsdbMeta.c @@ -438,6 +438,60 @@ STable *tsdbIsValidTableToInsert(STsdbMeta *pMeta, STableId tableId) { return pTable; } +STableCfg *tsdbCreateTableCfgFromMsg(SMDCreateTableMsg *pMsg) { + if (pMsg == NULL) return NULL; + SSchema *pSchema = (SSchema *)pMsg->data; + int16_t numOfCols = htons(pMsg->numOfColumns); + int16_t numOfTags = htons(pMsg->numOfTags); + + STableCfg *pCfg = (STableCfg *)calloc(1, sizeof(STableCfg)); + if (pCfg == NULL) return NULL; + + if (tsdbInitTableCfg(pCfg, pMsg->tableType, htobe64(pMsg->uid), htonl(pMsg->sid)) < 0) goto _err; + STSchema *pDSchema = tdNewSchema(numOfCols); + if (pDSchema == NULL) goto _err; + for (int i = 0; i < numOfCols; i++) { + tdSchemaAddCol(pDSchema, pSchema[i].type, htons(pSchema[i].colId), htons(pSchema[i].bytes)); + } + if (tsdbTableSetSchema(pCfg, pDSchema, false) < 0) goto _err; + if (tsdbTableSetName(pCfg, pMsg->tableId, true) < 0) goto _err; + + if (numOfTags > 0) { + STSchema *pTSchema = tdNewSchema(numOfTags); + for (int i = numOfCols; i < numOfCols + numOfTags; i++) { + tdSchemaAddCol(pTSchema, pSchema[i].type, htons(pSchema[i].colId), htons(pSchema[i].bytes)); + } + if (tsdbTableSetTagSchema(pCfg, pTSchema, false) < 0) goto _err; + if (tsdbTableSetSName(pCfg, pMsg->superTableId, true) < 0) goto _err; + if (tsdbTableSetSuperUid(pCfg, htobe64(pMsg->superTableUid)) < 0) goto _err; + + char * pTagData = pMsg->data + (numOfCols + numOfTags) * sizeof(SSchema); + int accBytes = 0; + SKVRowBuilder kvRowBuilder; + + if (tdInitKVRowBuilder(&kvRowBuilder) < 0) goto _err; + for (int i = 0; i < numOfTags; i++) { + STColumn *pCol = schemaColAt(pTSchema, i); + tdAddColToKVRow(&kvRowBuilder, pCol->colId, pCol->type, pTagData + accBytes); + accBytes += htons(pSchema[i+numOfCols].bytes); + } + tsdbTableSetTagValue(pCfg, tdGetKVRowFromBuilder(&kvRowBuilder), false); + tdDestroyKVRowBuilder(&kvRowBuilder); + } + + if (pMsg->tableType == TSDB_STREAM_TABLE) { + char *sql = pMsg->data + (numOfCols + numOfTags) * sizeof(SSchema); + tsdbTableSetStreamSql(pCfg, sql, true); + } + + return pCfg; + +_err: + tsdbClearTableCfg(pCfg); + tfree(pCfg); + return NULL; +} + // int32_t tsdbDropTableImpl(STsdbMeta *pMeta, STableId tableId) { int tsdbDropTable(TsdbRepoT *repo, STableId tableId) { STsdbRepo *pRepo = (STsdbRepo *)repo; diff --git a/src/vnode/src/vnodeWrite.c b/src/vnode/src/vnodeWrite.c index 1fa8abe379f06bfb822a9ad68ed34e898b573b53..5a3da2447f117501e9bf403d8995b3f6e2aec373 100644 --- a/src/vnode/src/vnodeWrite.c +++ b/src/vnode/src/vnodeWrite.c @@ -104,6 +104,16 @@ static int32_t vnodeProcessSubmitMsg(SVnodeObj *pVnode, void *pCont, SRspRet *pR } static int32_t vnodeProcessCreateTableMsg(SVnodeObj *pVnode, void *pCont, SRspRet *pRet) { + + STableCfg *pCfg = tsdbCreateTableCfgFromMsg((SMDCreateTableMsg *)pCont); + if (pCfg == NULL) return terrno; + int32_t code = tsdbCreateTable(pVnode->tsdb, pCfg); + + tsdbClearTableCfg(pCfg); + free(pCfg); + return code; + + #if 0 SMDCreateTableMsg *pTable = pCont; int32_t code = 0; @@ -165,6 +175,7 @@ static int32_t vnodeProcessCreateTableMsg(SVnodeObj *pVnode, void *pCont, SRspRe vTrace("vgId:%d, table:%s is created, result:%x", pVnode->vgId, pTable->tableId, code); return code; + #endif } static int32_t vnodeProcessDropTableMsg(SVnodeObj *pVnode, void *pCont, SRspRet *pRet) {