提交 872d8261 编写于 作者: L liu0x54

Merge branch 'develop' into feature/pyconn

......@@ -11,6 +11,7 @@ debs/
rpms/
mac/
*.pyc
.mypy_cache
*.tmp
*.swp
src/connector/nodejs/node_modules/
......@@ -33,5 +34,35 @@ Target/
*.failed
*.sql
sim/
psim/
pysim/
*.out
*DS_Store
# Doxygen Generated files
html/
/.vs
/CMakeFiles/3.10.2
/CMakeCache.txt
/Makefile
/*.cmake
/src/cq/test/CMakeFiles/cqtest.dir/*.cmake
*.cmake
/src/cq/test/CMakeFiles/cqtest.dir/*.make
*.make
link.txt
*.internal
*.includecache
*.marks
Makefile
CMakeError.log
*.log
/CMakeFiles/CMakeRuleHashes.txt
/CMakeFiles/Makefile2
/CMakeFiles/TargetDirectories.txt
/CMakeFiles/cmake.check_cache
/out/isenseconfig/WSL-Clang-Debug
/out/isenseconfig/WSL-GCC-Debug
/test/cfg
/src/.vs
*.o
......@@ -6,6 +6,7 @@
#
matrix:
- os: linux
dist: bionic
language: c
git:
......@@ -25,6 +26,7 @@ matrix:
- python3-pip
- python3-setuptools
- valgrind
- psmisc
before_script:
- cd ${TRAVIS_BUILD_DIR}
......@@ -36,6 +38,7 @@ matrix:
- make > /dev/null
after_success:
- travis_wait 20
- |-
case $TRAVIS_OS_NAME in
linux)
......@@ -46,10 +49,10 @@ matrix:
pip3 install --user ${TRAVIS_BUILD_DIR}/src/connector/python/linux/python3/
cd ${TRAVIS_BUILD_DIR}/tests
./test-all.sh $TRAVIS_EVENT_TYPE || travis_terminate $?
./test-all.sh smoke || travis_terminate $?
cd ${TRAVIS_BUILD_DIR}/tests/pytest
./smoketest.sh -g 2>&1 | tee mem-error-out.txt
./valgrind-test.sh 2>&1 > mem-error-out.log
sleep 1
# Color setting
......@@ -59,12 +62,12 @@ matrix:
GREEN_UNDERLINE='\033[4;32m'
NC='\033[0m'
grep 'ERROR SUMMARY' mem-error-out.txt | uniq | tee uniq-mem-error-out.txt
grep 'start to execute\|ERROR SUMMARY' mem-error-out.log|grep -v 'grep'|uniq|tee uniq-mem-error-out.log
for memError in `cat uniq-mem-error-out.txt | awk '{print $4}'`
for memError in `grep 'ERROR SUMMARY' uniq-mem-error-out.log | awk '{print $4}'`
do
if [ -n "$memError" ]; then
if [ "$memError" -gt 16 ]; then
if [ "$memError" -gt 12 ]; then
echo -e "${RED} ## Memory errors number valgrind reports is $memError.\
More than our threshold! ## ${NC}"
travis_terminate $memError
......@@ -72,11 +75,11 @@ matrix:
fi
done
grep 'definitely lost' mem-error-out.txt | uniq | tee uniq-definitely-lost-out.txt
for defiMemError in `cat uniq-definitely-lost-out.txt | awk '{print $7}'`
grep 'start to execute\|definitely lost:' mem-error-out.log|grep -v 'grep'|uniq|tee uniq-definitely-lost-out.log
for defiMemError in `grep 'definitely lost:' uniq-definitely-lost-out.log | awk '{print $7}'`
do
if [ -n "$defiMemError" ]; then
if [ "$defiMemError" -gt 16 ]; then
if [ "$defiMemError" -gt 13 ]; then
echo -e "${RED} ## Memory errors number valgrind reports \
Definitely lost is $defiMemError. More than our threshold! ## ${NC}"
travis_terminate $defiMemError
......@@ -88,6 +91,7 @@ matrix:
esac
- os: linux
dist: bionic
language: c
compiler: gcc
env: COVERITY_SCAN=true
......@@ -107,7 +111,7 @@ matrix:
description: TDengine
# Where email notification of build analysis results will be sent
notification_email: sdsang@taosdata.com
notification_email: sdsang@taosdata.com, slguan@taosdata.com
# Commands to prepare for build_command
# ** likely specific to your build **
......@@ -115,7 +119,7 @@ matrix:
# The command that will be added as an argument to "cov-build" to compile your project for analysis,
# ** likely specific to your build **
build_command: make > /dev/null
build_command: make
# Pattern to match selecting branches that will run analysis. We recommend leaving this set to 'coverity_scan'.
# Take care in resource usage, and consider the build frequency allowances per
......@@ -123,6 +127,7 @@ matrix:
branch_pattern: coverity_scan
- os: linux
dist: bionic
language: c
compiler: gcc
env: ENV_COVER=true
......@@ -141,6 +146,7 @@ matrix:
- python3-pip
- python3-setuptools
- lcov
- psmisc
before_script:
- cd ${TRAVIS_BUILD_DIR}
......@@ -163,17 +169,16 @@ matrix:
cd ${TRAVIS_BUILD_DIR}/tests
./test-all.sh
./test-all.sh smoke COVER
if [ "$?" -ne "0" ]; then
travis_terminate $?
fi
TEST_RESULT=$?
pkill taosd
sleep 1
cd ${TRAVIS_BUILD_DIR}
lcov -d . --capture --rc lcov_branch_coverage=1 -o coverage.info
lcov --remove coverage.info '*/tests/*' '*/test/*' '*/deps/*' '*/plugins/*' -o coverage.info
lcov -l --rc lcov_branch_coverage=1 coverage.info || travis_terminate $?
gem install coveralls-lcov
......@@ -199,10 +204,37 @@ matrix:
echo -e "${RED} ## Codecov did not collect coverage report! ## ${NC} "
fi
if [ "$TEST_RESULT" -ne "0" ]; then
travis_terminate $?
fi
;;
esac
- os: linux
dist: trusty
language: c
git:
- depth: 1
addons:
apt:
packages:
- build-essential
- cmake
env:
- DESC="trusty/gcc-4.8 build"
before_script:
- cd ${TRAVIS_BUILD_DIR}
- mkdir debug
- cd debug
script:
- cmake .. > /dev/null
- make > /dev/null
- os: linux
dist: bionic
language: c
compiler: clang
env: DESC="linux/clang build"
......
{
"configurations": [
{
"name": "WSL-GCC-Debug",
"generator": "Unix Makefiles",
"configurationType": "Debug",
"buildRoot": "${projectDir}\\build\\",
"installRoot": "${projectDir}\\build\\",
"cmakeExecutable": "/usr/bin/cmake",
"cmakeCommandArgs": "",
"buildCommandArgs": "",
"ctestCommandArgs": "",
"inheritEnvironments": [ "linux_x64" ],
"wslPath": "${defaultWSLPath}",
"addressSanitizerRuntimeFlags": "detect_leaks=0",
"variables": [
{
"name": "CMAKE_INSTALL_PREFIX",
"value": "/mnt/d/TDengine/TDengine/build",
"type": "PATH"
}
]
}
]
}
\ No newline at end of file
......@@ -44,7 +44,7 @@ sudo apt-get install maven
Build TDengine:
```
mkdir build && cd build
mkdir debug && cd debug
cmake .. && cmake --build .
```
......@@ -115,251 +115,6 @@ TDengine provides abundant developing tools for users to develop on TDengine. Fo
- [RESTful API](https://www.taosdata.com/en/documentation/connector/#RESTful-Connector)
- [Node.js](https://www.taosdata.com/en/documentation/connector/#Node.js-Connector)
# How to run the test cases and how to add a new test case?
### Prepare development environment
1. sudo apt install
build-essential cmake net-tools python-pip python-setuptools python3-pip
python3-setuptools valgrind
2. git clone <https://github.com/taosdata/TDengine>; cd TDengine
3. mkdir debug; cd debug; cmake ..; make ; sudo make install
4. pip install src/connector/python/linux/python2 ; pip3 install
src/connector/python/linux/python3
### How to run TSIM test suite
1. cd \<TDengine\>/tests/script
2. sudo ./test.sh
### How to run Python test suite
1. cd \<TDengine\>/tests/pytest
2. ./smoketest.sh \# for smoke test
3. ./smoketest.sh -g \# for memory leak detection test with valgrind
4. ./fulltest.sh \# for full test
> Note1: TDengine daemon's configuration and data files are stored in
> \<TDengine\>/sim directory. As a historical design, it's same place with
> TSIM script. So after the TSIM script ran with sudo privilege, the directory
> has been used by TSIM then the python script cannot write it by a normal
> user. You need to remove the directory completely first before running the
> Python test case. We should consider using two different locations to store
> for TSIM and Python script.
> Note2: if you need to debug crash problem with a core dump, you need
> manually edit smoketest.sh or fulltest.sh to add "ulimit -c unlimited"
> before the script line. Then you can look for the core file in
> \<TDengine\>/tests/pytest after the program crash.
### How to add a new test case
**1. add a new TSIM test cases:**
TSIM test cases are now included in the new development branch and can be
added to the TDengine/tests/script/test.sh script based on the manual test
methods necessary to add test cases as described above.
**2. add a new Python test cases:**
**2.1 Please refer to \<TDengine\>/tests/pytest/insert/basic.py to add a new
test case.** The new test case must implement 3 functions, where self.init()
and self.stop() simply copy the contents of insert/basic.py and the test
logic is implemented in self.run(). You can refer to the code in the util
directory for more information.
**2.2 Edit smoketest.sh to add the path and filename of the new test case**
Note: The Python test framework may continue to be improved in the future,
hopefully, to provide more functionality and ease of writing test cases. The
method of writing the test case above does not exclude that it will also be
affected.
**2.3 What test.py does in detail:**
test.py is the entry program for test case execution and monitoring.
test.py has the following functions.
\-f --file, Specifies the test case file name to be executed
-p --path, Specifies deployment path
\-m --master, Specifies the master server IP for cluster deployment
-c--cluster, test cluster function
-s--stop, terminates all running nodes
\-g--valgrind, load valgrind for memory leak detection test
\-h--help, display help
**2.4 What util/log.py does in detail:**
log.py is quite simple, the main thing is that you can print the output in
different colors as needed. The success() should be called for successful
test case execution and the success() will print green text. The exit() will
print red text and exit the program, exit() should be called for test
failure.
**util/log.py**
...
    def info(self, info):
        printf("%s %s" % (datetime.datetime.now(), info))
 
    def sleep(self, sec):
        printf("%s sleep %d seconds" % (datetime.datetime.now(), sec))
        time.sleep(sec)
 
    def debug(self, err):
        printf("\\033[1;36m%s %s\\033[0m" % (datetime.datetime.now(), err))
 
    def success(self, info):
        printf("\\033[1;32m%s %s\\033[0m" % (datetime.datetime.now(), info))
 
    def notice(self, err):
        printf("\\033[1;33m%s %s\\033[0m" % (datetime.datetime.now(), err))
 
    def exit(self, err):
        printf("\\033[1;31m%s %s\\033[0m" % (datetime.datetime.now(), err))
        sys.exit(1)
 
    def printNoPrefix(self, info):
        printf("\\033[1;36m%s\\033[0m" % (info)
...
**2.5 What util/sql.py does in detail:**
SQL.py is mainly used to execute SQL statements to manipulate the database,
and the code is extracted and commented as follows:
**util/sql.py**
\# prepare() is mainly used to set up the environment for testing table and
data, and to set up the database db for testing. do not call prepare() if you
need to test the database operation command.
def prepare(self):
tdLog.info("prepare database:db")
self.cursor.execute('reset query cache')
self.cursor.execute('drop database if exists db')
self.cursor.execute('create database db')
self.cursor.execute('use db')
...
\# query() is mainly used to execute select statements for normal syntax input
def query(self, sql):
...
\# error() is mainly used to execute the select statement with the wrong syntax
input, the error will be caught as a reasonable behavior, if not caught it will
prove that the test failed
def error()
...
\# checkRows() is used to check the number of returned lines after calling
query(select ...) after calling the query(select ...) to check the number of
rows of returned results.
def checkRows(self, expectRows):
...
\# checkData() is used to check the returned result data after calling
query(select ...) after the query(select ...) is called, failure to meet
expectation is
def checkData(self, row, col, data):
...
\# getData() returns the result data after calling query(select ...) to return
the resulting data after calling query(select ...)
def getData(self, row, col):
...
\# execute() used to execute sql and return the number of affected rows
def execute(self, sql):
...
\# executeTimes() Multiple executions of the same sql statement
def executeTimes(self, sql, times):
...
\# CheckAffectedRows() Check if the number of affected rows is as expected
def checkAffectedRows(self, expectAffectedRows):
...
> Note: Both Python2 and Python3 are currently supported by the Python test
> case. Since Python2 is no longer officially supported by January 1, 2020, it
> is recommended that subsequent test case development be guaranteed to run
> correctly on Python3. For Python2, please consider being compatible if
> appropriate without additional
> burden. <https://nakedsecurity.sophos.com/2020/01/03/python-is-dead-long-live-python/> 
### CI Covenant submission adoption principle.
- Every commit / PR compilation must pass. Currently, the warning is treated
as an error, so the warning must also be resolved.
- Test cases that already exist must pass.
- Because CI is very important to support build and automatically test
procedure, it is necessary to manually test the test case before adding it
and do as many iterations as possible to ensure that the test case provides
stable and reliable test results when added.
> Note: In the future, according to the requirements and test development
> progress will add stress testing, performance testing, code style,
> and other features based on functional testing.
### Third Party Connectors
The TDengine community has also kindly built some of their own connectors! Follow the links below to find the source code for them.
......@@ -367,6 +122,10 @@ The TDengine community has also kindly built some of their own connectors! Follo
- [Rust Connector](https://github.com/taosdata/TDengine/tree/master/tests/examples/rust)
- [.Net Core Connector](https://github.com/maikebing/Maikebing.EntityFrameworkCore.Taos)
# How to run the test cases and how to add a new test case?
TDengine's test framework and all test cases are fully open source.
Please refer to [this document](tests/How-To-Run-Test-And-How-To-Add-New-Test-Case.md) for how to run test and develop new test case.
# TDengine Roadmap
- Support event-driven stream computing
- Support user defined functions
......
......@@ -7,3 +7,4 @@ ADD_SUBDIRECTORY(regex)
ADD_SUBDIRECTORY(iconv)
ADD_SUBDIRECTORY(lz4)
ADD_SUBDIRECTORY(cJson)
ADD_SUBDIRECTORY(MQTT-C)
cmake_minimum_required(VERSION 3.5)
project(MQTT-C VERSION 1.1.2 LANGUAGES C)
# MQTT-C build options
option(MQTT_C_OpenSSL_SUPPORT "Build MQTT-C with OpenSSL support?" OFF)
option(MQTT_C_MbedTLS_SUPPORT "Build MQTT-C with mbed TLS support?" OFF)
option(MQTT_C_EXAMPLES "Build MQTT-C examples?" ON)
option(MQTT_C_TESTS "Build MQTT-C tests?" OFF)
list (APPEND CMAKE_MODULE_PATH ${CMAKE_CURRENT_SOURCE_DIR}/cmake)
# MQTT-C library
add_library(mqttc STATIC
src/mqtt_pal.c
src/mqtt.c
)
target_include_directories(mqttc PUBLIC include)
target_link_libraries(mqttc PUBLIC
$<$<C_COMPILER_ID:MSVS>:ws2_32>
)
# Configure with OpenSSL support
if(MQTT_C_OpenSSL_SUPPORT)
find_package(OpenSSL REQUIRED)
target_link_libraries(mqttc INTERFACE OpenSSL::SSL)
target_compile_definitions(mqttc PUBLIC MQTT_USE_BIO)
endif()
# Configure with mbed TLS support
if(MQTT_C_MbedTLS_SUPPORT)
find_package(MbedTLS REQUIRED)
target_include_directories(mqttc PUBLIC ${MBEDTLS_INCLUDE_DIRS})
target_link_libraries(mqttc INTERFACE ${MBEDTLS_LIBRARY})
target_compile_definitions(mqttc PUBLIC MQTT_USE_MBEDTLS)
endif()
# Build examples
if(MQTT_C_EXAMPLES)
find_package(Threads REQUIRED)
if(MQTT_C_OpenSSL_SUPPORT)
add_executable(bio_publisher examples/bio_publisher.c)
target_link_libraries(bio_publisher Threads::Threads mqttc)
add_executable(openssl_publisher examples/openssl_publisher.c)
target_link_libraries(openssl_publisher Threads::Threads mqttc)
elseif(MQTT_C_MbedTLS_SUPPORT)
add_executable(mbedtls_publisher examples/mbedtls_publisher.c)
target_link_libraries(mbedtls_publisher Threads::Threads mqttc ${MBEDX509_LIBRARY} ${MBEDCRYPTO_LIBRARY})
else()
add_executable(simple_publisher examples/simple_publisher.c)
target_link_libraries(simple_publisher Threads::Threads mqttc)
add_executable(simple_subscriber examples/simple_subscriber.c)
target_link_libraries(simple_subscriber Threads::Threads mqttc)
add_executable(reconnect_subscriber examples/reconnect_subscriber.c)
target_link_libraries(reconnect_subscriber Threads::Threads mqttc)
endif()
endif()
# Build tests
if(MQTT_C_TESTS)
find_path(CMOCKA_INCLUDE_DIR cmocka.h)
find_library(CMOCKA_LIBRARY cmocka)
if((NOT CMOCKA_INCLUDE_DIR) OR (NOT CMOCKA_LIBRARY))
message(FATAL_ERROR "Failed to find cmocka! Add cmocka's install prefix to CMAKE_PREFIX_PATH to resolve this error.")
endif()
add_executable(tests tests.c)
target_link_libraries(tests ${CMOCKA_LIBRARY} mqttc)
target_include_directories(tests PRIVATE ${CMOCKA_INCLUDE_DIR})
endif()
# Install includes and library
# install(TARGETS mqttc
# DESTINATION lib
# )
# install(DIRECTORY include/
# DESTINATION include)
此差异已折叠。
MIT License
Copyright (c) 2018 Liam Bindle
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
<p align="right">
<a href="https://github.com/LiamBindle/MQTT-C/stargazers"><img src="https://img.shields.io/github/stars/LiamBindle/MQTT-C.svg?style=social&label=Star" style="margin-left:5em"></a>
<a href="https://github.com/LiamBindle/MQTT-C/network/members"><img src="https://img.shields.io/github/forks/LiamBindle/MQTT-C.svg?style=social&label=Fork"></a>
</p>
<p align="center">
<img width="70%" src="docs/mqtt-c-logo.png"><br>
<a href="https://liambindle.ca/MQTT-C"><img src="https://img.shields.io/badge/docs-passing-brightgreen.svg"></a>
<a href="https://github.com/LiamBindle/MQTT-C/issues"><img src="https://img.shields.io/badge/Maintained%3F-yes-green.svg"></a>
<a href="https://GitHub.com/LiamBindle/MQTT-C/issues/"><img src="https://img.shields.io/github/issues/LiamBindle/MQTT-C.svg"></a>
<a href="https://github.com/LiamBindle/MQTT-C/issues"><img src="https://img.shields.io/github/issues-closed/LiamBindle/MQTT-C.svg"></a>
<a href="https://github.com/LiamBindle/MQTT-C/blob/master/LICENSE"><img src="https://img.shields.io/badge/License-MIT-blue.svg"></a>
</p>
#
MQTT-C is an [MQTT v3.1.1](http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html)
client written in C. MQTT is a lightweight publisher-subscriber-based messaging protocol that is
commonly used in IoT and networking applications where high-latency and low data-rate links
are expected. The purpose of MQTT-C is to provide a **portable** MQTT client, **written in C**,
for embedded systems and PC's alike. MQTT-C does this by providing a transparent Platform
Abstraction Layer (PAL) which makes porting to new platforms easy. MQTT-C is completely
thread-safe but can also run perfectly fine on single-threaded systems making MQTT-C
well-suited for embedded systems and microcontrollers. Finally, MQTT-C is small; there are only
two source files totalling less than 2000 lines.
#### A note from the author
It's been great to hear about all the places MQTT-C is being used! Please don't hesitate
to get in touch with me or submit issues on GitHub!
## Getting Started
To use MQTT-C you first instantiate a `struct mqtt_client` and initialize it by calling
@ref mqtt_init.
```c
struct mqtt_client client; /* instantiate the client */
mqtt_init(&client, ...); /* initialize the client */
```
Once your client is initialized you need to connect to an MQTT broker.
```c
mqtt_connect(&client, ...); /* send a connection request to the broker. */
```
At this point the client is ready to use! For example, we can subscribe to a topic like so:
```c
/* subscribe to "toaster/temperature" with a max QoS level of 0 */
mqtt_subscribe(&client, "toaster/temperature", 0);
```
And we can publish to a topic like so:
```c
/* publish coffee temperature with a QoS level of 1 */
int temperature = 67;
mqtt_publish(&client, "coffee/temperature", &temperature, sizeof(int), MQTT_PUBLISH_QOS_1);
```
Those are the basics! From here the [examples](https://github.com/LiamBindle/MQTT-C/tree/master/examples) and [API documentation](https://liambindle.ca/MQTT-C/group__api.html) are good places to get started.
## Building
There are **only two source files** that need to be built, `mqtt.c` and `mqtt_pal.c`.
These files are ANSI C (C89) compatible, and should compile with any C compiler.
Then, simply <code>\#include <mqtt.h></code>.
Alternatively, you can build MQTT-C with CMake or the provided Makefile. These are provided for convenience.
## Documentation
Pre-built documentation can be found here: [https://liambindle.ca/MQTT-C](https://liambindle.ca/MQTT-C). Be sure to check out the [examples](https://github.com/LiamBindle/MQTT-C/tree/master/examples) too.
The @ref api documentation contains all the documentation application programmers should need.
The @ref pal documentation contains everything you should need to port MQTT-C to a new platform,
and the other modules contain documentation for MQTT-C developers.
## Testing and Building the Tests
The MQTT-C unit tests use the [cmocka unit testing framework](https://cmocka.org/).
Therefore, [cmocka](https://cmocka.org/) *must* be installed on your machine to build and run
the unit tests. For convenience, a simple `"makefile"` is included to build the unit tests and
examples on UNIX-like machines. The unit tests and examples can be built as follows:
```bash
$ make all
```
The unit tests and examples will be built in the `"bin/"` directory. The unit tests can be run
like so:
```bash
$ ./bin/tests [address [port]]
```
Note that the \c address and \c port arguments are both optional to specify the location of the
MQTT broker that is to be used for the tests. If no \c address is given then the
[Mosquitto MQTT Test Server](https://test.mosquitto.org/) will be used. If no \c port is given,
port 1883 will be used.
## Portability
MQTT-C provides a transparent platform abstraction layer (PAL) in `mqtt_pal.h` and `mqtt_pal.c`.
These files declare and implement the types and calls that MQTT-C requires. Refer to
@ref pal for the complete documentation of the PAL.
## Contributing
Please feel free to submit issues and pull-requests [here](https://github.com/LiamBindle/MQTT-C).
When submitting a pull-request please ensure you have *fully documented* your changes and
added the appropriate unit tests.
## License
This project is licensed under the [MIT License](https://opensource.org/licenses/MIT). See the
`"LICENSE"` file for more details.
## Authors
MQTT-C was initially developed as a CMPT 434 (Winter Term, 2018) final project at the University of
Saskatchewan by:
- **Liam Bindle**
- **Demilade Adeoye**
/**
* @file
* A simple program to that publishes the current time whenever ENTER is pressed.
*/
#include <unistd.h>
#include <stdlib.h>
#include <stdio.h>
#include <mqtt.h>
#include "templates/bio_sockets.h"
/**
* @brief The function that would be called whenever a PUBLISH is received.
*
* @note This function is not used in this example.
*/
void publish_callback(void** unused, struct mqtt_response_publish *published);
/**
* @brief The client's refresher. This function triggers back-end routines to
* handle ingress/egress traffic to the broker.
*
* @note All this function needs to do is call \ref __mqtt_recv and
* \ref __mqtt_send every so often. I've picked 100 ms meaning that
* client ingress/egress traffic will be handled every 100 ms.
*/
void* client_refresher(void* client);
/**
* @brief Safelty closes the \p sockfd and cancels the \p client_daemon before \c exit.
*/
void exit_example(int status, BIO* sockfd, pthread_t *client_daemon);
/**
* A simple program to that publishes the current time whenever ENTER is pressed.
*/
int main(int argc, const char *argv[])
{
const char* addr;
const char* port;
const char* topic;
/* Load OpenSSL */
SSL_load_error_strings();
ERR_load_BIO_strings();
OpenSSL_add_all_algorithms();
/* get address (argv[1] if present) */
if (argc > 1) {
addr = argv[1];
} else {
addr = "test.mosquitto.org";
}
/* get port number (argv[2] if present) */
if (argc > 2) {
port = argv[2];
} else {
port = "1883";
}
/* get the topic name to publish */
if (argc > 3) {
topic = argv[3];
} else {
topic = "datetime";
}
/* open the non-blocking TCP socket (connecting to the broker) */
BIO* sockfd = open_nb_socket(addr, port);
if (sockfd == NULL) {
exit_example(EXIT_FAILURE, sockfd, NULL);
}
/* setup a client */
struct mqtt_client client;
uint8_t sendbuf[2048]; /* sendbuf should be large enough to hold multiple whole mqtt messages */
uint8_t recvbuf[1024]; /* recvbuf should be large enough any whole mqtt message expected to be received */
mqtt_init(&client, sockfd, sendbuf, sizeof(sendbuf), recvbuf, sizeof(recvbuf), publish_callback);
mqtt_connect(&client, "publishing_client", NULL, NULL, 0, NULL, NULL, 0, 400);
/* check that we don't have any errors */
if (client.error != MQTT_OK) {
fprintf(stderr, "error: %s\n", mqtt_error_str(client.error));
exit_example(EXIT_FAILURE, sockfd, NULL);
}
/* start a thread to refresh the client (handle egress and ingree client traffic) */
pthread_t client_daemon;
if(pthread_create(&client_daemon, NULL, client_refresher, &client)) {
fprintf(stderr, "Failed to start client daemon.\n");
exit_example(EXIT_FAILURE, sockfd, NULL);
}
/* start publishing the time */
printf("%s is ready to begin publishing the time.\n", argv[0]);
printf("Press ENTER to publish the current time.\n");
printf("Press CTRL-D (or any other key) to exit.\n\n");
while(fgetc(stdin) == '\n') {
/* get the current time */
time_t timer;
time(&timer);
struct tm* tm_info = localtime(&timer);
char timebuf[26];
strftime(timebuf, 26, "%Y-%m-%d %H:%M:%S", tm_info);
/* print a message */
char application_message[256];
snprintf(application_message, sizeof(application_message), "The time is %s", timebuf);
printf("%s published : \"%s\"", argv[0], application_message);
/* publish the time */
mqtt_publish(&client, topic, application_message, strlen(application_message) + 1, MQTT_PUBLISH_QOS_2);
/* check for errors */
if (client.error != MQTT_OK) {
fprintf(stderr, "error: %s\n", mqtt_error_str(client.error));
exit_example(EXIT_FAILURE, sockfd, &client_daemon);
}
}
/* disconnect */
printf("\n%s disconnecting from %s\n", argv[0], addr);
sleep(1);
/* exit */
exit_example(EXIT_SUCCESS, sockfd, &client_daemon);
}
void exit_example(int status, BIO* sockfd, pthread_t *client_daemon)
{
if (sockfd != NULL) BIO_free_all(sockfd);
if (client_daemon != NULL) pthread_cancel(*client_daemon);
exit(status);
}
void publish_callback(void** unused, struct mqtt_response_publish *published)
{
/* not used in this example */
}
void* client_refresher(void* client)
{
while(1)
{
mqtt_sync((struct mqtt_client*) client);
usleep(100000U);
}
return NULL;
}
\ No newline at end of file
/**
* @file
*/
#include <unistd.h>
#include <stdlib.h>
#include <stdio.h>
#include <mqtt.h>
#include "templates/mbedtls_sockets.h"
/**
* @brief The function that would be called whenever a PUBLISH is received.
*
* @note This function is not used in this example.
*/
void publish_callback(void** unused, struct mqtt_response_publish *published);
/**
* @brief The client's refresher. This function triggers back-end routines to
* handle ingress/egress traffic to the broker.
*
* @note All this function needs to do is call \ref __mqtt_recv and
* \ref __mqtt_send every so often. I've picked 100 ms meaning that
* client ingress/egress traffic will be handled every 100 ms.
*/
void* client_refresher(void* client);
/**
* @brief Safelty closes the \p sockfd and cancels the \p client_daemon before \c exit.
*/
void exit_example(int status, mqtt_pal_socket_handle sockfd, pthread_t *client_daemon);
/**
* A simple program to that publishes the current time whenever ENTER is pressed.
*/
int main(int argc, const char *argv[])
{
const char* addr;
const char* port;
const char* topic;
const char* ca_file;
struct mbedtls_context ctx;
mqtt_pal_socket_handle sockfd;
if (argc > 1) {
ca_file = argv[1];
} else {
printf("error: path to the CA certificate to use\n");
exit(1);
}
/* get address (argv[2] if present) */
if (argc > 2) {
addr = argv[2];
} else {
addr = "test.mosquitto.org";
}
/* get port number (argv[3] if present) */
if (argc > 3) {
port = argv[3];
} else {
port = "8883";
}
/* get the topic name to publish */
if (argc > 4) {
topic = argv[4];
} else {
topic = "datetime";
}
/* open the non-blocking TCP socket (connecting to the broker) */
open_nb_socket(&ctx, addr, port, ca_file);
sockfd = &ctx.ssl_ctx;
if (sockfd == NULL) {
exit_example(EXIT_FAILURE, sockfd, NULL);
}
/* setup a client */
struct mqtt_client client;
uint8_t sendbuf[2048]; /* sendbuf should be large enough to hold multiple whole mqtt messages */
uint8_t recvbuf[1024]; /* recvbuf should be large enough any whole mqtt message expected to be received */
mqtt_init(&client, sockfd, sendbuf, sizeof(sendbuf), recvbuf, sizeof(recvbuf), publish_callback);
mqtt_connect(&client, "publishing_client", NULL, NULL, 0, NULL, NULL, 0, 400);
/* check that we don't have any errors */
if (client.error != MQTT_OK) {
fprintf(stderr, "error: %s\n", mqtt_error_str(client.error));
exit_example(EXIT_FAILURE, sockfd, NULL);
}
/* start a thread to refresh the client (handle egress and ingree client traffic) */
pthread_t client_daemon;
if(pthread_create(&client_daemon, NULL, client_refresher, &client)) {
fprintf(stderr, "Failed to start client daemon.\n");
exit_example(EXIT_FAILURE, sockfd, NULL);
}
/* start publishing the time */
printf("%s is ready to begin publishing the time.\n", argv[0]);
printf("Press ENTER to publish the current time.\n");
printf("Press CTRL-D (or any other key) to exit.\n\n");
while(fgetc(stdin) == '\n') {
/* get the current time */
time_t timer;
time(&timer);
struct tm* tm_info = localtime(&timer);
char timebuf[26];
strftime(timebuf, 26, "%Y-%m-%d %H:%M:%S", tm_info);
/* print a message */
char application_message[256];
snprintf(application_message, sizeof(application_message), "The time is %s", timebuf);
printf("%s published : \"%s\"", argv[0], application_message);
/* publish the time */
mqtt_publish(&client, topic, application_message, strlen(application_message) + 1, MQTT_PUBLISH_QOS_2);
/* check for errors */
if (client.error != MQTT_OK) {
fprintf(stderr, "error: %s\n", mqtt_error_str(client.error));
exit_example(EXIT_FAILURE, sockfd, &client_daemon);
}
}
/* disconnect */
printf("\n%s disconnecting from %s\n", argv[0], addr);
sleep(1);
/* exit */
exit_example(EXIT_SUCCESS, sockfd, &client_daemon);
}
void exit_example(int status, mqtt_pal_socket_handle sockfd, pthread_t *client_daemon)
{
if (client_daemon != NULL) pthread_cancel(*client_daemon);
mbedtls_ssl_free(sockfd);
/* XXX free the rest of contexts */
exit(status);
}
void publish_callback(void** unused, struct mqtt_response_publish *published)
{
/* not used in this example */
}
void* client_refresher(void* client)
{
while(1)
{
mqtt_sync((struct mqtt_client*) client);
usleep(100000U);
}
return NULL;
}
/**
* @file
*/
#include <unistd.h>
#include <stdlib.h>
#include <stdio.h>
#include <mqtt.h>
#include "templates/openssl_sockets.h"
/**
* @brief The function that would be called whenever a PUBLISH is received.
*
* @note This function is not used in this example.
*/
void publish_callback(void** unused, struct mqtt_response_publish *published);
/**
* @brief The client's refresher. This function triggers back-end routines to
* handle ingress/egress traffic to the broker.
*
* @note All this function needs to do is call \ref __mqtt_recv and
* \ref __mqtt_send every so often. I've picked 100 ms meaning that
* client ingress/egress traffic will be handled every 100 ms.
*/
void* client_refresher(void* client);
/**
* @brief Safelty closes the \p sockfd and cancels the \p client_daemon before \c exit.
*/
void exit_example(int status, BIO* sockfd, pthread_t *client_daemon);
/**
* A simple program to that publishes the current time whenever ENTER is pressed.
*/
int main(int argc, const char *argv[])
{
const char* addr;
const char* port;
const char* topic;
const char* ca_file;
/* Load OpenSSL */
SSL_load_error_strings();
ERR_load_BIO_strings();
OpenSSL_add_all_algorithms();
SSL_library_init();
SSL_CTX* ssl_ctx;
BIO* sockfd;
if (argc > 1) {
ca_file = argv[1];
} else {
printf("error: path to the CA certificate to use\n");
exit(1);
}
/* get address (argv[2] if present) */
if (argc > 2) {
addr = argv[2];
} else {
addr = "test.mosquitto.org";
}
/* get port number (argv[3] if present) */
if (argc > 3) {
port = argv[3];
} else {
port = "8883";
}
/* get the topic name to publish */
if (argc > 4) {
topic = argv[4];
} else {
topic = "datetime";
}
/* open the non-blocking TCP socket (connecting to the broker) */
open_nb_socket(&sockfd, &ssl_ctx, addr, port, ca_file, NULL);
if (sockfd == NULL) {
exit_example(EXIT_FAILURE, sockfd, NULL);
}
/* setup a client */
struct mqtt_client client;
uint8_t sendbuf[2048]; /* sendbuf should be large enough to hold multiple whole mqtt messages */
uint8_t recvbuf[1024]; /* recvbuf should be large enough any whole mqtt message expected to be received */
mqtt_init(&client, sockfd, sendbuf, sizeof(sendbuf), recvbuf, sizeof(recvbuf), publish_callback);
mqtt_connect(&client, "publishing_client", NULL, NULL, 0, NULL, NULL, 0, 400);
/* check that we don't have any errors */
if (client.error != MQTT_OK) {
fprintf(stderr, "error: %s\n", mqtt_error_str(client.error));
exit_example(EXIT_FAILURE, sockfd, NULL);
}
/* start a thread to refresh the client (handle egress and ingree client traffic) */
pthread_t client_daemon;
if(pthread_create(&client_daemon, NULL, client_refresher, &client)) {
fprintf(stderr, "Failed to start client daemon.\n");
exit_example(EXIT_FAILURE, sockfd, NULL);
}
/* start publishing the time */
printf("%s is ready to begin publishing the time.\n", argv[0]);
printf("Press ENTER to publish the current time.\n");
printf("Press CTRL-D (or any other key) to exit.\n\n");
while(fgetc(stdin) == '\n') {
/* get the current time */
time_t timer;
time(&timer);
struct tm* tm_info = localtime(&timer);
char timebuf[26];
strftime(timebuf, 26, "%Y-%m-%d %H:%M:%S", tm_info);
/* print a message */
char application_message[256];
snprintf(application_message, sizeof(application_message), "The time is %s", timebuf);
printf("%s published : \"%s\"", argv[0], application_message);
/* publish the time */
mqtt_publish(&client, topic, application_message, strlen(application_message) + 1, MQTT_PUBLISH_QOS_2);
/* check for errors */
if (client.error != MQTT_OK) {
fprintf(stderr, "error: %s\n", mqtt_error_str(client.error));
exit_example(EXIT_FAILURE, sockfd, &client_daemon);
}
}
/* disconnect */
printf("\n%s disconnecting from %s\n", argv[0], addr);
sleep(1);
/* exit */
exit_example(EXIT_SUCCESS, sockfd, &client_daemon);
}
void exit_example(int status, BIO* sockfd, pthread_t *client_daemon)
{
if (sockfd != NULL) BIO_free_all(sockfd);
if (client_daemon != NULL) pthread_cancel(*client_daemon);
exit(status);
}
void publish_callback(void** unused, struct mqtt_response_publish *published)
{
/* not used in this example */
}
void* client_refresher(void* client)
{
while(1)
{
mqtt_sync((struct mqtt_client*) client);
usleep(100000U);
}
return NULL;
}
\ No newline at end of file
/**
* @file
* A simple subscriber program that performs automatic reconnections.
*/
#include <unistd.h>
#include <stdlib.h>
#include <stdio.h>
#include <mqtt.h>
#include "templates/posix_sockets.h"
/**
* @brief A structure that I will use to keep track of some data needed
* to setup the connection to the broker.
*
* An instance of this struct will be created in my \c main(). Then, whenever
* \ref reconnect_client is called, this instance will be passed.
*/
struct reconnect_state_t {
const char* hostname;
const char* port;
const char* topic;
uint8_t* sendbuf;
size_t sendbufsz;
uint8_t* recvbuf;
size_t recvbufsz;
};
/**
* @brief My reconnect callback. It will reestablish the connection whenever
* an error occurs.
*/
void reconnect_client(struct mqtt_client* client, void **reconnect_state_vptr);
/**
* @brief The function will be called whenever a PUBLISH message is received.
*/
void publish_callback(void** unused, struct mqtt_response_publish *published);
/**
* @brief The client's refresher. This function triggers back-end routines to
* handle ingress/egress traffic to the broker.
*
* @note All this function needs to do is call \ref __mqtt_recv and
* \ref __mqtt_send every so often. I've picked 100 ms meaning that
* client ingress/egress traffic will be handled every 100 ms.
*/
void* client_refresher(void* client);
/**
* @brief Safelty closes the \p sockfd and cancels the \p client_daemon before \c exit.
*/
void exit_example(int status, int sockfd, pthread_t *client_daemon);
int main(int argc, const char *argv[])
{
const char* addr;
const char* port;
const char* topic;
/* get address (argv[1] if present) */
if (argc > 1) {
addr = argv[1];
} else {
addr = "test.mosquitto.org";
}
/* get port number (argv[2] if present) */
if (argc > 2) {
port = argv[2];
} else {
port = "1883";
}
/* get the topic name to publish */
if (argc > 3) {
topic = argv[3];
} else {
topic = "datetime";
}
/* build the reconnect_state structure which will be passed to reconnect */
struct reconnect_state_t reconnect_state;
reconnect_state.hostname = addr;
reconnect_state.port = port;
reconnect_state.topic = topic;
uint8_t sendbuf[2048];
uint8_t recvbuf[1024];
reconnect_state.sendbuf = sendbuf;
reconnect_state.sendbufsz = sizeof(sendbuf);
reconnect_state.recvbuf = recvbuf;
reconnect_state.recvbufsz = sizeof(recvbuf);
/* setup a client */
struct mqtt_client client;
mqtt_init_reconnect(&client,
reconnect_client, &reconnect_state,
publish_callback
);
/* start a thread to refresh the client (handle egress and ingree client traffic) */
pthread_t client_daemon;
if(pthread_create(&client_daemon, NULL, client_refresher, &client)) {
fprintf(stderr, "Failed to start client daemon.\n");
exit_example(EXIT_FAILURE, -1, NULL);
}
/* start publishing the time */
printf("%s listening for '%s' messages.\n", argv[0], topic);
printf("Press ENTER to inject an error.\n");
printf("Press CTRL-D to exit.\n\n");
/* block */
while(fgetc(stdin) != EOF) {
printf("Injecting error: \"MQTT_ERROR_SOCKET_ERROR\"\n");
client.error = MQTT_ERROR_SOCKET_ERROR;
}
/* disconnect */
printf("\n%s disconnecting from %s\n", argv[0], addr);
sleep(1);
/* exit */
exit_example(EXIT_SUCCESS, client.socketfd, &client_daemon);
}
void reconnect_client(struct mqtt_client* client, void **reconnect_state_vptr)
{
struct reconnect_state_t *reconnect_state = *((struct reconnect_state_t**) reconnect_state_vptr);
/* Close the clients socket if this isn't the initial reconnect call */
if (client->error != MQTT_ERROR_INITIAL_RECONNECT) {
close(client->socketfd);
}
/* Perform error handling here. */
if (client->error != MQTT_ERROR_INITIAL_RECONNECT) {
printf("reconnect_client: called while client was in error state \"%s\"\n",
mqtt_error_str(client->error)
);
}
/* Open a new socket. */
int sockfd = open_nb_socket(reconnect_state->hostname, reconnect_state->port);
if (sockfd == -1) {
perror("Failed to open socket: ");
exit_example(EXIT_FAILURE, sockfd, NULL);
}
/* Reinitialize the client. */
mqtt_reinit(client, sockfd,
reconnect_state->sendbuf, reconnect_state->sendbufsz,
reconnect_state->recvbuf, reconnect_state->recvbufsz
);
/* Create an anonymous session */
const char* client_id = NULL;
/* Ensure we have a clean session */
uint8_t connect_flags = MQTT_CONNECT_CLEAN_SESSION;
/* Send connection request to the broker. */
mqtt_connect(client, client_id, NULL, NULL, 0, NULL, NULL, connect_flags, 400);
/* Subscribe to the topic. */
mqtt_subscribe(client, reconnect_state->topic, 0);
}
void exit_example(int status, int sockfd, pthread_t *client_daemon)
{
if (sockfd != -1) close(sockfd);
if (client_daemon != NULL) pthread_cancel(*client_daemon);
exit(status);
}
void publish_callback(void** unused, struct mqtt_response_publish *published)
{
/* note that published->topic_name is NOT null-terminated (here we'll change it to a c-string) */
char* topic_name = (char*) malloc(published->topic_name_size + 1);
memcpy(topic_name, published->topic_name, published->topic_name_size);
topic_name[published->topic_name_size] = '\0';
printf("Received publish('%s'): %s\n", topic_name, (const char*) published->application_message);
free(topic_name);
}
void* client_refresher(void* client)
{
while(1)
{
mqtt_sync((struct mqtt_client*) client);
usleep(100000U);
}
return NULL;
}
\ No newline at end of file
/**
* @file
* A simple program to that publishes the current time whenever ENTER is pressed.
*/
#include <unistd.h>
#include <stdlib.h>
#include <stdio.h>
#include <mqtt.h>
#include "templates/posix_sockets.h"
/**
* @brief The function that would be called whenever a PUBLISH is received.
*
* @note This function is not used in this example.
*/
void publish_callback(void** unused, struct mqtt_response_publish *published);
/**
* @brief The client's refresher. This function triggers back-end routines to
* handle ingress/egress traffic to the broker.
*
* @note All this function needs to do is call \ref __mqtt_recv and
* \ref __mqtt_send every so often. I've picked 100 ms meaning that
* client ingress/egress traffic will be handled every 100 ms.
*/
void* client_refresher(void* client);
/**
* @brief Safelty closes the \p sockfd and cancels the \p client_daemon before \c exit.
*/
void exit_example(int status, int sockfd, pthread_t *client_daemon);
/**
* A simple program to that publishes the current time whenever ENTER is pressed.
*/
int main(int argc, const char *argv[])
{
const char* addr;
const char* port;
const char* topic;
/* get address (argv[1] if present) */
if (argc > 1) {
addr = argv[1];
} else {
addr = "test.mosquitto.org";
}
/* get port number (argv[2] if present) */
if (argc > 2) {
port = argv[2];
} else {
port = "1883";
}
/* get the topic name to publish */
if (argc > 3) {
topic = argv[3];
} else {
topic = "datetime";
}
/* open the non-blocking TCP socket (connecting to the broker) */
int sockfd = open_nb_socket(addr, port);
if (sockfd == -1) {
perror("Failed to open socket: ");
exit_example(EXIT_FAILURE, sockfd, NULL);
}
/* setup a client */
struct mqtt_client client;
uint8_t sendbuf[2048]; /* sendbuf should be large enough to hold multiple whole mqtt messages */
uint8_t recvbuf[1024]; /* recvbuf should be large enough any whole mqtt message expected to be received */
mqtt_init(&client, sockfd, sendbuf, sizeof(sendbuf), recvbuf, sizeof(recvbuf), publish_callback);
/* Create an anonymous session */
const char* client_id = NULL;
/* Ensure we have a clean session */
uint8_t connect_flags = MQTT_CONNECT_CLEAN_SESSION;
/* Send connection request to the broker. */
mqtt_connect(&client, client_id, NULL, NULL, 0, NULL, NULL, connect_flags, 400);
/* check that we don't have any errors */
if (client.error != MQTT_OK) {
fprintf(stderr, "error: %s\n", mqtt_error_str(client.error));
exit_example(EXIT_FAILURE, sockfd, NULL);
}
/* start a thread to refresh the client (handle egress and ingree client traffic) */
pthread_t client_daemon;
if(pthread_create(&client_daemon, NULL, client_refresher, &client)) {
fprintf(stderr, "Failed to start client daemon.\n");
exit_example(EXIT_FAILURE, sockfd, NULL);
}
/* start publishing the time */
printf("%s is ready to begin publishing the time.\n", argv[0]);
printf("Press ENTER to publish the current time.\n");
printf("Press CTRL-D (or any other key) to exit.\n\n");
while(fgetc(stdin) == '\n') {
/* get the current time */
time_t timer;
time(&timer);
struct tm* tm_info = localtime(&timer);
char timebuf[26];
strftime(timebuf, 26, "%Y-%m-%d %H:%M:%S", tm_info);
/* print a message */
char application_message[256];
snprintf(application_message, sizeof(application_message), "The time is %s", timebuf);
printf("%s published : \"%s\"", argv[0], application_message);
/* publish the time */
mqtt_publish(&client, topic, application_message, strlen(application_message) + 1, MQTT_PUBLISH_QOS_0);
/* check for errors */
if (client.error != MQTT_OK) {
fprintf(stderr, "error: %s\n", mqtt_error_str(client.error));
exit_example(EXIT_FAILURE, sockfd, &client_daemon);
}
}
/* disconnect */
printf("\n%s disconnecting from %s\n", argv[0], addr);
sleep(1);
/* exit */
exit_example(EXIT_SUCCESS, sockfd, &client_daemon);
}
void exit_example(int status, int sockfd, pthread_t *client_daemon)
{
if (sockfd != -1) close(sockfd);
if (client_daemon != NULL) pthread_cancel(*client_daemon);
exit(status);
}
void publish_callback(void** unused, struct mqtt_response_publish *published)
{
/* not used in this example */
}
void* client_refresher(void* client)
{
while(1)
{
mqtt_sync((struct mqtt_client*) client);
usleep(100000U);
}
return NULL;
}
\ No newline at end of file
/**
* @file
* A simple program that subscribes to a topic.
*/
#include <unistd.h>
#include <stdlib.h>
#include <stdio.h>
#include <mqtt.h>
#include "templates/posix_sockets.h"
/**
* @brief The function will be called whenever a PUBLISH message is received.
*/
void publish_callback(void** unused, struct mqtt_response_publish *published);
/**
* @brief The client's refresher. This function triggers back-end routines to
* handle ingress/egress traffic to the broker.
*
* @note All this function needs to do is call \ref __mqtt_recv and
* \ref __mqtt_send every so often. I've picked 100 ms meaning that
* client ingress/egress traffic will be handled every 100 ms.
*/
void* client_refresher(void* client);
/**
* @brief Safelty closes the \p sockfd and cancels the \p client_daemon before \c exit.
*/
void exit_example(int status, int sockfd, pthread_t *client_daemon);
int main(int argc, const char *argv[])
{
const char* addr;
const char* port;
const char* topic;
/* get address (argv[1] if present) */
if (argc > 1) {
addr = argv[1];
} else {
addr = "test.mosquitto.org";
}
/* get port number (argv[2] if present) */
if (argc > 2) {
port = argv[2];
} else {
port = "1883";
}
/* get the topic name to publish */
if (argc > 3) {
topic = argv[3];
} else {
topic = "datetime";
}
/* open the non-blocking TCP socket (connecting to the broker) */
int sockfd = open_nb_socket(addr, port);
if (sockfd == -1) {
perror("Failed to open socket: ");
exit_example(EXIT_FAILURE, sockfd, NULL);
}
/* setup a client */
struct mqtt_client client;
uint8_t sendbuf[2048]; /* sendbuf should be large enough to hold multiple whole mqtt messages */
uint8_t recvbuf[1024]; /* recvbuf should be large enough any whole mqtt message expected to be received */
mqtt_init(&client, sockfd, sendbuf, sizeof(sendbuf), recvbuf, sizeof(recvbuf), publish_callback);
/* Create an anonymous session */
const char* client_id = NULL;
/* Ensure we have a clean session */
uint8_t connect_flags = MQTT_CONNECT_CLEAN_SESSION;
/* Send connection request to the broker. */
mqtt_connect(&client, client_id, NULL, NULL, 0, NULL, NULL, connect_flags, 400);
/* check that we don't have any errors */
if (client.error != MQTT_OK) {
fprintf(stderr, "error: %s\n", mqtt_error_str(client.error));
exit_example(EXIT_FAILURE, sockfd, NULL);
}
/* start a thread to refresh the client (handle egress and ingree client traffic) */
pthread_t client_daemon;
if(pthread_create(&client_daemon, NULL, client_refresher, &client)) {
fprintf(stderr, "Failed to start client daemon.\n");
exit_example(EXIT_FAILURE, sockfd, NULL);
}
/* subscribe */
mqtt_subscribe(&client, topic, 0);
/* start publishing the time */
printf("%s listening for '%s' messages.\n", argv[0], topic);
printf("Press CTRL-D to exit.\n\n");
/* block */
while(fgetc(stdin) != EOF);
/* disconnect */
printf("\n%s disconnecting from %s\n", argv[0], addr);
sleep(1);
/* exit */
exit_example(EXIT_SUCCESS, sockfd, &client_daemon);
}
void exit_example(int status, int sockfd, pthread_t *client_daemon)
{
if (sockfd != -1) close(sockfd);
if (client_daemon != NULL) pthread_cancel(*client_daemon);
exit(status);
}
void publish_callback(void** unused, struct mqtt_response_publish *published)
{
/* note that published->topic_name is NOT null-terminated (here we'll change it to a c-string) */
char* topic_name = (char*) malloc(published->topic_name_size + 1);
memcpy(topic_name, published->topic_name, published->topic_name_size);
topic_name[published->topic_name_size] = '\0';
printf("Received publish('%s'): %s\n", topic_name, (const char*) published->application_message);
free(topic_name);
}
void* client_refresher(void* client)
{
while(1)
{
mqtt_sync((struct mqtt_client*) client);
usleep(100000U);
}
return NULL;
}
\ No newline at end of file
#ifndef __BIO_SOCKET_TEMPLATE_H__
#define __BIO_SOCKET_TEMPLATE_H__
#include <openssl/bio.h>
#include <openssl/ssl.h>
#include <openssl/err.h>
/*
A template for opening a non-blocking BIO socket.
*/
BIO* open_nb_socket(const char* addr, const char* port) {
BIO* bio = BIO_new_connect(addr);
BIO_set_nbio(bio, 1);
BIO_set_conn_port(bio, port);
/* timeout after 10 seconds */
int start_time = time(NULL);
while(BIO_do_connect(bio) == 0 && (int)time(NULL) - start_time < 10);
if (BIO_do_connect(bio) <= 0) {
fprintf(stderr, "Failed to open socket: BIO_do_connect returned <= 0\n");
return NULL;
}
return bio;
}
#endif
\ No newline at end of file
#ifndef __MBEDTLS_SOCKET_TEMPLATE_H__
#define __MBEDTLS_SOCKET_TEMPLATE_H__
#include <inttypes.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <mbedtls/error.h>
#include <mbedtls/entropy.h>
#include <mbedtls/ctr_drbg.h>
#include <mbedtls/net_sockets.h>
#include <mbedtls/ssl.h>
#if !defined(MBEDTLS_NET_POLL_READ)
/* compat for older mbedtls */
#define MBEDTLS_NET_POLL_READ 1
#define MBEDTLS_NET_POLL_WRITE 1
int
mbedtls_net_poll(mbedtls_net_context * ctx, uint32_t rw, uint32_t timeout)
{
/* XXX this is not ideal but good enough for an example */
usleep(300);
return 1;
}
#endif
struct mbedtls_context {
mbedtls_net_context net_ctx;
mbedtls_ssl_context ssl_ctx;
mbedtls_ssl_config ssl_conf;
mbedtls_x509_crt ca_crt;
mbedtls_entropy_context entropy;
mbedtls_ctr_drbg_context ctr_drbg;
};
void failed(const char *fn, int rv) {
char buf[100];
mbedtls_strerror(rv, buf, sizeof(buf));
printf("%s failed with %x (%s)\n", fn, -rv, buf);
exit(1);
}
void cert_verify_failed(uint32_t rv) {
char buf[512];
mbedtls_x509_crt_verify_info(buf, sizeof(buf), "\t", rv);
printf("Certificate verification failed (%0" PRIx32 ")\n%s\n", rv, buf);
exit(1);
}
/*
A template for opening a non-blocking mbed TLS connection.
*/
void open_nb_socket(struct mbedtls_context *ctx,
const char *hostname,
const char *port,
const char *ca_file) {
const unsigned char *additional = (const unsigned char *)"MQTT-C";
size_t additional_len = 6;
int rv;
mbedtls_net_context *net_ctx = &ctx->net_ctx;
mbedtls_ssl_context *ssl_ctx = &ctx->ssl_ctx;
mbedtls_ssl_config *ssl_conf = &ctx->ssl_conf;
mbedtls_x509_crt *ca_crt = &ctx->ca_crt;
mbedtls_entropy_context *entropy = &ctx->entropy;
mbedtls_ctr_drbg_context *ctr_drbg = &ctx->ctr_drbg;
mbedtls_entropy_init(entropy);
mbedtls_ctr_drbg_init(ctr_drbg);
rv = mbedtls_ctr_drbg_seed(ctr_drbg, mbedtls_entropy_func, entropy,
additional, additional_len);
if (rv != 0) {
failed("mbedtls_ctr_drbg_seed", rv);
}
mbedtls_x509_crt_init(ca_crt);
rv = mbedtls_x509_crt_parse_file(ca_crt, ca_file);
if (rv != 0) {
failed("mbedtls_x509_crt_parse_file", rv);
}
mbedtls_ssl_config_init(ssl_conf);
rv = mbedtls_ssl_config_defaults(ssl_conf, MBEDTLS_SSL_IS_CLIENT,
MBEDTLS_SSL_TRANSPORT_STREAM,
MBEDTLS_SSL_PRESET_DEFAULT);
if (rv != 0) {
failed("mbedtls_ssl_config_defaults", rv);
}
mbedtls_ssl_conf_ca_chain(ssl_conf, ca_crt, NULL);
mbedtls_ssl_conf_authmode(ssl_conf, MBEDTLS_SSL_VERIFY_OPTIONAL);
mbedtls_ssl_conf_rng(ssl_conf, mbedtls_ctr_drbg_random, ctr_drbg);
mbedtls_net_init(net_ctx);
rv = mbedtls_net_connect(net_ctx, hostname, port, MBEDTLS_NET_PROTO_TCP);
if (rv != 0) {
failed("mbedtls_net_connect", rv);
}
rv = mbedtls_net_set_nonblock(net_ctx);
if (rv != 0) {
failed("mbedtls_net_set_nonblock", rv);
}
mbedtls_ssl_init(ssl_ctx);
rv = mbedtls_ssl_setup(ssl_ctx, ssl_conf);
if (rv != 0) {
failed("mbedtls_ssl_setup", rv);
}
rv = mbedtls_ssl_set_hostname(ssl_ctx, hostname);
if (rv != 0) {
failed("mbedtls_ssl_set_hostname", rv);
}
mbedtls_ssl_set_bio(ssl_ctx, net_ctx,
mbedtls_net_send, mbedtls_net_recv, NULL);
for (;;) {
rv = mbedtls_ssl_handshake(ssl_ctx);
uint32_t want = 0;
if (rv == MBEDTLS_ERR_SSL_WANT_READ) {
want |= MBEDTLS_NET_POLL_READ;
} else if (rv == MBEDTLS_ERR_SSL_WANT_WRITE) {
want |= MBEDTLS_NET_POLL_WRITE;
} else {
break;
}
rv = mbedtls_net_poll(net_ctx, want, -1);
if (rv < 0) {
failed("mbedtls_net_poll", rv);
}
}
if (rv != 0) {
failed("mbedtls_ssl_handshake", rv);
}
uint32_t result = mbedtls_ssl_get_verify_result(ssl_ctx);
if (result != 0) {
if (result == (uint32_t)-1) {
failed("mbedtls_ssl_get_verify_result", result);
} else {
cert_verify_failed(result);
}
}
}
#endif
#ifndef __OPENSSL_SOCKET_TEMPLATE_H__
#define __OPENSSL_SOCKET_TEMPLATE_H__
#include <openssl/bio.h>
#include <openssl/ssl.h>
#include <openssl/err.h>
/*
A template for opening a non-blocking OpenSSL connection.
*/
void open_nb_socket(BIO** bio, SSL_CTX** ssl_ctx, const char* addr, const char* port, const char* ca_file, const char* ca_path) {
*ssl_ctx = SSL_CTX_new(SSLv23_client_method());
SSL* ssl;
/* load certificate */
if (!SSL_CTX_load_verify_locations(*ssl_ctx, ca_file, ca_path)) {
printf("error: failed to load certificate\n");
exit(1);
}
/* open BIO socket */
*bio = BIO_new_ssl_connect(*ssl_ctx);
BIO_get_ssl(*bio, &ssl);
SSL_set_mode(ssl, SSL_MODE_AUTO_RETRY);
BIO_set_conn_hostname(*bio, addr);
BIO_set_nbio(*bio, 1);
BIO_set_conn_port(*bio, port);
/* wait for connect with 10 second timeout */
int start_time = time(NULL);
int do_connect_rv = BIO_do_connect(*bio);
while(do_connect_rv <= 0 && BIO_should_retry(*bio) && (int)time(NULL) - start_time < 10) {
do_connect_rv = BIO_do_connect(*bio);
}
if (do_connect_rv <= 0) {
printf("error: %s\n", ERR_reason_error_string(ERR_get_error()));
BIO_free_all(*bio);
SSL_CTX_free(*ssl_ctx);
*bio = NULL;
*ssl_ctx=NULL;
return;
}
/* verify certificate */
if (SSL_get_verify_result(ssl) != X509_V_OK) {
/* Handle the failed verification */
printf("error: x509 certificate verification failed\n");
exit(1);
}
}
#endif
\ No newline at end of file
#ifndef __POSIX_SOCKET_TEMPLATE_H__
#define __POSIX_SOCKET_TEMPLATE_H__
#include <stdio.h>
#include <sys/types.h>
#if !defined(WIN32)
#include <sys/socket.h>
#include <netdb.h>
#endif
#include <fcntl.h>
/*
A template for opening a non-blocking POSIX socket.
*/
int open_nb_socket(const char* addr, const char* port) {
struct addrinfo hints = {0};
hints.ai_family = AF_UNSPEC; /* IPv4 or IPv6 */
hints.ai_socktype = SOCK_STREAM; /* Must be TCP */
int sockfd = -1;
int rv;
struct addrinfo *p, *servinfo;
/* get address information */
rv = getaddrinfo(addr, port, &hints, &servinfo);
if(rv != 0) {
fprintf(stderr, "Failed to open socket (getaddrinfo): %s\n", gai_strerror(rv));
return -1;
}
/* open the first possible socket */
for(p = servinfo; p != NULL; p = p->ai_next) {
sockfd = socket(p->ai_family, p->ai_socktype, p->ai_protocol);
if (sockfd == -1) continue;
/* connect to server */
rv = connect(sockfd, servinfo->ai_addr, servinfo->ai_addrlen);
if(rv == -1) continue;
break;
}
/* free servinfo */
freeaddrinfo(servinfo);
/* make non-blocking */
#if !defined(WIN32)
if (sockfd != -1) fcntl(sockfd, F_SETFL, fcntl(sockfd, F_GETFL) | O_NONBLOCK);
#else
if (sockfd != INVALID_SOCKET) {
int iMode = 1;
ioctlsocket(sockfd, FIONBIO, &iMode);
}
#endif
/* return the new socket fd */
return sockfd;
}
#endif
\ No newline at end of file
此差异已折叠。
#ifndef __MQTT_PAL_H__
#define __MQTT_PAL_H__
/*
MIT License
Copyright(c) 2018 Liam Bindle
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files(the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions :
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
*/
/**
* @file
* @brief Includes/supports the types/calls required by the MQTT-C client.
*
* @note This is the \em only file included in mqtt.h, and mqtt.c. It is therefore
* responsible for including/supporting all the required types and calls.
*
* @defgroup pal Platform abstraction layer
* @brief Documentation of the types and calls required to port MQTT-C to a new platform.
*
* mqtt_pal.h is the \em only header file included in mqtt.c. Therefore, to port MQTT-C to a
* new platform the following types, functions, constants, and macros must be defined in
* mqtt_pal.h:
* - Types:
* - \c size_t, \c ssize_t
* - \c uint8_t, \c uint16_t, \c uint32_t
* - \c va_list
* - \c mqtt_pal_time_t : return type of \c MQTT_PAL_TIME()
* - \c mqtt_pal_mutex_t : type of the argument that is passed to \c MQTT_PAL_MUTEX_LOCK and
* \c MQTT_PAL_MUTEX_RELEASE
* - Functions:
* - \c memcpy, \c strlen
* - \c va_start, \c va_arg, \c va_end
* - Constants:
* - \c INT_MIN
*
* Additionally, three macro's are required:
* - \c MQTT_PAL_HTONS(s) : host-to-network endian conversion for uint16_t.
* - \c MQTT_PAL_NTOHS(s) : network-to-host endian conversion for uint16_t.
* - \c MQTT_PAL_TIME() : returns [type: \c mqtt_pal_time_t] current time in seconds.
* - \c MQTT_PAL_MUTEX_LOCK(mtx_pointer) : macro that locks the mutex pointed to by \c mtx_pointer.
* - \c MQTT_PAL_MUTEX_RELEASE(mtx_pointer) : macro that unlocks the mutex pointed to by
* \c mtx_pointer.
*
* Lastly, \ref mqtt_pal_sendall and \ref mqtt_pal_recvall, must be implemented in mqtt_pal.c
* for sending and receiving data using the platforms socket calls.
*/
/* UNIX-like platform support */
#if defined(__unix__) || defined(__APPLE__)
#include <limits.h>
#include <string.h>
#include <stdarg.h>
#include <time.h>
#include <arpa/inet.h>
#include <pthread.h>
#define MQTT_PAL_HTONS(s) htons(s)
#define MQTT_PAL_NTOHS(s) ntohs(s)
#define MQTT_PAL_TIME() time(NULL)
typedef time_t mqtt_pal_time_t;
typedef pthread_mutex_t mqtt_pal_mutex_t;
#define MQTT_PAL_MUTEX_INIT(mtx_ptr) pthread_mutex_init(mtx_ptr, NULL)
#define MQTT_PAL_MUTEX_LOCK(mtx_ptr) pthread_mutex_lock(mtx_ptr)
#define MQTT_PAL_MUTEX_UNLOCK(mtx_ptr) pthread_mutex_unlock(mtx_ptr)
#ifndef MQTT_USE_CUSTOM_SOCKET_HANDLE
#ifdef MQTT_USE_MBEDTLS
struct mbedtls_ssl_context;
typedef struct mbedtls_ssl_context *mqtt_pal_socket_handle;
#elif defined(MQTT_USE_BIO)
#include <openssl/bio.h>
typedef BIO* mqtt_pal_socket_handle;
#else
typedef int mqtt_pal_socket_handle;
#endif
#endif
#elif defined(_MSC_VER)
#include <limits.h>
#include <windows.h>
#include <time.h>
#include <stdint.h>
#include <winsock2.h>
typedef SSIZE_T ssize_t;
#define MQTT_PAL_HTONS(s) htons(s)
#define MQTT_PAL_NTOHS(s) ntohs(s)
#define MQTT_PAL_TIME() time(NULL)
typedef time_t mqtt_pal_time_t;
typedef CRITICAL_SECTION mqtt_pal_mutex_t;
#define MQTT_PAL_MUTEX_INIT(mtx_ptr) InitializeCriticalSection(mtx_ptr)
#define MQTT_PAL_MUTEX_LOCK(mtx_ptr) EnterCriticalSection(mtx_ptr)
#define MQTT_PAL_MUTEX_UNLOCK(mtx_ptr) LeaveCriticalSection(mtx_ptr)
#ifndef MQTT_USE_CUSTOM_SOCKET_HANDLE
#ifdef MQTT_USE_BIO
#include <openssl/bio.h>
typedef BIO* mqtt_pal_socket_handle;
#else
typedef SOCKET mqtt_pal_socket_handle;
#endif
#endif
#endif
/**
* @brief Sends all the bytes in a buffer.
* @ingroup pal
*
* @param[in] fd The file-descriptor (or handle) of the socket.
* @param[in] buf A pointer to the first byte in the buffer to send.
* @param[in] len The number of bytes to send (starting at \p buf).
* @param[in] flags Flags which are passed to the underlying socket.
*
* @returns The number of bytes sent if successful, an \ref MQTTErrors otherwise.
*/
ssize_t mqtt_pal_sendall(mqtt_pal_socket_handle fd, const void* buf, size_t len, int flags);
/**
* @brief Non-blocking receive all the byte available.
* @ingroup pal
*
* @param[in] fd The file-descriptor (or handle) of the socket.
* @param[in] buf A pointer to the receive buffer.
* @param[in] bufsz The max number of bytes that can be put into \p buf.
* @param[in] flags Flags which are passed to the underlying socket.
*
* @returns The number of bytes received if successful, an \ref MQTTErrors otherwise.
*/
ssize_t mqtt_pal_recvall(mqtt_pal_socket_handle fd, void* buf, size_t bufsz, int flags);
#endif
此差异已折叠。
/*
MIT License
Copyright(c) 2018 Liam Bindle
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files(the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions :
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
*/
#include <mqtt.h>
/**
* @file
* @brief Implements @ref mqtt_pal_sendall and @ref mqtt_pal_recvall and
* any platform-specific helpers you'd like.
* @cond Doxygen_Suppress
*/
#ifdef MQTT_USE_MBEDTLS
#include <mbedtls/ssl.h>
ssize_t mqtt_pal_sendall(mqtt_pal_socket_handle fd, const void* buf, size_t len, int flags) {
size_t sent = 0;
while(sent < len) {
int rv = mbedtls_ssl_write(fd, buf + sent, len - sent);
if (rv < 0) {
if (rv == MBEDTLS_ERR_SSL_WANT_READ ||
rv == MBEDTLS_ERR_SSL_WANT_WRITE
#if defined(MBEDTLS_ERR_SSL_ASYNC_IN_PROGRESS)
|| rv == MBEDTLS_ERR_SSL_ASYNC_IN_PROGRESS
#endif
#if defined(MBEDTLS_ERR_SSL_CRYPTO_IN_PROGRESS)
|| rv == MBEDTLS_ERR_SSL_CRYPTO_IN_PROGRESS
#endif
) {
/* should call mbedtls_ssl_writer later again */
break;
}
return MQTT_ERROR_SOCKET_ERROR;
}
sent += (size_t) rv;
}
return sent;
}
ssize_t mqtt_pal_recvall(mqtt_pal_socket_handle fd, void* buf, size_t bufsz, int flags) {
const void *const start = buf;
int rv;
do {
rv = mbedtls_ssl_read(fd, buf, bufsz);
if (rv < 0) {
if (rv == MBEDTLS_ERR_SSL_WANT_READ ||
rv == MBEDTLS_ERR_SSL_WANT_WRITE
#if defined(MBEDTLS_ERR_SSL_ASYNC_IN_PROGRESS)
|| rv == MBEDTLS_ERR_SSL_ASYNC_IN_PROGRESS
#endif
#if defined(MBEDTLS_ERR_SSL_CRYPTO_IN_PROGRESS)
|| rv == MBEDTLS_ERR_SSL_CRYPTO_IN_PROGRESS
#endif
) {
/* should call mbedtls_ssl_read later again */
break;
}
return MQTT_ERROR_SOCKET_ERROR;
}
buf = (char*)buf + rv;
bufsz -= rv;
} while (rv > 0);
return buf - start;
}
#elif defined(MQTT_USE_BIO)
#include <openssl/bio.h>
#include <openssl/ssl.h>
#include <openssl/err.h>
ssize_t mqtt_pal_sendall(mqtt_pal_socket_handle fd, const void* buf, size_t len, int flags) {
size_t sent = 0;
while(sent < len) {
int tmp = BIO_write(fd, buf + sent, len - sent);
if (tmp > 0) {
sent += (size_t) tmp;
} else if (tmp <= 0 && !BIO_should_retry(fd)) {
return MQTT_ERROR_SOCKET_ERROR;
}
}
return sent;
}
ssize_t mqtt_pal_recvall(mqtt_pal_socket_handle fd, void* buf, size_t bufsz, int flags) {
const void *const start = buf;
int rv;
do {
rv = BIO_read(fd, buf, bufsz);
if (rv > 0) {
/* successfully read bytes from the socket */
buf += rv;
bufsz -= rv;
} else if (!BIO_should_retry(fd)) {
/* an error occurred that wasn't "nothing to read". */
return MQTT_ERROR_SOCKET_ERROR;
}
} while (!BIO_should_read(fd));
return (ssize_t)(buf - start);
}
#elif defined(__unix__) || defined(__APPLE__)
#include <errno.h>
ssize_t mqtt_pal_sendall(mqtt_pal_socket_handle fd, const void* buf, size_t len, int flags) {
size_t sent = 0;
while(sent < len) {
ssize_t tmp = send(fd, buf + sent, len - sent, flags);
if (tmp < 1) {
return MQTT_ERROR_SOCKET_ERROR;
}
sent += (size_t) tmp;
}
return sent;
}
ssize_t mqtt_pal_recvall(mqtt_pal_socket_handle fd, void* buf, size_t bufsz, int flags) {
const void *const start = buf;
ssize_t rv;
do {
rv = recv(fd, buf, bufsz, flags);
if (rv > 0) {
/* successfully read bytes from the socket */
buf += rv;
bufsz -= rv;
} else if (rv < 0 && errno != EAGAIN && errno != EWOULDBLOCK) {
/* an error occurred that wasn't "nothing to read". */
return MQTT_ERROR_SOCKET_ERROR;
}
} while (rv > 0);
return buf - start;
}
#elif defined(_MSC_VER)
#include <errno.h>
ssize_t mqtt_pal_sendall(mqtt_pal_socket_handle fd, const void* buf, size_t len, int flags) {
size_t sent = 0;
while(sent < len) {
ssize_t tmp = send(fd, (char*)buf + sent, len - sent, flags);
if (tmp < 1) {
return MQTT_ERROR_SOCKET_ERROR;
}
sent += (size_t) tmp;
}
return sent;
}
ssize_t mqtt_pal_recvall(mqtt_pal_socket_handle fd, void* buf, size_t bufsz, int flags) {
const char *const start = buf;
ssize_t rv;
do {
rv = recv(fd, buf, bufsz, flags);
if (rv > 0) {
/* successfully read bytes from the socket */
buf = (char*)buf + rv;
bufsz -= rv;
} else if (rv < 0) {
int err = WSAGetLastError();
if (err != WSAEWOULDBLOCK) {
/* an error occurred that wasn't "nothing to read". */
return MQTT_ERROR_SOCKET_ERROR;
}
}
} while (rv > 0);
return (ssize_t)((char*)buf - start);
}
#else
#error No PAL!
#endif
/** @endcond */
此差异已折叠。
......@@ -2,6 +2,8 @@
TDengine provides many connectors for development, including C/C++, JAVA, Python, RESTful, Go, Node.JS, etc.
NOTE: All APIs which require a SQL string as parameter, including but not limit to `taos_query`, `taos_query_a`, `taos_subscribe` in the C/C++ Connector and their counterparts in other connectors, can ONLY process one SQL statement at a time. If more than one SQL statements are provided, their behaviors are undefined.
## C/C++ API
C/C++ APIs are similar to the MySQL APIs. Applications should include TDengine head file _taos.h_ to use C/C++ APIs by adding the following line in code:
......
......@@ -53,10 +53,11 @@ STable从属于库,一个STable只属于一个库,但一个库可以有一
说明:
1. TAGS列总长度不能超过512 bytes;
1. TAGS列总长度不能超过64k bytes;
2. TAGS列的数据类型不能是timestamp;
3. TAGS列名不能与其他列名相同;
4. TAGS列名不能为预留关键字.
5. TAGS总数的上限是128.
- 显示已创建的超级表
......@@ -114,7 +115,7 @@ INSERT INTO <tb1_name> USING <stb1_name> TAGS (<tag1_value1>, ...) VALUES (<fiel
ALTER TABLE <stable_name> ADD TAG <new_tag_name> <TYPE>
```
为STable增加一个新的标签,并指定新标签的类型。标签总数不能超过6个。
为STable增加一个新的标签,并指定新标签的类型。标签总数不能超过128个。
- 删除标签
......@@ -202,7 +203,7 @@ INSERT INTO therm4 VALUES ('2018-01-01 00:00:00.000', 23);
###3:按标签聚合查询
查询位于北京(beijing)和天津(tianjing)两个地区的温度传感器采样值的数量count(*)、平均温度avg(degree)、最高温度max(degree)、最低温度min(degree),并将结果按所处地域(location)和传感器类型(type)进行聚合。
查询位于北京(beijing)和天津(tianjin)两个地区的温度传感器采样值的数量count(*)、平均温度avg(degree)、最高温度max(degree)、最低温度min(degree),并将结果按所处地域(location)和传感器类型(type)进行聚合。
```mysql
SELECT COUNT(*), AVG(degree), MAX(degree), MIN(degree)
......
......@@ -22,11 +22,11 @@ New keyword "tags" is introduced, where tag_name is the tag name, and tag_type i
Note:
1. The bytes of all tags together shall be less than 512
1. The bytes of all tags together shall be less than 64k
2. Tag's data type can not be time stamp
3. Tag name shall be different from the field name
4. Tag name shall not be the same as system keywords
5. Maximum number of tags is 6
5. Maximum number of tags is 128
For example:
......@@ -168,7 +168,7 @@ You can add, delete and change the tags for a STable, and you can change the tag
ALTER TABLE <stable_name> ADD TAG <new_tag_name> <TYPE>
```
It adds a new tag to the STable with a data type. The maximum number of tags is 6.
It adds a new tag to the STable with a data type. The maximum number of tags is 128.
### Drop a Tag
......
......@@ -63,11 +63,11 @@ TDengine缺省的时间戳是毫秒精度,但通过修改配置参数enableMic
| 3 | BIGINT | 8 | 长整型,范围 [-2^63+1, 2^63-1], -2^63用于NULL |
| 4 | FLOAT | 4 | 浮点型,有效位数6-7,范围 [-3.4E38, 3.4E38] |
| 5 | DOUBLE | 8 | 双精度浮点型,有效位数15-16,范围 [-1.7E308, 1.7E308] |
| 6 | BINARY | 自定义 | 用于记录字符串,最长不能超过504 bytes。binary仅支持字符串输入,字符串两端使用单引号引用,否则英文全部自动转化为小写。使用时须指定大小,如binary(20)定义了最长为20个字符的字符串,每个字符占1byte的存储空间。如果用户字符串超出20字节,将被自动截断。对于字符串内的单引号,可以用转义字符反斜线加单引号来表示, 即 **\’**。 |
| 6 | BINARY | 自定义 | 用于记录字符串,理论上,最长可以有65526字节,但由于每行数据最多64K字节,实际上限一般小于理论值。 binary仅支持字符串输入,字符串两端使用单引号引用,否则英文全部自动转化为小写。使用时须指定大小,如binary(20)定义了最长为20个字符的字符串,每个字符占1byte的存储空间。如果用户字符串超出20字节将会报错。对于字符串内的单引号,可以用转义字符反斜线加单引号来表示, 即 **\’**。 |
| 7 | SMALLINT | 2 | 短整型, 范围 [-32767, 32767], -32768用于NULL |
| 8 | TINYINT | 1 | 单字节整型,范围 [-127, 127], -128用于NULL |
| 9 | BOOL | 1 | 布尔型,{true, false} |
| 10 | NCHAR | 自定义 | 用于记录非ASCII字符串,如中文字符。每个nchar字符占用4bytes的存储空间。字符串两端使用单引号引用,字符串内的单引号需用转义字符 **\’**。nchar使用时须指定字符串大小,类型为nchar(10)的列表示此列的字符串最多存储10个nchar字符,会固定占用40bytes的空间。如用户字符串长度超出声明长度,则将被自动截断。 |
| 10 | NCHAR | 自定义 | 用于记录非ASCII字符串,如中文字符。每个nchar字符占用4bytes的存储空间。字符串两端使用单引号引用,字符串内的单引号需用转义字符 **\’**。nchar使用时须指定字符串大小,类型为nchar(10)的列表示此列的字符串最多存储10个nchar字符,会固定占用40bytes的空间。如用户字符串长度超出声明长度,则将会报错。 |
**Tips**: TDengine对SQL语句中的英文字符不区分大小写,自动转化为小写执行。因此用户大小写敏感的字符串及密码,需要使用单引号将字符串引起来。
......@@ -106,7 +106,7 @@ TDengine缺省的时间戳是毫秒精度,但通过修改配置参数enableMic
```mysql
CREATE TABLE [IF NOT EXISTS] tb_name (timestamp_field_name TIMESTAMP, field1_name data_type1 [, field2_name data_type2 ...])
```
说明:1)表的第一个字段必须是TIMESTAMP,并且系统自动将其设为主键;2)表的每行长度不能超过4096字节;3)使用数据类型binary或nchar,需指定其最长的字节数,如binary(20),表示20字节。
说明:1)表的第一个字段必须是TIMESTAMP,并且系统自动将其设为主键;2)表的每行长度不能超过64K字节;3)使用数据类型binary或nchar,需指定其最长的字节数,如binary(20),表示20字节。
- **删除数据表**
......@@ -402,7 +402,7 @@ count(tbname) |
SELECT * FROM tb1 WHERE ts >= NOW - 1h
```
- 查询表tb1从2018-06-01 08:00:00.000 到2018-06-02 08:00:00.000时间范围,并且clo3的字符串是'nny'结尾的记录,结果按照时间戳降序
- 查询表tb1从2018-06-01 08:00:00.000 到2018-06-02 08:00:00.000时间范围,并且col3的字符串是'nny'结尾的记录,结果按照时间戳降序
```mysql
SELECT * FROM tb1 WHERE ts > '2018-06-01 08:00:00.000' AND ts <= '2018-06-02 08:00:00.000' AND col3 LIKE '%nny' ORDER BY ts DESC
......
......@@ -39,8 +39,8 @@ The full list of data types is listed below. For string types of data, we will
| 6 | DOUBLE | 8 | A standard nullable double float type with 15-16 significant digits and a range of [-1.7E308, 1.7E308]​ |
| 7 | BOOL | 1 | A nullable boolean type, [**`true`**, **`false`**] |
| 8 | TIMESTAMP | 8 | A nullable timestamp type with the same usage as the primary column timestamp |
| 9 | BINARY(*M*) | *M* | A nullable string type whose length is *M*, any exceeded chars will be automatically truncated. This type of string only supports ASCii encoded chars. |
| 10 | NCHAR(*M*) | 4 * *M* | A nullable string type whose length is *M*, any exceeded chars will be truncated. The **`NCHAR`** type supports Unicode encoded chars. |
| 9 | BINARY(*M*) | *M* | A nullable string type whose length is *M*, error should be threw with exceeded chars, the maximum length of *M* is 65526, but as maximum row size is 64K bytes, the actual upper limit will generally less than 65526. This type of string only supports ASCii encoded chars. |
| 10 | NCHAR(*M*) | 4 * *M* | A nullable string type whose length is *M*, error should be threw with exceeded chars. The **`NCHAR`** type supports Unicode encoded chars. |
All the keywords in a SQL statement are case-insensitive, but strings values are case-sensitive and must be quoted by a pair of `'` or `"`. To quote a `'` or a `"` , you can use the escape character `\`.
......@@ -86,7 +86,7 @@ All the keywords in a SQL statement are case-insensitive, but strings values are
1) The first column must be a `timestamp`, and the system will set it as the primary key.
2) The record size is limited to 4096 bytes
2) The record size is limited to 64k bytes
3) For `binary` or `nchar` data types, the length must be specified. For example, binary(20) means a binary data type with 20 bytes.
......
......@@ -2,6 +2,8 @@
TDengine提供了丰富的应用程序开发接口,其中包括C/C++、JAVA、Python、RESTful、Go等,便于用户快速开发应用。
注意:所有执行 SQL 语句的 API,例如 C/C++ Connector 中的 `tao_query``taos_query_a``taos_subscribe` 等,以及其它语言中与它们对应的API,每次都只能执行一条 SQL 语句,如果实际参数中包含了多条语句,它们的行为是未定义的。
## C/C++ Connector
C/C++的API类似于MySQL的C API。应用程序使用时,需要包含TDengine头文件 _taos.h_(安装后,位于 _/usr/local/taos/include_):
......@@ -1207,11 +1209,49 @@ TDengine在Window系统上提供的API与Linux系统是相同的, 应用程序
其中,最常用的文件列出如下:
+ Client可执行文件: /usr/local/taos/bin/taos 软连接到 /usr/local/bin/taos
+ 配置文件: /usr/local/taos/cfg/taos.cfg 软连接到 /etc/taos/taos.cfg
+ 驱动程序目录: /usr/local/taos/driver/libtaos.1.6.5.1.dylib 软连接到 /usr/local/lib/libtaos.dylib
+ 驱动程序头文件: /usr/local/taos/include/taos.h 软连接到 /usr/local/include/taos.h
+ 日志目录(第一次运行程序时生成):~/TDengineLog
## MQTT客户端
MQTT客户端实现了订阅MQTT Broker的特定Topic将Json数据进行转换入库的功能,任何终端只要将数据发给特定的Topic 即可,不用再编写转换器或者数据解析程序。如果终端量大,需要 Mqtt Broker 群集,这里不再详述。
#### 如何配置?
首先需要在 taos.cfg 中打开配置项 mqtt 用来启用, 再通过修改 mqttBrokerAddress 的值来配置连接,格式为:
> mqtt://username:password@hostname:port/path/
例如:
> mqtt://127.0.0.1:1883/taos/ mqtt://root@kissme@127.0.0.1:1883/taos/
#### Topic 格式说明
Mqtt 的topic格式为
> /<path>/<token>/<db name>/<table name>/
因此TDengine的Mqtt客户端会订阅:
> /taos/+/+/+/+/
例如:
> /taos/token/db/t/
注意: 测试时如果需要使用到Mqtt Broker 推荐使用 [mosquitto](http://mosquitto.org/) ,客户端可以使用 [MQTT.fx ](http://www.jensd.de/)
[1]: https://search.maven.org/artifact/com.taosdata.jdbc/taos-jdbcdriver
......
......@@ -18,7 +18,7 @@ import (
"sync"
"time"
_ "github.com/taosdata/TDengine/src/connector/go/src/taosSql"
_ "github.com/taosdata/TDengine/src/connector/go/taosSql"
)
const (
......@@ -634,6 +634,7 @@ func insertData(threadIndex, start, end int, wg *sync.WaitGroup, successRows []
if appendRows == batch {
// executebatch
insertSql := buffers.String()
connection.Exec("use " + db)
affectedRows := executeBatchInsert(insertSql, connection)
successRows[threadIndex] += affectedRows
......@@ -658,6 +659,7 @@ func insertData(threadIndex, start, end int, wg *sync.WaitGroup, successRows []
if appendRows > 0 {
// executebatch
insertSql := buffers.String()
connection.Exec("use " + db)
affectedRows := executeBatchInsert(insertSql, connection)
successRows[threadIndex] += affectedRows
......
......@@ -155,7 +155,7 @@
# maxVnodeConnections 10000
# mnode take into account while balance, for cluster version only
# mgmtEqualVnodeNum 4
# mnodeEqualVnodeNum 4
# number of seconds allowed for a dnode to be offline, for cluster version only
# offlineThreshold 864000
......@@ -166,6 +166,15 @@
# start system monitor module
# monitor 1
# start http service
# mqtt 0
# mqtt uri
# mqttBrokerAddress mqtt://username:password@hostname:1883/taos/
# mqtt client name
# mqttBrokerClientId taos_mqtt
# maximum number of rows returned by the restful interface
# restfulRowLimit 10240
......@@ -244,5 +253,8 @@
# debug flag for system monitor
# monitorDebugFlag 131
#debug flag for mqtt client
# mqttDebugFlag 131
# debug flag for TAOS TIMER
# tmrDebugFlag 131
此差异已折叠。
此差异已折叠。
......@@ -26,7 +26,7 @@ void tscAddIntoSqlList(SSqlObj *pSql);
void tscRemoveFromSqlList(SSqlObj *pSql);
void tscAddIntoStreamList(SSqlStream *pStream);
void tscRemoveFromStreamList(SSqlStream *pStream, SSqlObj *pSqlObj);
char *tscBuildQueryStreamDesc(char *pMsg, STscObj *pObj);
int tscBuildQueryStreamDesc(void *pMsg, STscObj *pObj);
void tscKillQuery(STscObj *pObj, uint32_t killId);
void tscKillStream(STscObj *pObj, uint32_t killId);
void tscKillConnection(STscObj *pObj);
......
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
......@@ -23,4 +23,5 @@ void extractTableName(const char *tableId, char *name);
char* extractDBName(const char *tableId, char *name);
#endif // TDENGINE_NAME_H
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册