未验证 提交 2811288c 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #2015 from maikebing/feature/mqtt

Implemented subscription json data from Mqtt broker! 
......@@ -46,7 +46,6 @@ html/
/CMakeCache.txt
/Makefile
/*.cmake
/deps
/src/cq/test/CMakeFiles/cqtest.dir/*.cmake
*.cmake
/src/cq/test/CMakeFiles/cqtest.dir/*.make
......@@ -65,3 +64,4 @@ CMakeError.log
/out/isenseconfig/WSL-Clang-Debug
/out/isenseconfig/WSL-GCC-Debug
/test/cfg
/src/.vs
......@@ -5,7 +5,7 @@
"generator": "Unix Makefiles",
"configurationType": "Debug",
"buildRoot": "${projectDir}\\build\\",
"installRoot": "${projectDir}\\out\\install\\${name}",
"installRoot": "${projectDir}\\build\\",
"cmakeExecutable": "/usr/bin/cmake",
"cmakeCommandArgs": "",
"buildCommandArgs": "",
......
......@@ -7,3 +7,4 @@ ADD_SUBDIRECTORY(regex)
ADD_SUBDIRECTORY(iconv)
ADD_SUBDIRECTORY(lz4)
ADD_SUBDIRECTORY(cJson)
ADD_SUBDIRECTORY(MQTT-C)
cmake_minimum_required(VERSION 3.5)
project(MQTT-C VERSION 1.1.2 LANGUAGES C)
# MQTT-C build options
option(MQTT_C_OpenSSL_SUPPORT "Build MQTT-C with OpenSSL support?" OFF)
option(MQTT_C_MbedTLS_SUPPORT "Build MQTT-C with mbed TLS support?" OFF)
option(MQTT_C_EXAMPLES "Build MQTT-C examples?" ON)
option(MQTT_C_TESTS "Build MQTT-C tests?" OFF)
list (APPEND CMAKE_MODULE_PATH ${CMAKE_CURRENT_SOURCE_DIR}/cmake)
# MQTT-C library
add_library(mqttc STATIC
src/mqtt_pal.c
src/mqtt.c
)
target_include_directories(mqttc PUBLIC include)
target_link_libraries(mqttc PUBLIC
$<$<C_COMPILER_ID:MSVS>:ws2_32>
)
# Configure with OpenSSL support
if(MQTT_C_OpenSSL_SUPPORT)
find_package(OpenSSL REQUIRED)
target_link_libraries(mqttc INTERFACE OpenSSL::SSL)
target_compile_definitions(mqttc PUBLIC MQTT_USE_BIO)
endif()
# Configure with mbed TLS support
if(MQTT_C_MbedTLS_SUPPORT)
find_package(MbedTLS REQUIRED)
target_include_directories(mqttc PUBLIC ${MBEDTLS_INCLUDE_DIRS})
target_link_libraries(mqttc INTERFACE ${MBEDTLS_LIBRARY})
target_compile_definitions(mqttc PUBLIC MQTT_USE_MBEDTLS)
endif()
# Build examples
if(MQTT_C_EXAMPLES)
find_package(Threads REQUIRED)
if(MQTT_C_OpenSSL_SUPPORT)
add_executable(bio_publisher examples/bio_publisher.c)
target_link_libraries(bio_publisher Threads::Threads mqttc)
add_executable(openssl_publisher examples/openssl_publisher.c)
target_link_libraries(openssl_publisher Threads::Threads mqttc)
elseif(MQTT_C_MbedTLS_SUPPORT)
add_executable(mbedtls_publisher examples/mbedtls_publisher.c)
target_link_libraries(mbedtls_publisher Threads::Threads mqttc ${MBEDX509_LIBRARY} ${MBEDCRYPTO_LIBRARY})
else()
add_executable(simple_publisher examples/simple_publisher.c)
target_link_libraries(simple_publisher Threads::Threads mqttc)
add_executable(simple_subscriber examples/simple_subscriber.c)
target_link_libraries(simple_subscriber Threads::Threads mqttc)
add_executable(reconnect_subscriber examples/reconnect_subscriber.c)
target_link_libraries(reconnect_subscriber Threads::Threads mqttc)
endif()
endif()
# Build tests
if(MQTT_C_TESTS)
find_path(CMOCKA_INCLUDE_DIR cmocka.h)
find_library(CMOCKA_LIBRARY cmocka)
if((NOT CMOCKA_INCLUDE_DIR) OR (NOT CMOCKA_LIBRARY))
message(FATAL_ERROR "Failed to find cmocka! Add cmocka's install prefix to CMAKE_PREFIX_PATH to resolve this error.")
endif()
add_executable(tests tests.c)
target_link_libraries(tests ${CMOCKA_LIBRARY} mqttc)
target_include_directories(tests PRIVATE ${CMOCKA_INCLUDE_DIR})
endif()
# Install includes and library
# install(TARGETS mqttc
# DESTINATION lib
# )
# install(DIRECTORY include/
# DESTINATION include)
此差异已折叠。
MIT License
Copyright (c) 2018 Liam Bindle
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
<p align="right">
<a href="https://github.com/LiamBindle/MQTT-C/stargazers"><img src="https://img.shields.io/github/stars/LiamBindle/MQTT-C.svg?style=social&label=Star" style="margin-left:5em"></a>
<a href="https://github.com/LiamBindle/MQTT-C/network/members"><img src="https://img.shields.io/github/forks/LiamBindle/MQTT-C.svg?style=social&label=Fork"></a>
</p>
<p align="center">
<img width="70%" src="docs/mqtt-c-logo.png"><br>
<a href="https://liambindle.ca/MQTT-C"><img src="https://img.shields.io/badge/docs-passing-brightgreen.svg"></a>
<a href="https://github.com/LiamBindle/MQTT-C/issues"><img src="https://img.shields.io/badge/Maintained%3F-yes-green.svg"></a>
<a href="https://GitHub.com/LiamBindle/MQTT-C/issues/"><img src="https://img.shields.io/github/issues/LiamBindle/MQTT-C.svg"></a>
<a href="https://github.com/LiamBindle/MQTT-C/issues"><img src="https://img.shields.io/github/issues-closed/LiamBindle/MQTT-C.svg"></a>
<a href="https://github.com/LiamBindle/MQTT-C/blob/master/LICENSE"><img src="https://img.shields.io/badge/License-MIT-blue.svg"></a>
</p>
#
MQTT-C is an [MQTT v3.1.1](http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html)
client written in C. MQTT is a lightweight publisher-subscriber-based messaging protocol that is
commonly used in IoT and networking applications where high-latency and low data-rate links
are expected. The purpose of MQTT-C is to provide a **portable** MQTT client, **written in C**,
for embedded systems and PC's alike. MQTT-C does this by providing a transparent Platform
Abstraction Layer (PAL) which makes porting to new platforms easy. MQTT-C is completely
thread-safe but can also run perfectly fine on single-threaded systems making MQTT-C
well-suited for embedded systems and microcontrollers. Finally, MQTT-C is small; there are only
two source files totalling less than 2000 lines.
#### A note from the author
It's been great to hear about all the places MQTT-C is being used! Please don't hesitate
to get in touch with me or submit issues on GitHub!
## Getting Started
To use MQTT-C you first instantiate a `struct mqtt_client` and initialize it by calling
@ref mqtt_init.
```c
struct mqtt_client client; /* instantiate the client */
mqtt_init(&client, ...); /* initialize the client */
```
Once your client is initialized you need to connect to an MQTT broker.
```c
mqtt_connect(&client, ...); /* send a connection request to the broker. */
```
At this point the client is ready to use! For example, we can subscribe to a topic like so:
```c
/* subscribe to "toaster/temperature" with a max QoS level of 0 */
mqtt_subscribe(&client, "toaster/temperature", 0);
```
And we can publish to a topic like so:
```c
/* publish coffee temperature with a QoS level of 1 */
int temperature = 67;
mqtt_publish(&client, "coffee/temperature", &temperature, sizeof(int), MQTT_PUBLISH_QOS_1);
```
Those are the basics! From here the [examples](https://github.com/LiamBindle/MQTT-C/tree/master/examples) and [API documentation](https://liambindle.ca/MQTT-C/group__api.html) are good places to get started.
## Building
There are **only two source files** that need to be built, `mqtt.c` and `mqtt_pal.c`.
These files are ANSI C (C89) compatible, and should compile with any C compiler.
Then, simply <code>\#include <mqtt.h></code>.
Alternatively, you can build MQTT-C with CMake or the provided Makefile. These are provided for convenience.
## Documentation
Pre-built documentation can be found here: [https://liambindle.ca/MQTT-C](https://liambindle.ca/MQTT-C). Be sure to check out the [examples](https://github.com/LiamBindle/MQTT-C/tree/master/examples) too.
The @ref api documentation contains all the documentation application programmers should need.
The @ref pal documentation contains everything you should need to port MQTT-C to a new platform,
and the other modules contain documentation for MQTT-C developers.
## Testing and Building the Tests
The MQTT-C unit tests use the [cmocka unit testing framework](https://cmocka.org/).
Therefore, [cmocka](https://cmocka.org/) *must* be installed on your machine to build and run
the unit tests. For convenience, a simple `"makefile"` is included to build the unit tests and
examples on UNIX-like machines. The unit tests and examples can be built as follows:
```bash
$ make all
```
The unit tests and examples will be built in the `"bin/"` directory. The unit tests can be run
like so:
```bash
$ ./bin/tests [address [port]]
```
Note that the \c address and \c port arguments are both optional to specify the location of the
MQTT broker that is to be used for the tests. If no \c address is given then the
[Mosquitto MQTT Test Server](https://test.mosquitto.org/) will be used. If no \c port is given,
port 1883 will be used.
## Portability
MQTT-C provides a transparent platform abstraction layer (PAL) in `mqtt_pal.h` and `mqtt_pal.c`.
These files declare and implement the types and calls that MQTT-C requires. Refer to
@ref pal for the complete documentation of the PAL.
## Contributing
Please feel free to submit issues and pull-requests [here](https://github.com/LiamBindle/MQTT-C).
When submitting a pull-request please ensure you have *fully documented* your changes and
added the appropriate unit tests.
## License
This project is licensed under the [MIT License](https://opensource.org/licenses/MIT). See the
`"LICENSE"` file for more details.
## Authors
MQTT-C was initially developed as a CMPT 434 (Winter Term, 2018) final project at the University of
Saskatchewan by:
- **Liam Bindle**
- **Demilade Adeoye**
/**
* @file
* A simple program to that publishes the current time whenever ENTER is pressed.
*/
#include <unistd.h>
#include <stdlib.h>
#include <stdio.h>
#include <mqtt.h>
#include "templates/bio_sockets.h"
/**
* @brief The function that would be called whenever a PUBLISH is received.
*
* @note This function is not used in this example.
*/
void publish_callback(void** unused, struct mqtt_response_publish *published);
/**
* @brief The client's refresher. This function triggers back-end routines to
* handle ingress/egress traffic to the broker.
*
* @note All this function needs to do is call \ref __mqtt_recv and
* \ref __mqtt_send every so often. I've picked 100 ms meaning that
* client ingress/egress traffic will be handled every 100 ms.
*/
void* client_refresher(void* client);
/**
* @brief Safelty closes the \p sockfd and cancels the \p client_daemon before \c exit.
*/
void exit_example(int status, BIO* sockfd, pthread_t *client_daemon);
/**
* A simple program to that publishes the current time whenever ENTER is pressed.
*/
int main(int argc, const char *argv[])
{
const char* addr;
const char* port;
const char* topic;
/* Load OpenSSL */
SSL_load_error_strings();
ERR_load_BIO_strings();
OpenSSL_add_all_algorithms();
/* get address (argv[1] if present) */
if (argc > 1) {
addr = argv[1];
} else {
addr = "test.mosquitto.org";
}
/* get port number (argv[2] if present) */
if (argc > 2) {
port = argv[2];
} else {
port = "1883";
}
/* get the topic name to publish */
if (argc > 3) {
topic = argv[3];
} else {
topic = "datetime";
}
/* open the non-blocking TCP socket (connecting to the broker) */
BIO* sockfd = open_nb_socket(addr, port);
if (sockfd == NULL) {
exit_example(EXIT_FAILURE, sockfd, NULL);
}
/* setup a client */
struct mqtt_client client;
uint8_t sendbuf[2048]; /* sendbuf should be large enough to hold multiple whole mqtt messages */
uint8_t recvbuf[1024]; /* recvbuf should be large enough any whole mqtt message expected to be received */
mqtt_init(&client, sockfd, sendbuf, sizeof(sendbuf), recvbuf, sizeof(recvbuf), publish_callback);
mqtt_connect(&client, "publishing_client", NULL, NULL, 0, NULL, NULL, 0, 400);
/* check that we don't have any errors */
if (client.error != MQTT_OK) {
fprintf(stderr, "error: %s\n", mqtt_error_str(client.error));
exit_example(EXIT_FAILURE, sockfd, NULL);
}
/* start a thread to refresh the client (handle egress and ingree client traffic) */
pthread_t client_daemon;
if(pthread_create(&client_daemon, NULL, client_refresher, &client)) {
fprintf(stderr, "Failed to start client daemon.\n");
exit_example(EXIT_FAILURE, sockfd, NULL);
}
/* start publishing the time */
printf("%s is ready to begin publishing the time.\n", argv[0]);
printf("Press ENTER to publish the current time.\n");
printf("Press CTRL-D (or any other key) to exit.\n\n");
while(fgetc(stdin) == '\n') {
/* get the current time */
time_t timer;
time(&timer);
struct tm* tm_info = localtime(&timer);
char timebuf[26];
strftime(timebuf, 26, "%Y-%m-%d %H:%M:%S", tm_info);
/* print a message */
char application_message[256];
snprintf(application_message, sizeof(application_message), "The time is %s", timebuf);
printf("%s published : \"%s\"", argv[0], application_message);
/* publish the time */
mqtt_publish(&client, topic, application_message, strlen(application_message) + 1, MQTT_PUBLISH_QOS_2);
/* check for errors */
if (client.error != MQTT_OK) {
fprintf(stderr, "error: %s\n", mqtt_error_str(client.error));
exit_example(EXIT_FAILURE, sockfd, &client_daemon);
}
}
/* disconnect */
printf("\n%s disconnecting from %s\n", argv[0], addr);
sleep(1);
/* exit */
exit_example(EXIT_SUCCESS, sockfd, &client_daemon);
}
void exit_example(int status, BIO* sockfd, pthread_t *client_daemon)
{
if (sockfd != NULL) BIO_free_all(sockfd);
if (client_daemon != NULL) pthread_cancel(*client_daemon);
exit(status);
}
void publish_callback(void** unused, struct mqtt_response_publish *published)
{
/* not used in this example */
}
void* client_refresher(void* client)
{
while(1)
{
mqtt_sync((struct mqtt_client*) client);
usleep(100000U);
}
return NULL;
}
\ No newline at end of file
/**
* @file
*/
#include <unistd.h>
#include <stdlib.h>
#include <stdio.h>
#include <mqtt.h>
#include "templates/mbedtls_sockets.h"
/**
* @brief The function that would be called whenever a PUBLISH is received.
*
* @note This function is not used in this example.
*/
void publish_callback(void** unused, struct mqtt_response_publish *published);
/**
* @brief The client's refresher. This function triggers back-end routines to
* handle ingress/egress traffic to the broker.
*
* @note All this function needs to do is call \ref __mqtt_recv and
* \ref __mqtt_send every so often. I've picked 100 ms meaning that
* client ingress/egress traffic will be handled every 100 ms.
*/
void* client_refresher(void* client);
/**
* @brief Safelty closes the \p sockfd and cancels the \p client_daemon before \c exit.
*/
void exit_example(int status, mqtt_pal_socket_handle sockfd, pthread_t *client_daemon);
/**
* A simple program to that publishes the current time whenever ENTER is pressed.
*/
int main(int argc, const char *argv[])
{
const char* addr;
const char* port;
const char* topic;
const char* ca_file;
struct mbedtls_context ctx;
mqtt_pal_socket_handle sockfd;
if (argc > 1) {
ca_file = argv[1];
} else {
printf("error: path to the CA certificate to use\n");
exit(1);
}
/* get address (argv[2] if present) */
if (argc > 2) {
addr = argv[2];
} else {
addr = "test.mosquitto.org";
}
/* get port number (argv[3] if present) */
if (argc > 3) {
port = argv[3];
} else {
port = "8883";
}
/* get the topic name to publish */
if (argc > 4) {
topic = argv[4];
} else {
topic = "datetime";
}
/* open the non-blocking TCP socket (connecting to the broker) */
open_nb_socket(&ctx, addr, port, ca_file);
sockfd = &ctx.ssl_ctx;
if (sockfd == NULL) {
exit_example(EXIT_FAILURE, sockfd, NULL);
}
/* setup a client */
struct mqtt_client client;
uint8_t sendbuf[2048]; /* sendbuf should be large enough to hold multiple whole mqtt messages */
uint8_t recvbuf[1024]; /* recvbuf should be large enough any whole mqtt message expected to be received */
mqtt_init(&client, sockfd, sendbuf, sizeof(sendbuf), recvbuf, sizeof(recvbuf), publish_callback);
mqtt_connect(&client, "publishing_client", NULL, NULL, 0, NULL, NULL, 0, 400);
/* check that we don't have any errors */
if (client.error != MQTT_OK) {
fprintf(stderr, "error: %s\n", mqtt_error_str(client.error));
exit_example(EXIT_FAILURE, sockfd, NULL);
}
/* start a thread to refresh the client (handle egress and ingree client traffic) */
pthread_t client_daemon;
if(pthread_create(&client_daemon, NULL, client_refresher, &client)) {
fprintf(stderr, "Failed to start client daemon.\n");
exit_example(EXIT_FAILURE, sockfd, NULL);
}
/* start publishing the time */
printf("%s is ready to begin publishing the time.\n", argv[0]);
printf("Press ENTER to publish the current time.\n");
printf("Press CTRL-D (or any other key) to exit.\n\n");
while(fgetc(stdin) == '\n') {
/* get the current time */
time_t timer;
time(&timer);
struct tm* tm_info = localtime(&timer);
char timebuf[26];
strftime(timebuf, 26, "%Y-%m-%d %H:%M:%S", tm_info);
/* print a message */
char application_message[256];
snprintf(application_message, sizeof(application_message), "The time is %s", timebuf);
printf("%s published : \"%s\"", argv[0], application_message);
/* publish the time */
mqtt_publish(&client, topic, application_message, strlen(application_message) + 1, MQTT_PUBLISH_QOS_2);
/* check for errors */
if (client.error != MQTT_OK) {
fprintf(stderr, "error: %s\n", mqtt_error_str(client.error));
exit_example(EXIT_FAILURE, sockfd, &client_daemon);
}
}
/* disconnect */
printf("\n%s disconnecting from %s\n", argv[0], addr);
sleep(1);
/* exit */
exit_example(EXIT_SUCCESS, sockfd, &client_daemon);
}
void exit_example(int status, mqtt_pal_socket_handle sockfd, pthread_t *client_daemon)
{
if (client_daemon != NULL) pthread_cancel(*client_daemon);
mbedtls_ssl_free(sockfd);
/* XXX free the rest of contexts */
exit(status);
}
void publish_callback(void** unused, struct mqtt_response_publish *published)
{
/* not used in this example */
}
void* client_refresher(void* client)
{
while(1)
{
mqtt_sync((struct mqtt_client*) client);
usleep(100000U);
}
return NULL;
}
/**
* @file
*/
#include <unistd.h>
#include <stdlib.h>
#include <stdio.h>
#include <mqtt.h>
#include "templates/openssl_sockets.h"
/**
* @brief The function that would be called whenever a PUBLISH is received.
*
* @note This function is not used in this example.
*/
void publish_callback(void** unused, struct mqtt_response_publish *published);
/**
* @brief The client's refresher. This function triggers back-end routines to
* handle ingress/egress traffic to the broker.
*
* @note All this function needs to do is call \ref __mqtt_recv and
* \ref __mqtt_send every so often. I've picked 100 ms meaning that
* client ingress/egress traffic will be handled every 100 ms.
*/
void* client_refresher(void* client);
/**
* @brief Safelty closes the \p sockfd and cancels the \p client_daemon before \c exit.
*/
void exit_example(int status, BIO* sockfd, pthread_t *client_daemon);
/**
* A simple program to that publishes the current time whenever ENTER is pressed.
*/
int main(int argc, const char *argv[])
{
const char* addr;
const char* port;
const char* topic;
const char* ca_file;
/* Load OpenSSL */
SSL_load_error_strings();
ERR_load_BIO_strings();
OpenSSL_add_all_algorithms();
SSL_library_init();
SSL_CTX* ssl_ctx;
BIO* sockfd;
if (argc > 1) {
ca_file = argv[1];
} else {
printf("error: path to the CA certificate to use\n");
exit(1);
}
/* get address (argv[2] if present) */
if (argc > 2) {
addr = argv[2];
} else {
addr = "test.mosquitto.org";
}
/* get port number (argv[3] if present) */
if (argc > 3) {
port = argv[3];
} else {
port = "8883";
}
/* get the topic name to publish */
if (argc > 4) {
topic = argv[4];
} else {
topic = "datetime";
}
/* open the non-blocking TCP socket (connecting to the broker) */
open_nb_socket(&sockfd, &ssl_ctx, addr, port, ca_file, NULL);
if (sockfd == NULL) {
exit_example(EXIT_FAILURE, sockfd, NULL);
}
/* setup a client */
struct mqtt_client client;
uint8_t sendbuf[2048]; /* sendbuf should be large enough to hold multiple whole mqtt messages */
uint8_t recvbuf[1024]; /* recvbuf should be large enough any whole mqtt message expected to be received */
mqtt_init(&client, sockfd, sendbuf, sizeof(sendbuf), recvbuf, sizeof(recvbuf), publish_callback);
mqtt_connect(&client, "publishing_client", NULL, NULL, 0, NULL, NULL, 0, 400);
/* check that we don't have any errors */
if (client.error != MQTT_OK) {
fprintf(stderr, "error: %s\n", mqtt_error_str(client.error));
exit_example(EXIT_FAILURE, sockfd, NULL);
}
/* start a thread to refresh the client (handle egress and ingree client traffic) */
pthread_t client_daemon;
if(pthread_create(&client_daemon, NULL, client_refresher, &client)) {
fprintf(stderr, "Failed to start client daemon.\n");
exit_example(EXIT_FAILURE, sockfd, NULL);
}
/* start publishing the time */
printf("%s is ready to begin publishing the time.\n", argv[0]);
printf("Press ENTER to publish the current time.\n");
printf("Press CTRL-D (or any other key) to exit.\n\n");
while(fgetc(stdin) == '\n') {
/* get the current time */
time_t timer;
time(&timer);
struct tm* tm_info = localtime(&timer);
char timebuf[26];
strftime(timebuf, 26, "%Y-%m-%d %H:%M:%S", tm_info);
/* print a message */
char application_message[256];
snprintf(application_message, sizeof(application_message), "The time is %s", timebuf);
printf("%s published : \"%s\"", argv[0], application_message);
/* publish the time */
mqtt_publish(&client, topic, application_message, strlen(application_message) + 1, MQTT_PUBLISH_QOS_2);
/* check for errors */
if (client.error != MQTT_OK) {
fprintf(stderr, "error: %s\n", mqtt_error_str(client.error));
exit_example(EXIT_FAILURE, sockfd, &client_daemon);
}
}
/* disconnect */
printf("\n%s disconnecting from %s\n", argv[0], addr);
sleep(1);
/* exit */
exit_example(EXIT_SUCCESS, sockfd, &client_daemon);
}
void exit_example(int status, BIO* sockfd, pthread_t *client_daemon)
{
if (sockfd != NULL) BIO_free_all(sockfd);
if (client_daemon != NULL) pthread_cancel(*client_daemon);
exit(status);
}
void publish_callback(void** unused, struct mqtt_response_publish *published)
{
/* not used in this example */
}
void* client_refresher(void* client)
{
while(1)
{
mqtt_sync((struct mqtt_client*) client);
usleep(100000U);
}
return NULL;
}
\ No newline at end of file
/**
* @file
* A simple subscriber program that performs automatic reconnections.
*/
#include <unistd.h>
#include <stdlib.h>
#include <stdio.h>
#include <mqtt.h>
#include "templates/posix_sockets.h"
/**
* @brief A structure that I will use to keep track of some data needed
* to setup the connection to the broker.
*
* An instance of this struct will be created in my \c main(). Then, whenever
* \ref reconnect_client is called, this instance will be passed.
*/
struct reconnect_state_t {
const char* hostname;
const char* port;
const char* topic;
uint8_t* sendbuf;
size_t sendbufsz;
uint8_t* recvbuf;
size_t recvbufsz;
};
/**
* @brief My reconnect callback. It will reestablish the connection whenever
* an error occurs.
*/
void reconnect_client(struct mqtt_client* client, void **reconnect_state_vptr);
/**
* @brief The function will be called whenever a PUBLISH message is received.
*/
void publish_callback(void** unused, struct mqtt_response_publish *published);
/**
* @brief The client's refresher. This function triggers back-end routines to
* handle ingress/egress traffic to the broker.
*
* @note All this function needs to do is call \ref __mqtt_recv and
* \ref __mqtt_send every so often. I've picked 100 ms meaning that
* client ingress/egress traffic will be handled every 100 ms.
*/
void* client_refresher(void* client);
/**
* @brief Safelty closes the \p sockfd and cancels the \p client_daemon before \c exit.
*/
void exit_example(int status, int sockfd, pthread_t *client_daemon);
int main(int argc, const char *argv[])
{
const char* addr;
const char* port;
const char* topic;
/* get address (argv[1] if present) */
if (argc > 1) {
addr = argv[1];
} else {
addr = "test.mosquitto.org";
}
/* get port number (argv[2] if present) */
if (argc > 2) {
port = argv[2];
} else {
port = "1883";
}
/* get the topic name to publish */
if (argc > 3) {
topic = argv[3];
} else {
topic = "datetime";
}
/* build the reconnect_state structure which will be passed to reconnect */
struct reconnect_state_t reconnect_state;
reconnect_state.hostname = addr;
reconnect_state.port = port;
reconnect_state.topic = topic;
uint8_t sendbuf[2048];
uint8_t recvbuf[1024];
reconnect_state.sendbuf = sendbuf;
reconnect_state.sendbufsz = sizeof(sendbuf);
reconnect_state.recvbuf = recvbuf;
reconnect_state.recvbufsz = sizeof(recvbuf);
/* setup a client */
struct mqtt_client client;
mqtt_init_reconnect(&client,
reconnect_client, &reconnect_state,
publish_callback
);
/* start a thread to refresh the client (handle egress and ingree client traffic) */
pthread_t client_daemon;
if(pthread_create(&client_daemon, NULL, client_refresher, &client)) {
fprintf(stderr, "Failed to start client daemon.\n");
exit_example(EXIT_FAILURE, -1, NULL);
}
/* start publishing the time */
printf("%s listening for '%s' messages.\n", argv[0], topic);
printf("Press ENTER to inject an error.\n");
printf("Press CTRL-D to exit.\n\n");
/* block */
while(fgetc(stdin) != EOF) {
printf("Injecting error: \"MQTT_ERROR_SOCKET_ERROR\"\n");
client.error = MQTT_ERROR_SOCKET_ERROR;
}
/* disconnect */
printf("\n%s disconnecting from %s\n", argv[0], addr);
sleep(1);
/* exit */
exit_example(EXIT_SUCCESS, client.socketfd, &client_daemon);
}
void reconnect_client(struct mqtt_client* client, void **reconnect_state_vptr)
{
struct reconnect_state_t *reconnect_state = *((struct reconnect_state_t**) reconnect_state_vptr);
/* Close the clients socket if this isn't the initial reconnect call */
if (client->error != MQTT_ERROR_INITIAL_RECONNECT) {
close(client->socketfd);
}
/* Perform error handling here. */
if (client->error != MQTT_ERROR_INITIAL_RECONNECT) {
printf("reconnect_client: called while client was in error state \"%s\"\n",
mqtt_error_str(client->error)
);
}
/* Open a new socket. */
int sockfd = open_nb_socket(reconnect_state->hostname, reconnect_state->port);
if (sockfd == -1) {
perror("Failed to open socket: ");
exit_example(EXIT_FAILURE, sockfd, NULL);
}
/* Reinitialize the client. */
mqtt_reinit(client, sockfd,
reconnect_state->sendbuf, reconnect_state->sendbufsz,
reconnect_state->recvbuf, reconnect_state->recvbufsz
);
/* Create an anonymous session */
const char* client_id = NULL;
/* Ensure we have a clean session */
uint8_t connect_flags = MQTT_CONNECT_CLEAN_SESSION;
/* Send connection request to the broker. */
mqtt_connect(client, client_id, NULL, NULL, 0, NULL, NULL, connect_flags, 400);
/* Subscribe to the topic. */
mqtt_subscribe(client, reconnect_state->topic, 0);
}
void exit_example(int status, int sockfd, pthread_t *client_daemon)
{
if (sockfd != -1) close(sockfd);
if (client_daemon != NULL) pthread_cancel(*client_daemon);
exit(status);
}
void publish_callback(void** unused, struct mqtt_response_publish *published)
{
/* note that published->topic_name is NOT null-terminated (here we'll change it to a c-string) */
char* topic_name = (char*) malloc(published->topic_name_size + 1);
memcpy(topic_name, published->topic_name, published->topic_name_size);
topic_name[published->topic_name_size] = '\0';
printf("Received publish('%s'): %s\n", topic_name, (const char*) published->application_message);
free(topic_name);
}
void* client_refresher(void* client)
{
while(1)
{
mqtt_sync((struct mqtt_client*) client);
usleep(100000U);
}
return NULL;
}
\ No newline at end of file
/**
* @file
* A simple program to that publishes the current time whenever ENTER is pressed.
*/
#include <unistd.h>
#include <stdlib.h>
#include <stdio.h>
#include <mqtt.h>
#include "templates/posix_sockets.h"
/**
* @brief The function that would be called whenever a PUBLISH is received.
*
* @note This function is not used in this example.
*/
void publish_callback(void** unused, struct mqtt_response_publish *published);
/**
* @brief The client's refresher. This function triggers back-end routines to
* handle ingress/egress traffic to the broker.
*
* @note All this function needs to do is call \ref __mqtt_recv and
* \ref __mqtt_send every so often. I've picked 100 ms meaning that
* client ingress/egress traffic will be handled every 100 ms.
*/
void* client_refresher(void* client);
/**
* @brief Safelty closes the \p sockfd and cancels the \p client_daemon before \c exit.
*/
void exit_example(int status, int sockfd, pthread_t *client_daemon);
/**
* A simple program to that publishes the current time whenever ENTER is pressed.
*/
int main(int argc, const char *argv[])
{
const char* addr;
const char* port;
const char* topic;
/* get address (argv[1] if present) */
if (argc > 1) {
addr = argv[1];
} else {
addr = "test.mosquitto.org";
}
/* get port number (argv[2] if present) */
if (argc > 2) {
port = argv[2];
} else {
port = "1883";
}
/* get the topic name to publish */
if (argc > 3) {
topic = argv[3];
} else {
topic = "datetime";
}
/* open the non-blocking TCP socket (connecting to the broker) */
int sockfd = open_nb_socket(addr, port);
if (sockfd == -1) {
perror("Failed to open socket: ");
exit_example(EXIT_FAILURE, sockfd, NULL);
}
/* setup a client */
struct mqtt_client client;
uint8_t sendbuf[2048]; /* sendbuf should be large enough to hold multiple whole mqtt messages */
uint8_t recvbuf[1024]; /* recvbuf should be large enough any whole mqtt message expected to be received */
mqtt_init(&client, sockfd, sendbuf, sizeof(sendbuf), recvbuf, sizeof(recvbuf), publish_callback);
/* Create an anonymous session */
const char* client_id = NULL;
/* Ensure we have a clean session */
uint8_t connect_flags = MQTT_CONNECT_CLEAN_SESSION;
/* Send connection request to the broker. */
mqtt_connect(&client, client_id, NULL, NULL, 0, NULL, NULL, connect_flags, 400);
/* check that we don't have any errors */
if (client.error != MQTT_OK) {
fprintf(stderr, "error: %s\n", mqtt_error_str(client.error));
exit_example(EXIT_FAILURE, sockfd, NULL);
}
/* start a thread to refresh the client (handle egress and ingree client traffic) */
pthread_t client_daemon;
if(pthread_create(&client_daemon, NULL, client_refresher, &client)) {
fprintf(stderr, "Failed to start client daemon.\n");
exit_example(EXIT_FAILURE, sockfd, NULL);
}
/* start publishing the time */
printf("%s is ready to begin publishing the time.\n", argv[0]);
printf("Press ENTER to publish the current time.\n");
printf("Press CTRL-D (or any other key) to exit.\n\n");
while(fgetc(stdin) == '\n') {
/* get the current time */
time_t timer;
time(&timer);
struct tm* tm_info = localtime(&timer);
char timebuf[26];
strftime(timebuf, 26, "%Y-%m-%d %H:%M:%S", tm_info);
/* print a message */
char application_message[256];
snprintf(application_message, sizeof(application_message), "The time is %s", timebuf);
printf("%s published : \"%s\"", argv[0], application_message);
/* publish the time */
mqtt_publish(&client, topic, application_message, strlen(application_message) + 1, MQTT_PUBLISH_QOS_0);
/* check for errors */
if (client.error != MQTT_OK) {
fprintf(stderr, "error: %s\n", mqtt_error_str(client.error));
exit_example(EXIT_FAILURE, sockfd, &client_daemon);
}
}
/* disconnect */
printf("\n%s disconnecting from %s\n", argv[0], addr);
sleep(1);
/* exit */
exit_example(EXIT_SUCCESS, sockfd, &client_daemon);
}
void exit_example(int status, int sockfd, pthread_t *client_daemon)
{
if (sockfd != -1) close(sockfd);
if (client_daemon != NULL) pthread_cancel(*client_daemon);
exit(status);
}
void publish_callback(void** unused, struct mqtt_response_publish *published)
{
/* not used in this example */
}
void* client_refresher(void* client)
{
while(1)
{
mqtt_sync((struct mqtt_client*) client);
usleep(100000U);
}
return NULL;
}
\ No newline at end of file
/**
* @file
* A simple program that subscribes to a topic.
*/
#include <unistd.h>
#include <stdlib.h>
#include <stdio.h>
#include <mqtt.h>
#include "templates/posix_sockets.h"
/**
* @brief The function will be called whenever a PUBLISH message is received.
*/
void publish_callback(void** unused, struct mqtt_response_publish *published);
/**
* @brief The client's refresher. This function triggers back-end routines to
* handle ingress/egress traffic to the broker.
*
* @note All this function needs to do is call \ref __mqtt_recv and
* \ref __mqtt_send every so often. I've picked 100 ms meaning that
* client ingress/egress traffic will be handled every 100 ms.
*/
void* client_refresher(void* client);
/**
* @brief Safelty closes the \p sockfd and cancels the \p client_daemon before \c exit.
*/
void exit_example(int status, int sockfd, pthread_t *client_daemon);
int main(int argc, const char *argv[])
{
const char* addr;
const char* port;
const char* topic;
/* get address (argv[1] if present) */
if (argc > 1) {
addr = argv[1];
} else {
addr = "test.mosquitto.org";
}
/* get port number (argv[2] if present) */
if (argc > 2) {
port = argv[2];
} else {
port = "1883";
}
/* get the topic name to publish */
if (argc > 3) {
topic = argv[3];
} else {
topic = "datetime";
}
/* open the non-blocking TCP socket (connecting to the broker) */
int sockfd = open_nb_socket(addr, port);
if (sockfd == -1) {
perror("Failed to open socket: ");
exit_example(EXIT_FAILURE, sockfd, NULL);
}
/* setup a client */
struct mqtt_client client;
uint8_t sendbuf[2048]; /* sendbuf should be large enough to hold multiple whole mqtt messages */
uint8_t recvbuf[1024]; /* recvbuf should be large enough any whole mqtt message expected to be received */
mqtt_init(&client, sockfd, sendbuf, sizeof(sendbuf), recvbuf, sizeof(recvbuf), publish_callback);
/* Create an anonymous session */
const char* client_id = NULL;
/* Ensure we have a clean session */
uint8_t connect_flags = MQTT_CONNECT_CLEAN_SESSION;
/* Send connection request to the broker. */
mqtt_connect(&client, client_id, NULL, NULL, 0, NULL, NULL, connect_flags, 400);
/* check that we don't have any errors */
if (client.error != MQTT_OK) {
fprintf(stderr, "error: %s\n", mqtt_error_str(client.error));
exit_example(EXIT_FAILURE, sockfd, NULL);
}
/* start a thread to refresh the client (handle egress and ingree client traffic) */
pthread_t client_daemon;
if(pthread_create(&client_daemon, NULL, client_refresher, &client)) {
fprintf(stderr, "Failed to start client daemon.\n");
exit_example(EXIT_FAILURE, sockfd, NULL);
}
/* subscribe */
mqtt_subscribe(&client, topic, 0);
/* start publishing the time */
printf("%s listening for '%s' messages.\n", argv[0], topic);
printf("Press CTRL-D to exit.\n\n");
/* block */
while(fgetc(stdin) != EOF);
/* disconnect */
printf("\n%s disconnecting from %s\n", argv[0], addr);
sleep(1);
/* exit */
exit_example(EXIT_SUCCESS, sockfd, &client_daemon);
}
void exit_example(int status, int sockfd, pthread_t *client_daemon)
{
if (sockfd != -1) close(sockfd);
if (client_daemon != NULL) pthread_cancel(*client_daemon);
exit(status);
}
void publish_callback(void** unused, struct mqtt_response_publish *published)
{
/* note that published->topic_name is NOT null-terminated (here we'll change it to a c-string) */
char* topic_name = (char*) malloc(published->topic_name_size + 1);
memcpy(topic_name, published->topic_name, published->topic_name_size);
topic_name[published->topic_name_size] = '\0';
printf("Received publish('%s'): %s\n", topic_name, (const char*) published->application_message);
free(topic_name);
}
void* client_refresher(void* client)
{
while(1)
{
mqtt_sync((struct mqtt_client*) client);
usleep(100000U);
}
return NULL;
}
\ No newline at end of file
#ifndef __BIO_SOCKET_TEMPLATE_H__
#define __BIO_SOCKET_TEMPLATE_H__
#include <openssl/bio.h>
#include <openssl/ssl.h>
#include <openssl/err.h>
/*
A template for opening a non-blocking BIO socket.
*/
BIO* open_nb_socket(const char* addr, const char* port) {
BIO* bio = BIO_new_connect(addr);
BIO_set_nbio(bio, 1);
BIO_set_conn_port(bio, port);
/* timeout after 10 seconds */
int start_time = time(NULL);
while(BIO_do_connect(bio) == 0 && (int)time(NULL) - start_time < 10);
if (BIO_do_connect(bio) <= 0) {
fprintf(stderr, "Failed to open socket: BIO_do_connect returned <= 0\n");
return NULL;
}
return bio;
}
#endif
\ No newline at end of file
#ifndef __MBEDTLS_SOCKET_TEMPLATE_H__
#define __MBEDTLS_SOCKET_TEMPLATE_H__
#include <inttypes.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <mbedtls/error.h>
#include <mbedtls/entropy.h>
#include <mbedtls/ctr_drbg.h>
#include <mbedtls/net_sockets.h>
#include <mbedtls/ssl.h>
#if !defined(MBEDTLS_NET_POLL_READ)
/* compat for older mbedtls */
#define MBEDTLS_NET_POLL_READ 1
#define MBEDTLS_NET_POLL_WRITE 1
int
mbedtls_net_poll(mbedtls_net_context * ctx, uint32_t rw, uint32_t timeout)
{
/* XXX this is not ideal but good enough for an example */
usleep(300);
return 1;
}
#endif
struct mbedtls_context {
mbedtls_net_context net_ctx;
mbedtls_ssl_context ssl_ctx;
mbedtls_ssl_config ssl_conf;
mbedtls_x509_crt ca_crt;
mbedtls_entropy_context entropy;
mbedtls_ctr_drbg_context ctr_drbg;
};
void failed(const char *fn, int rv) {
char buf[100];
mbedtls_strerror(rv, buf, sizeof(buf));
printf("%s failed with %x (%s)\n", fn, -rv, buf);
exit(1);
}
void cert_verify_failed(uint32_t rv) {
char buf[512];
mbedtls_x509_crt_verify_info(buf, sizeof(buf), "\t", rv);
printf("Certificate verification failed (%0" PRIx32 ")\n%s\n", rv, buf);
exit(1);
}
/*
A template for opening a non-blocking mbed TLS connection.
*/
void open_nb_socket(struct mbedtls_context *ctx,
const char *hostname,
const char *port,
const char *ca_file) {
const unsigned char *additional = (const unsigned char *)"MQTT-C";
size_t additional_len = 6;
int rv;
mbedtls_net_context *net_ctx = &ctx->net_ctx;
mbedtls_ssl_context *ssl_ctx = &ctx->ssl_ctx;
mbedtls_ssl_config *ssl_conf = &ctx->ssl_conf;
mbedtls_x509_crt *ca_crt = &ctx->ca_crt;
mbedtls_entropy_context *entropy = &ctx->entropy;
mbedtls_ctr_drbg_context *ctr_drbg = &ctx->ctr_drbg;
mbedtls_entropy_init(entropy);
mbedtls_ctr_drbg_init(ctr_drbg);
rv = mbedtls_ctr_drbg_seed(ctr_drbg, mbedtls_entropy_func, entropy,
additional, additional_len);
if (rv != 0) {
failed("mbedtls_ctr_drbg_seed", rv);
}
mbedtls_x509_crt_init(ca_crt);
rv = mbedtls_x509_crt_parse_file(ca_crt, ca_file);
if (rv != 0) {
failed("mbedtls_x509_crt_parse_file", rv);
}
mbedtls_ssl_config_init(ssl_conf);
rv = mbedtls_ssl_config_defaults(ssl_conf, MBEDTLS_SSL_IS_CLIENT,
MBEDTLS_SSL_TRANSPORT_STREAM,
MBEDTLS_SSL_PRESET_DEFAULT);
if (rv != 0) {
failed("mbedtls_ssl_config_defaults", rv);
}
mbedtls_ssl_conf_ca_chain(ssl_conf, ca_crt, NULL);
mbedtls_ssl_conf_authmode(ssl_conf, MBEDTLS_SSL_VERIFY_OPTIONAL);
mbedtls_ssl_conf_rng(ssl_conf, mbedtls_ctr_drbg_random, ctr_drbg);
mbedtls_net_init(net_ctx);
rv = mbedtls_net_connect(net_ctx, hostname, port, MBEDTLS_NET_PROTO_TCP);
if (rv != 0) {
failed("mbedtls_net_connect", rv);
}
rv = mbedtls_net_set_nonblock(net_ctx);
if (rv != 0) {
failed("mbedtls_net_set_nonblock", rv);
}
mbedtls_ssl_init(ssl_ctx);
rv = mbedtls_ssl_setup(ssl_ctx, ssl_conf);
if (rv != 0) {
failed("mbedtls_ssl_setup", rv);
}
rv = mbedtls_ssl_set_hostname(ssl_ctx, hostname);
if (rv != 0) {
failed("mbedtls_ssl_set_hostname", rv);
}
mbedtls_ssl_set_bio(ssl_ctx, net_ctx,
mbedtls_net_send, mbedtls_net_recv, NULL);
for (;;) {
rv = mbedtls_ssl_handshake(ssl_ctx);
uint32_t want = 0;
if (rv == MBEDTLS_ERR_SSL_WANT_READ) {
want |= MBEDTLS_NET_POLL_READ;
} else if (rv == MBEDTLS_ERR_SSL_WANT_WRITE) {
want |= MBEDTLS_NET_POLL_WRITE;
} else {
break;
}
rv = mbedtls_net_poll(net_ctx, want, -1);
if (rv < 0) {
failed("mbedtls_net_poll", rv);
}
}
if (rv != 0) {
failed("mbedtls_ssl_handshake", rv);
}
uint32_t result = mbedtls_ssl_get_verify_result(ssl_ctx);
if (result != 0) {
if (result == (uint32_t)-1) {
failed("mbedtls_ssl_get_verify_result", result);
} else {
cert_verify_failed(result);
}
}
}
#endif
#ifndef __OPENSSL_SOCKET_TEMPLATE_H__
#define __OPENSSL_SOCKET_TEMPLATE_H__
#include <openssl/bio.h>
#include <openssl/ssl.h>
#include <openssl/err.h>
/*
A template for opening a non-blocking OpenSSL connection.
*/
void open_nb_socket(BIO** bio, SSL_CTX** ssl_ctx, const char* addr, const char* port, const char* ca_file, const char* ca_path) {
*ssl_ctx = SSL_CTX_new(SSLv23_client_method());
SSL* ssl;
/* load certificate */
if (!SSL_CTX_load_verify_locations(*ssl_ctx, ca_file, ca_path)) {
printf("error: failed to load certificate\n");
exit(1);
}
/* open BIO socket */
*bio = BIO_new_ssl_connect(*ssl_ctx);
BIO_get_ssl(*bio, &ssl);
SSL_set_mode(ssl, SSL_MODE_AUTO_RETRY);
BIO_set_conn_hostname(*bio, addr);
BIO_set_nbio(*bio, 1);
BIO_set_conn_port(*bio, port);
/* wait for connect with 10 second timeout */
int start_time = time(NULL);
int do_connect_rv = BIO_do_connect(*bio);
while(do_connect_rv <= 0 && BIO_should_retry(*bio) && (int)time(NULL) - start_time < 10) {
do_connect_rv = BIO_do_connect(*bio);
}
if (do_connect_rv <= 0) {
printf("error: %s\n", ERR_reason_error_string(ERR_get_error()));
BIO_free_all(*bio);
SSL_CTX_free(*ssl_ctx);
*bio = NULL;
*ssl_ctx=NULL;
return;
}
/* verify certificate */
if (SSL_get_verify_result(ssl) != X509_V_OK) {
/* Handle the failed verification */
printf("error: x509 certificate verification failed\n");
exit(1);
}
}
#endif
\ No newline at end of file
#ifndef __POSIX_SOCKET_TEMPLATE_H__
#define __POSIX_SOCKET_TEMPLATE_H__
#include <stdio.h>
#include <sys/types.h>
#if !defined(WIN32)
#include <sys/socket.h>
#include <netdb.h>
#endif
#include <fcntl.h>
/*
A template for opening a non-blocking POSIX socket.
*/
int open_nb_socket(const char* addr, const char* port) {
struct addrinfo hints = {0};
hints.ai_family = AF_UNSPEC; /* IPv4 or IPv6 */
hints.ai_socktype = SOCK_STREAM; /* Must be TCP */
int sockfd = -1;
int rv;
struct addrinfo *p, *servinfo;
/* get address information */
rv = getaddrinfo(addr, port, &hints, &servinfo);
if(rv != 0) {
fprintf(stderr, "Failed to open socket (getaddrinfo): %s\n", gai_strerror(rv));
return -1;
}
/* open the first possible socket */
for(p = servinfo; p != NULL; p = p->ai_next) {
sockfd = socket(p->ai_family, p->ai_socktype, p->ai_protocol);
if (sockfd == -1) continue;
/* connect to server */
rv = connect(sockfd, servinfo->ai_addr, servinfo->ai_addrlen);
if(rv == -1) continue;
break;
}
/* free servinfo */
freeaddrinfo(servinfo);
/* make non-blocking */
#if !defined(WIN32)
if (sockfd != -1) fcntl(sockfd, F_SETFL, fcntl(sockfd, F_GETFL) | O_NONBLOCK);
#else
if (sockfd != INVALID_SOCKET) {
int iMode = 1;
ioctlsocket(sockfd, FIONBIO, &iMode);
}
#endif
/* return the new socket fd */
return sockfd;
}
#endif
\ No newline at end of file
此差异已折叠。
#ifndef __MQTT_PAL_H__
#define __MQTT_PAL_H__
/*
MIT License
Copyright(c) 2018 Liam Bindle
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files(the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions :
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
*/
/**
* @file
* @brief Includes/supports the types/calls required by the MQTT-C client.
*
* @note This is the \em only file included in mqtt.h, and mqtt.c. It is therefore
* responsible for including/supporting all the required types and calls.
*
* @defgroup pal Platform abstraction layer
* @brief Documentation of the types and calls required to port MQTT-C to a new platform.
*
* mqtt_pal.h is the \em only header file included in mqtt.c. Therefore, to port MQTT-C to a
* new platform the following types, functions, constants, and macros must be defined in
* mqtt_pal.h:
* - Types:
* - \c size_t, \c ssize_t
* - \c uint8_t, \c uint16_t, \c uint32_t
* - \c va_list
* - \c mqtt_pal_time_t : return type of \c MQTT_PAL_TIME()
* - \c mqtt_pal_mutex_t : type of the argument that is passed to \c MQTT_PAL_MUTEX_LOCK and
* \c MQTT_PAL_MUTEX_RELEASE
* - Functions:
* - \c memcpy, \c strlen
* - \c va_start, \c va_arg, \c va_end
* - Constants:
* - \c INT_MIN
*
* Additionally, three macro's are required:
* - \c MQTT_PAL_HTONS(s) : host-to-network endian conversion for uint16_t.
* - \c MQTT_PAL_NTOHS(s) : network-to-host endian conversion for uint16_t.
* - \c MQTT_PAL_TIME() : returns [type: \c mqtt_pal_time_t] current time in seconds.
* - \c MQTT_PAL_MUTEX_LOCK(mtx_pointer) : macro that locks the mutex pointed to by \c mtx_pointer.
* - \c MQTT_PAL_MUTEX_RELEASE(mtx_pointer) : macro that unlocks the mutex pointed to by
* \c mtx_pointer.
*
* Lastly, \ref mqtt_pal_sendall and \ref mqtt_pal_recvall, must be implemented in mqtt_pal.c
* for sending and receiving data using the platforms socket calls.
*/
/* UNIX-like platform support */
#if defined(__unix__) || defined(__APPLE__)
#include <limits.h>
#include <string.h>
#include <stdarg.h>
#include <time.h>
#include <arpa/inet.h>
#include <pthread.h>
#define MQTT_PAL_HTONS(s) htons(s)
#define MQTT_PAL_NTOHS(s) ntohs(s)
#define MQTT_PAL_TIME() time(NULL)
typedef time_t mqtt_pal_time_t;
typedef pthread_mutex_t mqtt_pal_mutex_t;
#define MQTT_PAL_MUTEX_INIT(mtx_ptr) pthread_mutex_init(mtx_ptr, NULL)
#define MQTT_PAL_MUTEX_LOCK(mtx_ptr) pthread_mutex_lock(mtx_ptr)
#define MQTT_PAL_MUTEX_UNLOCK(mtx_ptr) pthread_mutex_unlock(mtx_ptr)
#ifndef MQTT_USE_CUSTOM_SOCKET_HANDLE
#ifdef MQTT_USE_MBEDTLS
struct mbedtls_ssl_context;
typedef struct mbedtls_ssl_context *mqtt_pal_socket_handle;
#elif defined(MQTT_USE_BIO)
#include <openssl/bio.h>
typedef BIO* mqtt_pal_socket_handle;
#else
typedef int mqtt_pal_socket_handle;
#endif
#endif
#elif defined(_MSC_VER)
#include <limits.h>
#include <windows.h>
#include <time.h>
#include <stdint.h>
#include <winsock2.h>
typedef SSIZE_T ssize_t;
#define MQTT_PAL_HTONS(s) htons(s)
#define MQTT_PAL_NTOHS(s) ntohs(s)
#define MQTT_PAL_TIME() time(NULL)
typedef time_t mqtt_pal_time_t;
typedef CRITICAL_SECTION mqtt_pal_mutex_t;
#define MQTT_PAL_MUTEX_INIT(mtx_ptr) InitializeCriticalSection(mtx_ptr)
#define MQTT_PAL_MUTEX_LOCK(mtx_ptr) EnterCriticalSection(mtx_ptr)
#define MQTT_PAL_MUTEX_UNLOCK(mtx_ptr) LeaveCriticalSection(mtx_ptr)
#ifndef MQTT_USE_CUSTOM_SOCKET_HANDLE
#ifdef MQTT_USE_BIO
#include <openssl/bio.h>
typedef BIO* mqtt_pal_socket_handle;
#else
typedef SOCKET mqtt_pal_socket_handle;
#endif
#endif
#endif
/**
* @brief Sends all the bytes in a buffer.
* @ingroup pal
*
* @param[in] fd The file-descriptor (or handle) of the socket.
* @param[in] buf A pointer to the first byte in the buffer to send.
* @param[in] len The number of bytes to send (starting at \p buf).
* @param[in] flags Flags which are passed to the underlying socket.
*
* @returns The number of bytes sent if successful, an \ref MQTTErrors otherwise.
*/
ssize_t mqtt_pal_sendall(mqtt_pal_socket_handle fd, const void* buf, size_t len, int flags);
/**
* @brief Non-blocking receive all the byte available.
* @ingroup pal
*
* @param[in] fd The file-descriptor (or handle) of the socket.
* @param[in] buf A pointer to the receive buffer.
* @param[in] bufsz The max number of bytes that can be put into \p buf.
* @param[in] flags Flags which are passed to the underlying socket.
*
* @returns The number of bytes received if successful, an \ref MQTTErrors otherwise.
*/
ssize_t mqtt_pal_recvall(mqtt_pal_socket_handle fd, void* buf, size_t bufsz, int flags);
#endif
此差异已折叠。
/*
MIT License
Copyright(c) 2018 Liam Bindle
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files(the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions :
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
*/
#include <mqtt.h>
/**
* @file
* @brief Implements @ref mqtt_pal_sendall and @ref mqtt_pal_recvall and
* any platform-specific helpers you'd like.
* @cond Doxygen_Suppress
*/
#ifdef MQTT_USE_MBEDTLS
#include <mbedtls/ssl.h>
ssize_t mqtt_pal_sendall(mqtt_pal_socket_handle fd, const void* buf, size_t len, int flags) {
size_t sent = 0;
while(sent < len) {
int rv = mbedtls_ssl_write(fd, buf + sent, len - sent);
if (rv < 0) {
if (rv == MBEDTLS_ERR_SSL_WANT_READ ||
rv == MBEDTLS_ERR_SSL_WANT_WRITE
#if defined(MBEDTLS_ERR_SSL_ASYNC_IN_PROGRESS)
|| rv == MBEDTLS_ERR_SSL_ASYNC_IN_PROGRESS
#endif
#if defined(MBEDTLS_ERR_SSL_CRYPTO_IN_PROGRESS)
|| rv == MBEDTLS_ERR_SSL_CRYPTO_IN_PROGRESS
#endif
) {
/* should call mbedtls_ssl_writer later again */
break;
}
return MQTT_ERROR_SOCKET_ERROR;
}
sent += (size_t) rv;
}
return sent;
}
ssize_t mqtt_pal_recvall(mqtt_pal_socket_handle fd, void* buf, size_t bufsz, int flags) {
const void *const start = buf;
int rv;
do {
rv = mbedtls_ssl_read(fd, buf, bufsz);
if (rv < 0) {
if (rv == MBEDTLS_ERR_SSL_WANT_READ ||
rv == MBEDTLS_ERR_SSL_WANT_WRITE
#if defined(MBEDTLS_ERR_SSL_ASYNC_IN_PROGRESS)
|| rv == MBEDTLS_ERR_SSL_ASYNC_IN_PROGRESS
#endif
#if defined(MBEDTLS_ERR_SSL_CRYPTO_IN_PROGRESS)
|| rv == MBEDTLS_ERR_SSL_CRYPTO_IN_PROGRESS
#endif
) {
/* should call mbedtls_ssl_read later again */
break;
}
return MQTT_ERROR_SOCKET_ERROR;
}
buf = (char*)buf + rv;
bufsz -= rv;
} while (rv > 0);
return buf - start;
}
#elif defined(MQTT_USE_BIO)
#include <openssl/bio.h>
#include <openssl/ssl.h>
#include <openssl/err.h>
ssize_t mqtt_pal_sendall(mqtt_pal_socket_handle fd, const void* buf, size_t len, int flags) {
size_t sent = 0;
while(sent < len) {
int tmp = BIO_write(fd, buf + sent, len - sent);
if (tmp > 0) {
sent += (size_t) tmp;
} else if (tmp <= 0 && !BIO_should_retry(fd)) {
return MQTT_ERROR_SOCKET_ERROR;
}
}
return sent;
}
ssize_t mqtt_pal_recvall(mqtt_pal_socket_handle fd, void* buf, size_t bufsz, int flags) {
const void *const start = buf;
int rv;
do {
rv = BIO_read(fd, buf, bufsz);
if (rv > 0) {
/* successfully read bytes from the socket */
buf += rv;
bufsz -= rv;
} else if (!BIO_should_retry(fd)) {
/* an error occurred that wasn't "nothing to read". */
return MQTT_ERROR_SOCKET_ERROR;
}
} while (!BIO_should_read(fd));
return (ssize_t)(buf - start);
}
#elif defined(__unix__) || defined(__APPLE__)
#include <errno.h>
ssize_t mqtt_pal_sendall(mqtt_pal_socket_handle fd, const void* buf, size_t len, int flags) {
size_t sent = 0;
while(sent < len) {
ssize_t tmp = send(fd, buf + sent, len - sent, flags);
if (tmp < 1) {
return MQTT_ERROR_SOCKET_ERROR;
}
sent += (size_t) tmp;
}
return sent;
}
ssize_t mqtt_pal_recvall(mqtt_pal_socket_handle fd, void* buf, size_t bufsz, int flags) {
const void *const start = buf;
ssize_t rv;
do {
rv = recv(fd, buf, bufsz, flags);
if (rv > 0) {
/* successfully read bytes from the socket */
buf += rv;
bufsz -= rv;
} else if (rv < 0 && errno != EAGAIN && errno != EWOULDBLOCK) {
/* an error occurred that wasn't "nothing to read". */
return MQTT_ERROR_SOCKET_ERROR;
}
} while (rv > 0);
return buf - start;
}
#elif defined(_MSC_VER)
#include <errno.h>
ssize_t mqtt_pal_sendall(mqtt_pal_socket_handle fd, const void* buf, size_t len, int flags) {
size_t sent = 0;
while(sent < len) {
ssize_t tmp = send(fd, (char*)buf + sent, len - sent, flags);
if (tmp < 1) {
return MQTT_ERROR_SOCKET_ERROR;
}
sent += (size_t) tmp;
}
return sent;
}
ssize_t mqtt_pal_recvall(mqtt_pal_socket_handle fd, void* buf, size_t bufsz, int flags) {
const char *const start = buf;
ssize_t rv;
do {
rv = recv(fd, buf, bufsz, flags);
if (rv > 0) {
/* successfully read bytes from the socket */
buf = (char*)buf + rv;
bufsz -= rv;
} else if (rv < 0) {
int err = WSAGetLastError();
if (err != WSAEWOULDBLOCK) {
/* an error occurred that wasn't "nothing to read". */
return MQTT_ERROR_SOCKET_ERROR;
}
}
} while (rv > 0);
return (ssize_t)((char*)buf - start);
}
#else
#error No PAL!
#endif
/** @endcond */
此差异已折叠。
......@@ -1209,11 +1209,49 @@ TDengine在Window系统上提供的API与Linux系统是相同的, 应用程序
其中,最常用的文件列出如下:
+ Client可执行文件: /usr/local/taos/bin/taos 软连接到 /usr/local/bin/taos
+ 配置文件: /usr/local/taos/cfg/taos.cfg 软连接到 /etc/taos/taos.cfg
+ 驱动程序目录: /usr/local/taos/driver/libtaos.1.6.5.1.dylib 软连接到 /usr/local/lib/libtaos.dylib
+ 驱动程序头文件: /usr/local/taos/include/taos.h 软连接到 /usr/local/include/taos.h
+ 日志目录(第一次运行程序时生成):~/TDengineLog
## MQTT客户端
MQTT客户端实现了订阅MQTT Broker的特定Topic将Json数据进行转换入库的功能,任何终端只要将数据发给特定的Topic 即可,不用再编写转换器或者数据解析程序。如果终端量大,需要 Mqtt Broker 群集,这里不再详述。
#### 如何配置?
首先需要在 taos.cfg 中打开配置项 mqtt 用来启用, 再通过修改 mqttBrokerAddress 的值来配置连接,格式为:
> mqtt://username:password@hostname:port/path/
例如:
> mqtt://127.0.0.1:1883/taos/ mqtt://root@kissme@127.0.0.1:1883/taos/
#### Topic 格式说明
Mqtt 的topic格式为
> /<path>/<token>/<db name>/<table name>/
因此TDengine的Mqtt客户端会订阅:
> /taos/+/+/+/+/
例如:
> /taos/token/db/t/
注意: 测试时如果需要使用到Mqtt Broker 推荐使用 [mosquitto](http://mosquitto.org/) ,客户端可以使用 [MQTT.fx ](http://www.jensd.de/)
[1]: https://search.maven.org/artifact/com.taosdata.jdbc/taos-jdbcdriver
......
......@@ -166,6 +166,15 @@
# start system monitor module
# monitor 1
# start http service
# mqtt 0
# mqtt uri
# mqttBrokerAddress mqtt://username:password@hostname:1883/taos/
# mqtt client name
# mqttBrokerClientId taos_mqtt
# maximum number of rows returned by the restful interface
# restfulRowLimit 10240
......@@ -244,5 +253,8 @@
# debug flag for system monitor
# monitorDebugFlag 131
#debug flag for mqtt client
# mqttDebugFlag 131
# debug flag for TAOS TIMER
# tmrDebugFlag 131
......@@ -94,6 +94,10 @@ extern int32_t tsMaxTables;
extern char tsDefaultDB[];
extern char tsDefaultUser[];
extern char tsDefaultPass[];
extern char tsMqttBrokerAddress[];
extern char tsMqttBrokerClientId[];
extern int32_t tsMaxConnections;
extern int32_t tsBalanceInterval;
......
......@@ -199,6 +199,8 @@ int32_t tsMonitorInterval = 30; // seconds
char tsTimezone[64] = {0};
char tsLocale[TSDB_LOCALE_LEN] = {0};
char tsCharset[TSDB_LOCALE_LEN] = {0}; // default encode string
char tsMqttBrokerAddress[128] = {0};
char tsMqttBrokerClientId[128] = {0};
int32_t tsMaxBinaryDisplayWidth = 30;
......@@ -729,6 +731,26 @@ static void doInitGlobalConfig() {
cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosInitConfigOption(cfg);
cfg.option = "mqttBrokerAddress";
cfg.ptr = tsMqttBrokerAddress;
cfg.valType = TAOS_CFG_VTYPE_STRING;
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_CLIENT | TSDB_CFG_CTYPE_B_NOT_PRINT;
cfg.minValue = 0;
cfg.maxValue = 0;
cfg.ptrLength = 126;
cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosInitConfigOption(cfg);
cfg.option = "mqttBrokerClientId";
cfg.ptr = tsMqttBrokerClientId;
cfg.valType = TAOS_CFG_VTYPE_STRING;
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_CLIENT | TSDB_CFG_CTYPE_B_NOT_PRINT;
cfg.minValue = 0;
cfg.maxValue = 0;
cfg.ptrLength = 126;
cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosInitConfigOption(cfg);
// socket type; udp by default
cfg.option = "sockettype";
cfg.ptr = tsSocketType;
......
......@@ -19,7 +19,7 @@
#include "tglobal.h"
#include "mnode.h"
#include "http.h"
#include "mqtt.h"
#include "tmqtt.h"
#include "monitor.h"
#include "dnodeInt.h"
#include "dnodeModule.h"
......
......@@ -19,10 +19,7 @@
#ifdef __cplusplus
extern "C" {
#endif
#include <stdint.h>
int32_t mqttGetReqCount();
int32_t mqttInitSystem();
int32_t mqttStartSystem();
void mqttStopSystem();
......
......@@ -7,16 +7,16 @@ IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM))
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/util/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/query/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/common/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/deps/zlib-1.2.11/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/deps/cJson/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/deps/lz4/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/deps/MQTT-C/include)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/deps/MQTT-C/examples/templates)
INCLUDE_DIRECTORIES(${TD_OS_DIR}/inc)
INCLUDE_DIRECTORIES(inc)
AUX_SOURCE_DIRECTORY(src SRC)
ADD_LIBRARY(mqtt ${SRC})
TARGET_LINK_LIBRARIES(mqtt taos_static z)
TARGET_LINK_LIBRARIES(mqtt taos_static cJson mqttc)
IF (TD_ADMIN)
TARGET_LINK_LIBRARIES(mqtt admin)
TARGET_LINK_LIBRARIES(mqtt admin cJson)
ENDIF ()
ENDIF ()
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_MQTT_INIT_H
#define TDENGINE_MQTT_INIT_H
#ifdef __cplusplus
extern "C" {
#endif
/**
* @file
* A simple subscriber program that performs automatic reconnections.
*/
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include "mqtt.h"
#include "taos.h"
/**
* @brief A structure that I will use to keep track of some data needed
* to setup the connection to the broker.
*
* An instance of this struct will be created in my \c main(). Then, whenever
* \ref mqttReconnectClient is called, this instance will be passed.
*/
struct reconnect_state_t {
char* hostname;
char* port;
char* topic;
char* client_id;
char* user_name;
char* password;
uint8_t* sendbuf;
size_t sendbufsz;
uint8_t* recvbuf;
size_t recvbufsz;
};
/**
* @brief My reconnect callback. It will reestablish the connection whenever
* an error occurs.
*/
void mqttReconnectClient(struct mqtt_client* client, void** reconnect_state_vptr);
/**
* @brief The function will be called whenever a PUBLISH message is received.
*/
void mqtt_PublishCallback(void** unused, struct mqtt_response_publish* published);
/**
* @brief The client's refresher. This function triggers back-end routines to
* handle ingress/egress traffic to the broker.
*
* @note All this function needs to do is call \ref __mqtt_recv and
* \ref __mqtt_send every so often. I've picked 100 ms meaning that
* client ingress/egress traffic will be handled every 100 ms.
*/
void* mqttClientRefresher(void* client);
/**
* @brief Safelty closes the \p sockfd and cancels the \p client_daemon before \c exit.
*/
void mqttCleanup(int status, int sockfd, pthread_t* client_daemon);
void mqttInitConnCb(void* param, TAOS_RES* result, int32_t code);
void mqttQueryInsertCallback(void* param, TAOS_RES* result, int32_t code);
#define QOS 1
#define TIMEOUT 10000L
#ifdef __cplusplus
}
#endif
#endif
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_MQTT_PLYLOAD_H
#define TDENGINE_MQTT_PLYLOAD_H
#ifdef __cplusplus
extern "C" {
#endif
char split(char str[], char delims[], char** p_p_cmd_part, int max);
char* converJsonToSql(char* json, char* _dbname, char* _tablename);
#ifdef __cplusplus
}
#endif
#endif
......@@ -18,15 +18,11 @@
#ifdef __cplusplus
extern "C" {
#endif
#include <stdint.h>
int32_t mqttGetReqCount();
int32_t mqttInitSystem();
int32_t mqttStartSystem();
void mqttStopSystem();
void mqttCleanUpSystem();
#ifdef __cplusplus
}
#endif
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "mqttPayload.h"
#include "cJSON.h"
#include "string.h"
#include "taos.h"
#include "mqttLog.h"
#include "os.h"
char split(char str[], char delims[], char** p_p_cmd_part, int max) {
char* token = strtok(str, delims);
char part_index = 0;
char** tmp_part = p_p_cmd_part;
while (token) {
*tmp_part++ = token;
token = strtok(NULL, delims);
part_index++;
if (part_index >= max) break;
}
return part_index;
}
char* converJsonToSql(char* json, char* _dbname, char* _tablename) {
cJSON* jPlayload = cJSON_Parse(json);
char _names[102400] = {0};
char _values[102400] = {0};
int i = 0;
int count = cJSON_GetArraySize(jPlayload);
for (; i < count; i++)
{
cJSON* item = cJSON_GetArrayItem(jPlayload, i);
if (cJSON_Object == item->type) {
mqttPrint("The item '%s' is not supported", item->string);
} else {
strcat(_names, item->string);
if (i < count - 1) {
strcat(_names, ",");
}
char* __value_json = cJSON_Print(item);
strcat(_values, __value_json);
free(__value_json);
if (i < count - 1) {
strcat(_values, ",");
}
}
}
cJSON_free(jPlayload);
int sqllen = strlen(_names) + strlen(_values) + strlen(_dbname) + strlen(_tablename) + 1024;
char* _sql = calloc(1, sqllen);
sprintf(_sql, "INSERT INTO %s.%s (%s) VALUES(%s);", _dbname, _tablename, _names, _values);
return _sql;
}
\ No newline at end of file
......@@ -14,30 +14,214 @@
*/
#define _DEFAULT_SOURCE
#include "mqttSystem.h"
#include "cJSON.h"
#include "mqtt.h"
#include "mqttInit.h"
#include "mqttLog.h"
#include "mqttPayload.h"
#include "os.h"
#include "posix_sockets.h"
#include "string.h"
#include "taos.h"
#include "tglobal.h"
#include "tmqtt.h"
#include "tsclient.h"
#include "tsocket.h"
#include "ttimer.h"
#include "mqttSystem.h"
struct mqtt_client mqttClient = {0};
pthread_t clientDaemonThread = {0};
void* mqttConnect=NULL;
struct reconnect_state_t recntStatus = {0};
char* topicPath=NULL;
int mttIsRuning = 1;
int32_t mqttInitSystem() {
int rc = 0;
uint8_t sendbuf[2048];
uint8_t recvbuf[1024];
recntStatus.sendbuf = sendbuf;
recntStatus.sendbufsz = sizeof(sendbuf);
recntStatus.recvbuf = recvbuf;
recntStatus.recvbufsz = sizeof(recvbuf);
char* url = tsMqttBrokerAddress;
recntStatus.user_name = strstr(url, "@") != NULL ? strbetween(url, "//", ":") : NULL;
recntStatus.password = strstr(url, "@") != NULL ? strbetween(strstr(url, recntStatus.user_name), ":", "@") : NULL;
if (strstr(url, "@") != NULL) {
recntStatus.hostname = strbetween(url, "@", ":");
} else if (strstr(strstr(url, "://") + 3, ":") != NULL) {
recntStatus.hostname = strbetween(url, "//", ":");
} else {
recntStatus.hostname = strbetween(url, "//", "/");
}
char* _begin_hostname = strstr(url, recntStatus.hostname);
if (strstr(_begin_hostname, ":") != NULL) {
recntStatus.port = strbetween(_begin_hostname, ":", "/");
} else {
recntStatus.port = strbetween("'1883'", "'", "'");
}
int32_t mqttGetReqCount() { return 0; }
int mqttInitSystem() {
mqttPrint("mqttInitSystem");
return 0;
topicPath = strbetween(strstr(url, strstr(_begin_hostname, ":") != NULL ? recntStatus.port : recntStatus.hostname),
"/", "/");
char* _topic = "+/+/+/";
int _tpsize = strlen(topicPath) + strlen(_topic) + 1;
recntStatus.topic = calloc(1, _tpsize);
sprintf(recntStatus.topic, "/%s/%s", topicPath, _topic);
recntStatus.client_id = strlen(tsMqttBrokerClientId) < 3 ? tsMqttBrokerClientId : "taos_mqtt";
mqttConnect = NULL;
return rc;
}
int mqttStartSystem() {
mqttPrint("mqttStartSystem");
return 0;
int32_t mqttStartSystem() {
int rc = 0;
if (recntStatus.user_name != NULL && recntStatus.password != NULL) {
mqttPrint("connecting to mqtt://%s:%s@%s:%s/%s/", recntStatus.user_name, recntStatus.password,
recntStatus.hostname, recntStatus.port, topicPath);
}
else if (recntStatus.user_name != NULL && recntStatus.password == NULL)
{
mqttPrint("connecting to mqtt://%s@%s:%s/%s/", recntStatus.user_name,recntStatus.hostname, recntStatus.port, topicPath);
}
mqtt_init_reconnect(&mqttClient, mqttReconnectClient, &recntStatus, mqtt_PublishCallback);
if (pthread_create(&clientDaemonThread, NULL, mqttClientRefresher, &mqttClient)) {
mqttError("Failed to start client daemon.");
mqttCleanup(EXIT_FAILURE, -1, NULL);
rc = -1;
} else {
mqttPrint("listening for '%s' messages.", recntStatus.topic);
}
return rc;
}
void mqttStopSystem() {
mqttPrint("mqttStopSystem");
mqttClient.error = MQTT_ERROR_SOCKET_ERROR;
mttIsRuning = 0;
usleep(300000U);
mqttCleanup(EXIT_SUCCESS, mqttClient.socketfd, &clientDaemonThread);
mqttPrint("mqtt is stoped");
}
void mqttCleanUpSystem() {
mqttPrint("starting to clean up mqtt");
free(recntStatus.user_name);
free(recntStatus.password);
free(recntStatus.hostname);
free(recntStatus.port);
free(recntStatus.topic);
free(topicPath);
mqttPrint("mqtt is cleaned up");
}
void mqtt_PublishCallback(void** unused, struct mqtt_response_publish* published) {
mqttPrint("mqtt_PublishCallback");
/* note that published->topic_name is NOT null-terminated (here we'll change it to a c-string) */
char* topic_name = (char*)malloc(published->topic_name_size + 1);
memcpy(topic_name, published->topic_name, published->topic_name_size);
topic_name[published->topic_name_size] = '\0';
mqttPrint("Received publish('%s'): %s", topic_name, (const char*)published->application_message);
char _token[128] = {0};
char _dbname[128] = {0};
char _tablename[128] = {0};
if (mqttConnect == NULL) {
mqttPrint("connect database");
taos_connect_a(NULL, "_root", tsInternalPass, "", 0, mqttInitConnCb, &mqttClient, &mqttConnect);
}
if (topic_name[1]=='/' && strncmp((char*)&topic_name[1], topicPath, strlen(topicPath)) == 0) {
char* p_p_cmd_part[5] = {0};
char copystr[1024] = {0};
strncpy(copystr, topic_name, MIN(1024, published->topic_name_size));
char part_index = split(copystr, "/", p_p_cmd_part, 10);
if (part_index < 4) {
mqttError("The topic %s is't format '/path/token/dbname/table name/'. for expmle: '/taos/token/db/t'", topic_name);
} else {
strncpy(_token, p_p_cmd_part[1], 127);
strncpy(_dbname, p_p_cmd_part[2], 127);
strncpy(_tablename, p_p_cmd_part[3], 127);
mqttPrint("part count=%d,access token:%s,database name:%s, table name:%s", part_index, _token, _dbname,
_tablename);
if (mqttConnect != NULL) {
char* _sql = converJsonToSql((char*)published->application_message, _dbname, _tablename);
mqttPrint("query:%s", _sql);
taos_query_a(mqttConnect, _sql, mqttQueryInsertCallback, &mqttClient);
mqttPrint("free sql:%s", _sql);
free(_sql);
}
}
}
free(topic_name);
}
void mqttCleanUpSystem() {
mqttPrint("mqttCleanUpSystem");
void* mqttClientRefresher(void* client) {
while (mttIsRuning) {
mqtt_sync((struct mqtt_client*)client);
usleep(100000U);
}
mqttPrint("Exit mqttClientRefresher");
return NULL;
}
void mqttCleanup(int status, int sockfd, pthread_t* client_daemon) {
mqttPrint("mqttCleanup");
if (sockfd != -1) close(sockfd);
if (client_daemon != NULL) pthread_cancel(*client_daemon);
}
void mqttInitConnCb(void* param, TAOS_RES* result, int32_t code) {
if (code < 0) {
mqttError("mqtt:%d, connect to database failed, reason:%s", code, tstrerror(code));
taos_close(mqttConnect);
mqttConnect = NULL;
return;
}
mqttTrace("mqtt:%d, connect to database success, reason:%s", code, tstrerror(code));
}
void mqttQueryInsertCallback(void* param, TAOS_RES* result, int32_t code) {
if (code < 0) {
mqttError("mqtt:%d, save data failed, code:%s", code, tstrerror(code));
} else if (code == 0) {
mqttError("mqtt:%d, save data failed, affect rows:%d", code, code);
} else {
mqttPrint("mqtt:%d, save data success, code:%s", code, tstrerror(code));
}
}
void mqttReconnectClient(struct mqtt_client* client, void** reconnect_state_vptr) {
mqttPrint("mqttReconnectClient");
struct reconnect_state_t* reconnect_state = *((struct reconnect_state_t**)reconnect_state_vptr);
/* Close the clients socket if this isn't the initial reconnect call */
if (client->error != MQTT_ERROR_INITIAL_RECONNECT) {
close(client->socketfd);
}
/* Perform error handling here. */
if (client->error != MQTT_ERROR_INITIAL_RECONNECT) {
mqttError("mqttReconnectClient: called while client was in error state \"%s\"", mqtt_error_str(client->error));
}
/* Open a new socket. */
int sockfd = open_nb_socket(reconnect_state->hostname, reconnect_state->port);
if (sockfd == -1) {
mqttError("Failed to open socket: ");
mqttCleanup(EXIT_FAILURE, sockfd, NULL);
}
/* Reinitialize the client. */
mqtt_reinit(client, sockfd, reconnect_state->sendbuf, reconnect_state->sendbufsz, reconnect_state->recvbuf,
reconnect_state->recvbufsz);
/* Ensure we have a clean session */
uint8_t connect_flags = MQTT_CONNECT_CLEAN_SESSION;
/* Send connection request to the broker. */
mqtt_connect(client, reconnect_state->client_id, NULL, NULL, 0, reconnect_state->user_name, reconnect_state->password,connect_flags, 400);
/* Subscribe to the topic. */
mqtt_subscribe(client, reconnect_state->topic, 0);
}
\ No newline at end of file
......@@ -125,6 +125,8 @@ int64_t strnatoi(char *num, int32_t len);
char* strreplace(const char* str, const char* pattern, const char* rep);
char *strbetween(char *string, char *begin, char *end);
char *paGetToken(char *src, char **token, int32_t *tokenLen);
void taosMsleep(int32_t mseconds);
......
......@@ -331,6 +331,20 @@ char *strreplace(const char *str, const char *pattern, const char *rep) {
return dest;
}
char *strbetween(char *string, char *begin, char *end) {
char *result = NULL;
char *_begin = strstr(string, begin);
if (_begin != NULL) {
char *_end = strstr(_begin + strlen(begin), end);
int size = _end - _begin;
if (_end != NULL && size > 0) {
result = (char *)calloc(1, size);
memcpy(result, _begin + strlen(begin), size - +strlen(begin));
}
}
return result;
}
int32_t taosByteArrayToHexStr(char bytes[], int32_t len, char hexstr[]) {
int32_t i;
char hexval[16] = {'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f'};
......@@ -691,4 +705,4 @@ void taosRemoveDir(char *rootDir) {
rmdir(rootDir);
uPrint("dir:%s is removed", rootDir);
}
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册