提交 50be7f80 编写于 作者: I Ian Craggs

Merge branch 'develop'

Please see the [Eclipse Community Code of Conduct](https://eclipse.org/org/documents/Community_Code_of_Conduct.php)
# Community Code of Conduct
**Version 1.2
August 19, 2020**
## Our Pledge
In the interest of fostering an open and welcoming environment, we as community members, contributors, committers, and project leaders pledge to make participation in our project and our community a harassment-free experience for everyone, regardless of age, body size, disability, ethnicity, sex characteristics, gender identity and expression, level of experience, education, socio-economic status, nationality, personal appearance, race, religion, or sexual identity and orientation.
## Our Standards
Examples of behavior that contributes to creating a positive environment include:
* Using welcoming and inclusive language
* Being respectful of differing viewpoints and experiences
* Gracefully accepting constructive criticism
* Focusing on what is best for the community
* Showing empathy towards other community members
Examples of unacceptable behavior by participants include:
* The use of sexualized language or imagery and unwelcome sexual attention or advances
* Trolling, insulting/derogatory comments, and personal or political attacks
* Public or private harassment
* Publishing others' private information, such as a physical or electronic address, without explicit permission
* Other conduct which could reasonably be considered inappropriate in a professional setting
## Our Responsibilities
With the support of the Eclipse Foundation staff (the “Staff”), project committers and leaders are responsible for clarifying the standards of acceptable behavior and are expected to take appropriate and fair corrective action in response to any instances of unacceptable behavior.
Project committers and leaders have the right and responsibility to remove, edit, or reject comments, commits, code, wiki edits, issues, and other contributions that are not aligned to this Code of Conduct, or to ban temporarily or permanently any contributor for other behaviors that they deem inappropriate, threatening, offensive, or harmful.
## Scope
This Code of Conduct applies within all project spaces, and it also applies when an individual is representing the Eclipse Foundation project or its community in public spaces. Examples of representing a project or community include posting via an official social media account, or acting as a project representative at an online or offline event. Representation of a project may be further defined and clarified by project committers, leaders, or the EMO.
## Enforcement
Instances of abusive, harassing, or otherwise unacceptable behavior may be reported by contacting the Staff at codeofconduct@eclipse.org. All complaints will be reviewed and investigated and will result in a response that is deemed necessary and appropriate to the circumstances. The Staff is obligated to maintain confidentiality with regard to the reporter of an incident. Further details of specific enforcement policies may be posted separately.
Project committers or leaders who do not follow the Code of Conduct in good faith may face temporary or permanent repercussions as determined by the Staff.
## Attribution
This Code of Conduct is adapted from the [Contributor Covenant](https://www.contributor-covenant.org) , version 1.4, available at [https://www.contributor-covenant.org/version/1/4/code-of-conduct.html](https://www.contributor-covenant.org/version/1/4/code-of-conduct/)
......@@ -23,22 +23,22 @@ Synchronous and various asynchronous programming models are supported.
The Paho C client comprises four variant libraries, shared or static:
* paho-mqtt3a - asynchronous (MQTTAsync)
* paho-mqtt3as - asynchronous with SSL (MQTTAsync)
* paho-mqtt3as - asynchronous with SSL/TLS (MQTTAsync)
* paho-mqtt3c - "classic" / synchronous (MQTTClient)
* paho-mqtt3cs - "classic" / synchronous with SSL (MQTTClient)
* paho-mqtt3cs - "classic" / synchronous with SSL/TLS (MQTTClient)
[Which Paho C API to use, with some history, for context](https://modelbasedtesting.co.uk/2013/10/13/which-paho-mqtt-c-api-to-use-and-some-history/)
## Usage and API
Detailed API documentation [is available online](https://www.eclipse.org/paho/files/mqttdoc/MQTTClient/html/index.html). It is also available by building the Doxygen docs in the ``doc`` directory.
Detailed API documentation [is available online](https://eclipse.github.io/paho.mqtt.c/MQTTClient/html/). It is also available by building the Doxygen docs in the ``doc`` directory.
Samples are available in the Doxygen docs and also in ``src/samples`` for reference. These are:
Samples are available in the Doxygen docs and also in `src/samples` for reference. These are:
- paho_c_pub.c and paho_c_sub.c: command line utilities to publish and subscribe, -h will give help
- paho_cs_pub.c paho_cs_sub.c: command line utilities using MQTTClient to publish and subscribe
- MQTTClient_publish.c, MQTTClient_subscribe.c and MQTTClient_publish_async.c: MQTTClient simple code examples
- MQTTAsync_publish.c and MQTTAsync_subscribe.c: MQTTAsync simple code examples
- *paho_c_pub.c* and *paho_c_sub.c:* command line utilities to publish and subscribe, -h will give help
- *paho_cs_pub.c* and *paho_cs_sub.c:* command line utilities using MQTTClient to publish and subscribe
- *MQTTClient_publish.c, MQTTClient_subscribe.c* and *MQTTClient_publish_async.c:* MQTTClient simple code examples
- *MQTTAsync_publish.c* and *MQTTAsync_subscribe.c:* MQTTAsync simple code examples
Some potentially useful blog posts:
......@@ -52,11 +52,11 @@ Some potentially useful blog posts:
A number of environment variables control runtime tracing of the C library.
Tracing is switched on using ``MQTT_C_CLIENT_TRACE`` (a value of ON traces to stdout, any other value should specify a file to trace to).
Tracing is switched on using `MQTT_C_CLIENT_TRACE` (a value of ON traces to stdout, any other value should specify a file to trace to).
The verbosity of the output is controlled using the ``MQTT_C_CLIENT_TRACE_LEVEL`` environment variable - valid values are ERROR, PROTOCOL, MINIMUM, MEDIUM and MAXIMUM (from least to most verbose).
The verbosity of the output is controlled using the `MQTT_C_CLIENT_TRACE_LEVEL` environment variable - valid values are ERROR, PROTOCOL, MINIMUM, MEDIUM and MAXIMUM (from least to most verbose).
The variable ``MQTT_C_CLIENT_TRACE_MAX_LINES`` limits the number of lines of trace that are output.
The variable `MQTT_C_CLIENT_TRACE_MAX_LINES` limits the number of lines of trace that are output.
```
export MQTT_C_CLIENT_TRACE=ON
......@@ -71,75 +71,89 @@ Please open issues in the Github project: https://github.com/eclipse/paho.mqtt.c
Discussion of the Paho clients takes place on the [Eclipse paho-dev mailing list](https://dev.eclipse.org/mailman/listinfo/paho-dev).
Follow Eclipse Paho on Twitter: [@eclipsepaho](https://twitter.com/eclipsepaho)
General questions about the MQTT protocol are discussed in the [MQTT Google Group](https://groups.google.com/forum/?hl=en-US&fromgroups#!forum/mqtt).
There is more information available via the [MQTT community site](http://mqtt.org).
## Build instructions for GNU Make
## Building with CMake
Ensure the OpenSSL development package is installed. Then from the client library base directory run:
The build process currently supports a number of Linux "flavors" including ARM and s390, OS X, AIX and Solaris as well as the Windows operating system. The build process requires the following tools:
* [CMake](http://cmake.org)
* [GNU Make](https://www.gnu.org/software/make/) or [Ninja](https://martine.github.io/ninja/)
* A conforming C compiler, such as [gcc](https://gcc.gnu.org/), [Clang](https://clang.llvm.org/), etc
On Debian based systems this would mean that the following packages have to be installed:
```
make
sudo make install
$ apt-get install build-essential gcc make cmake cmake-gui cmake-curses-gui
```
This will build and install the libraries. To uninstall:
Also, in order to build a debian package from the source code, the following packages have to be installed
```
sudo make uninstall
$ apt-get install fakeroot devscripts dh-make lsb-release
```
To build the documentation requires doxygen and optionally graphviz.
Ninja can be downloaded from its github project page in the "releases" section. Optionally it is possible to build binaries with SSL/TLS support. This requires the OpenSSL libraries and includes to be available. E. g. on Debian:
```
make html
$ apt-get install libssl-dev
```
The provided GNU Makefile is intended to perform all build steps in the ```build``` directory within the source-tree of Eclipse Paho. Generated binares, libraries, and the documentation can be found in the ```build/output``` directory after completion.
The documentation requires doxygen and optionally graphviz:
Options that are passed to the compiler/linker can be specified by typical Unix build variables:
```
$ apt-get install doxygen graphviz
```
Variable | Description
------------ | -------------
CC | Path to the C compiler
CFLAGS | Flags passed to compiler calls
LDFLAGS | Flags passed to linker calls
### Building your application with CMake
If the Paho C library was built with CMake and is already installed on the system, it is relatively easy to set up a CMake build for your application. (If it's not already built and installed read the next section).
## Build requirements / compilation using CMake
The library can be built with several options which create variations of the library for asynchronous or synchronous use; encryption (SSL/TLS) support or not; and whether the library is shared or static. CMake exports all of the libraries that were built as targets, and the user can chose which is best suited for an application.
The build process currently supports a number of Linux "flavors" including ARM and s390, OS X, AIX and Solaris as well as the Windows operating system. The build process requires the following tools:
* CMake (http://cmake.org)
* Ninja (https://martine.github.io/ninja/) or
GNU Make (https://www.gnu.org/software/make/), and
* gcc (https://gcc.gnu.org/).
The package is named: **eclipse-paho-mqtt-c**
On Debian based systems this would mean that the following packages have to be installed:
The namespace for all the targets is also: **eclipse-paho-mqtt-c**
```
apt-get install build-essential gcc make cmake cmake-gui cmake-curses-gui
```
The target names are the same as the library names. The static libraries append *-static* to the target name even for platforms that use the same base name for shared and static libraries. So:
Also, in order to build a debian package from the source code, the following packages have to be installed
Target|Description
------|-----------
paho-mqtt3a | asynchronous, no encryption
paho-mqtt3as | asynchronous with SSL/TLS support
paho-mqtt3c | synchronous, no encryption
paho-mqtt3cs | synchronous with SSL/TLS support
paho-mqtt3a-static | asynchronous, no encryption, static linkage
paho-mqtt3as-static | asynchronous with SSL/TLS support, static linkage
paho-mqtt3c-static | synchronous, no encryption, static linkage
paho-mqtt3cs-static | synchronous with SSL/TLS support, static linkage
```
apt-get install fakeroot fakeroot devscripts dh-make lsb-release
```
Remember, though, that not all of these targets may be available. It depends on how the library was built.
Ninja can be downloaded from its github project page in the "releases" section. Optionally it is possible to build binaries with SSL support. This requires the OpenSSL libraries and includes to be available. E. g. on Debian:
A sample *CMakeLists.txt* for an application that uses the asynchronous library with encryption support *(paho-mqtt3as)* might look like this:
```
apt-get install libssl-dev
cmake_minimum_required(VERSION 3.5)
project(MyMQTTApp VERSION 1.0.0 LANGUAGES C)
find_package(eclipse-paho-mqtt-c REQUIRED)
add_executable(MyMQTTApp MyMQTTApp.c)
target_link_libraries(MQTTVersion eclipse-paho-mqtt-c::paho-mqtt3as)
```
The documentation requires doxygen and optionally graphviz:
If the library was installed to a non-traditional location, you may need to tell CMake where to find it using `CMAKE_PREFIX_PATH`. For example, if you installed it in */opt/mqtt/paho.mqtt.c*
```
apt-get install doxygen graphviz
$ cmake -DCMAKE_PREFIX_PATH=/opt/mqtt/paho.mqtt.c ..
```
### Building the Paho C library with CMake
Before compiling, determine the value of some variables in order to configure features, library locations, and other options:
Variable | Default Value | Description
......@@ -162,37 +176,46 @@ Using these variables CMake can be used to generate your Ninja or Make files. Us
An example build session targeting the build platform could look like this:
```
mkdir /tmp/build.paho
cd /tmp/build.paho
cmake -GNinja -DPAHO_WITH_SSL=TRUE -DPAHO_BUILD_DOCUMENTATION=TRUE -DPAHO_BUILD_SAMPLES=TRUE ~/git/org.eclipse.paho.mqtt.c
$ mkdir /tmp/build.paho ; cd /tmp/build.paho
$ cmake -DPAHO_WITH_SSL=TRUE -DPAHO_BUILD_DOCUMENTATION=TRUE \
-DPAHO_BUILD_SAMPLES=TRUE ~/paho.mqtt.c
```
Invoking cmake and specifying build options can also be performed using cmake-gui or ccmake (see https://cmake.org/runningcmake/). For example:
```
ccmake -GNinja ~/git/org.eclipse.paho.mqtt.c
$ ccmake ~/paho.mqtt.c
```
To compile/link the binaries and to generate packages, simply invoke `ninja package` or `make -j <number-of-cores-to-use> package` after CMake. To simply compile/link invoke `ninja` or `make -j <number-of-cores-to-use>`.
To compile/link the binaries, to install, or to generate packages, use these commands:
```
$ cmake --build .
$ cmake --build . --target install
$ cmake --build . --target package
```
To build, install, or generate packages, you can also use the generated builder like _ninja_ or _make_ directly after invoking the initial CMake configuration step, such as `ninja package` or `make -j <number-of-jpbs> package`.
### Debug builds
Debug builds can be performed by defining the value of the ```CMAKE_BUILD_TYPE``` option to ```Debug```. For example:
Debug builds can be performed by defining the value of the `CMAKE_BUILD_TYPE` option to `Debug`. For example:
```
cmake -GNinja -DCMAKE_BUILD_TYPE=Debug git/org.eclipse.paho.mqtt.c
$ cmake -DCMAKE_BUILD_TYPE=Debug ~/paho.mqtt.c
```
### Running the tests
Test code is available in the ``test`` directory. The tests can be built and executed with the CMake build system. The test execution requires a MQTT broker running. By default, the build system uses ```localhost```, however it is possible to configure the build to use an external broker. These parameters are documented in the Build Requirements section above.
Test code is available in the `test` directory. The tests can be built and executed with the CMake build system. The test execution requires a MQTT broker running. By default, the build system uses `localhost`, however it is possible to configure the build to use an external broker. These parameters are documented in the Build Requirements section above.
After ensuring a MQTT broker is available, it is possible to execute the tests by starting the proxy and running `ctest` as described below:
```
python ../test/mqttsas2.py &
ctest -VV
$ python ../test/mqttsas2.py &
$ ctest -VV
```
### Cross compilation
......@@ -212,24 +235,93 @@ The provided toolchain files assume that required compilers/linkers are to be fo
Example invocation for the Raspberry Pi:
```
cmake -GNinja -DPAHO_WITH_SSL=TRUE -DPAHO_BUILD_SAMPLES=TRUE -DPAHO_BUILD_DOCUMENTATION=TRUE -DOPENSSL_LIB_SEARCH_PATH=/tmp/libssl-dev/usr/lib/arm-linux-gnueabihf -DOPENSSL_INC_SEARCH_PATH="/tmp/libssl-dev/usr/include/openssl;/tmp/libssl-dev/usr/include/arm-linux-gnueabihf" -DCMAKE_TOOLCHAIN_FILE=~/git/org.eclipse.paho.mqtt.c/cmake/toolchain.linux-arm11.cmake ~/git/org.eclipse.paho.mqtt.c
$ cmake -GNinja -DPAHO_WITH_SSL=TRUE -DPAHO_BUILD_SAMPLES=TRUE \
-DPAHO_BUILD_DOCUMENTATION=TRUE \
-DOPENSSL_LIB_SEARCH_PATH=/tmp/libssl-dev/usr/lib/arm-linux-gnueabihf \
-DOPENSSL_INC_SEARCH_PATH="/tmp/libssl-dev/usr/include/openssl;/tmp/libssl-dev/usr/include/arm-linux-gnueabihf" \
-DCMAKE_TOOLCHAIN_FILE=~/paho.mqtt.c/cmake/toolchain.linux-arm11.cmake \
~/paho.mqtt.c
```
Compilers for the Raspberry Pi can be obtained from e. g. Linaro (see: http://releases.linaro.org/15.06/components/toolchain/binaries/4.8/arm-linux-gnueabihf/). This example assumes that OpenSSL-libraries and includes have been installed in the ```/tmp/libssl-dev``` directory.
Compilers for the Raspberry Pi and other ARM targets can be obtained from ARM (https://developer.arm.com/tools-and-software/open-source-software/developer-tools/gnu-toolchain/downloads)
This example assumes that OpenSSL-libraries and includes have been installed in the `/tmp/libssl-dev` directory.
Example invocation for Windows 64 bit:
```
cmake -GNinja -DPAHO_BUILD_SAMPLES=TRUE -DCMAKE_TOOLCHAIN_FILE=~/git/org.eclipse.paho.mqtt.c/cmake/toolchain.win64.cmake ~/git/org.eclipse.paho.mqtt.c
$ cmake -DPAHO_BUILD_SAMPLES=TRUE \
-DCMAKE_TOOLCHAIN_FILE=~/paho.mqtt.c/cmake/toolchain.win64.cmake \
~/paho.mqtt.c
```
In this case the libraries and executable are not linked against OpenSSL Libraries. Cross compilers for the Windows platform can be installed on Debian like systems like this:
```
apt-get install gcc-mingw-w64-x86-64 gcc-mingw-w64-i686
$ apt-get install gcc-mingw-w64-x86-64 gcc-mingw-w64-i686
```
## Build instructions for GNU Make
Ensure the OpenSSL development package is installed. Then from the client library base directory run:
```
$ make
$ sudo make install
```
This will build and install the libraries. To uninstall:
```
$ sudo make uninstall
```
To build the documentation requires doxygen and optionally graphviz.
```
$ make html
```
The provided GNU Makefile is intended to perform all build steps in the ```build``` directory within the source-tree of Eclipse Paho. Generated binares, libraries, and the documentation can be found in the ```build/output``` directory after completion.
Options that are passed to the compiler/linker can be specified by typical Unix build variables:
Variable | Description
------------ | -------------
CC | Path to the C compiler
CFLAGS | Flags passed to compiler calls
LDFLAGS | Flags passed to linker calls
## Building paho-mqtt - Using vcpkg
You can download and install paho-mqtt using the [vcpkg](https://github.com/Microsoft/vcpkg) dependency manager:
git clone https://github.com/Microsoft/vcpkg.git
cd vcpkg
./bootstrap-vcpkg.sh
./vcpkg integrate install
./vcpkg install paho-mqtt
The paho-mqtt port in vcpkg is kept up to date by Microsoft team members and community contributors. If the version is out of date, please [create an issue or pull request](https://github.com/Microsoft/vcpkg) on the vcpkg repository.
## Fully static builds with musl libc
(By Frank Pagliughi)
[musl libc](https://musl.libc.org/) is is an implementation of the C standard library built on top of the Linux system call API, including interfaces defined in the base language standard, POSIX, and widely agreed-upon extensions.
Users of the Rust library, which wraps this one, had been complaining that they could not compile using the musl build tools. Musl is a small std C lib that can be statically linked. With the latest Paho C library (and a very minor tweak to the build), we're now able to build Rust apps using musl and Paho C that are fully static; no runtime dependencies on the platform; not even on the standard C lib.
$ ./async_publish
Publishing a message on the 'test' topic
$ ldd async_publish
not a dynamic executable
So, for example, if maintaining a suite of apps for some newer and older embedded Linux boards, the same executables could be deployed without worry about the C ABI on the particular boards.
Certainly C apps using the Paho library could do this also.
## Microsoft Windows
### Calling convention
......
# Security Policy
This project follows the [Eclipse Vulnerability Reporting Policy](https://www.eclipse.org/security/policy.php).
Vulnerabilities are tracked by the Eclipse security team, in cooperation with the project lead.
Fixing vulnerabilities is taken care of by the project committers, with assistance and guidance of the security
team.
## Supported Versions
Eclipse Paho provides security updates for the two most recent version only.
## Reporting a Vulnerability
We recommend that in case of suspected vulnerabilities you do not create a GitHub issue, but instead contact the
Eclipse Security Team directly sending an email to security@eclipse.org.
......@@ -23,6 +23,7 @@ environment:
configuration: Debug
install:
- cmd: ver
- cmd: openssl version
- cmd: C:\Python36\python --version
- cmd: netsh advfirewall firewall add rule name="Python 3.6" dir=in action=allow program="C:\Python36\python.exe" enable=yes
......
......@@ -328,9 +328,11 @@ int MQTTAsync_createWithOptions(MQTTAsync* handle, const char* serverURI, const
if (strstr(serverURI, "://") != NULL)
{
if (strncmp(URI_TCP, serverURI, strlen(URI_TCP)) != 0
&& strncmp(URI_MQTT, serverURI, strlen(URI_MQTT)) != 0
&& strncmp(URI_WS, serverURI, strlen(URI_WS)) != 0
#if defined(OPENSSL)
&& strncmp(URI_SSL, serverURI, strlen(URI_SSL)) != 0
&& strncmp(URI_SSL, serverURI, strlen(URI_SSL)) != 0
&& strncmp(URI_MQTTS, serverURI, strlen(URI_MQTTS)) != 0
&& strncmp(URI_WSS, serverURI, strlen(URI_WSS)) != 0
#endif
)
......@@ -361,6 +363,7 @@ int MQTTAsync_createWithOptions(MQTTAsync* handle, const char* serverURI, const
Log_initialize((Log_nameValue*)MQTTAsync_getVersionInfo());
bstate->clients = ListInitialize();
Socket_outInitialize();
Socket_setWriteContinueCallback(MQTTAsync_writeContinue);
Socket_setWriteCompleteCallback(MQTTAsync_writeComplete);
Socket_setWriteAvailableCallback(MQTTProtocol_writeAvailable);
MQTTAsync_handles = ListInitialize();
......@@ -379,6 +382,8 @@ int MQTTAsync_createWithOptions(MQTTAsync* handle, const char* serverURI, const
memset(m, '\0', sizeof(MQTTAsyncs));
if (strncmp(URI_TCP, serverURI, strlen(URI_TCP)) == 0)
serverURI += strlen(URI_TCP);
else if (strncmp(URI_MQTT, serverURI, strlen(URI_MQTT)) == 0)
serverURI += strlen(URI_MQTT);
else if (strncmp(URI_WS, serverURI, strlen(URI_WS)) == 0)
{
serverURI += strlen(URI_WS);
......@@ -390,6 +395,11 @@ int MQTTAsync_createWithOptions(MQTTAsync* handle, const char* serverURI, const
serverURI += strlen(URI_SSL);
m->ssl = 1;
}
else if (strncmp(URI_MQTTS, serverURI, strlen(URI_MQTTS)) == 0)
{
serverURI += strlen(URI_MQTTS);
m->ssl = 1;
}
else if (strncmp(URI_WSS, serverURI, strlen(URI_WSS)) == 0)
{
serverURI += strlen(URI_WSS);
......@@ -486,7 +496,9 @@ void MQTTAsync_destroy(MQTTAsync* handle)
MQTTAsync_closeSession(m->c, MQTTREASONCODE_SUCCESS, NULL);
MQTTAsync_NULLPublishResponses(m);
MQTTAsync_freeResponses(m);
MQTTAsync_NULLPublishCommands(m);
MQTTAsync_freeCommands(m);
ListFree(m->responses);
......@@ -791,7 +803,8 @@ int MQTTAsync_connect(MQTTAsync handle, const MQTTAsync_connectOptions* options)
}
if (m->c->sslopts->struct_version >= 5)
{
m->c->sslopts->protos = options->ssl->protos;
if (options->ssl->protos)
m->c->sslopts->protos = (const unsigned char*)MQTTStrdup((const char*)options->ssl->protos);
m->c->sslopts->protos_len = options->ssl->protos_len;
}
}
......@@ -804,11 +817,17 @@ int MQTTAsync_connect(MQTTAsync handle, const MQTTAsync_connectOptions* options)
#endif
if (m->c->username)
{
free((void*)m->c->username);
m->c->username = NULL;
}
if (options->username)
m->c->username = MQTTStrdup(options->username);
if (m->c->password)
{
free((void*)m->c->password);
m->c->password = NULL;
}
if (options->password)
{
m->c->password = MQTTStrdup(options->password);
......
......@@ -27,7 +27,7 @@
/**
* @cond MQTTAsync_main
* @mainpage Asynchronous MQTT client library for C
* @mainpage Asynchronous MQTT client library for C (MQTTAsync)
*
* &copy; Copyright 2009, 2022 IBM Corp., Ian Craggs and others
*
......@@ -170,9 +170,14 @@
*/
#define MQTTASYNC_SSL_NOT_SUPPORTED -13
/**
* Return code: protocol prefix in serverURI should be tcp://, ssl://, ws:// or wss://
* The TLS enabled prefixes (ssl, wss) are only valid if the TLS version of the library
* is linked with.
* Return code: protocol prefix in serverURI should be:
* @li @em tcp:// or @em mqtt:// - Insecure TCP
* @li @em ssl:// or @em mqtts:// - Encrypted SSL/TLS
* @li @em ws:// - Insecure websockets
* @li @em wss:// - Secure web sockets
*
* The TLS enabled prefixes (ssl, mqtts, wss) are only valid if the TLS
* version of the library is linked with.
*/
#define MQTTASYNC_BAD_PROTOCOL -14
/**
......@@ -728,7 +733,8 @@ typedef struct MQTTAsync_responseOptions
/**
* A token is returned from the call. It can be used to track
* the state of this request, both in the callbacks and in future calls
* such as ::MQTTAsync_waitForCompletion.
* such as ::MQTTAsync_waitForCompletion. This is output only - any
* change by the application will be ignored.
*/
MQTTAsync_token token;
/**
......@@ -903,14 +909,22 @@ LIBMQTT_API int MQTTAsync_reconnect(MQTTAsync handle);
* populated with a valid client reference following a successful return from
* this function.
* @param serverURI A null-terminated string specifying the server to
* which the client will connect. It takes the form <i>protocol://host:port</i>.
* <i>protocol</i> must be <i>tcp</i>, <i>ssl</i>, <i>ws</i> or <i>wss</i>.
* The TLS enabled prefixes (ssl, wss) are only valid if a TLS version of
* the library is linked with.
* For <i>host</i>, you can
* specify either an IP address or a host name. For instance, to connect to
* a server running on the local machines with the default MQTT port, specify
* <i>tcp://localhost:1883</i>.
* which the client will connect. It takes the form
* <i>protocol://host:port</i> where <i>protocol</i> must be:
* <br>
* @em tcp:// or @em mqtt:// - Insecure TCP
* <br>
* @em ssl:// or @em mqtts:// - Encrypted SSL/TLS
* <br>
* @em ws:// - Insecure websockets
* <br>
* @em wss:// - Secure web sockets
* <br>
* The TLS enabled prefixes (ssl, mqtts, wss) are only valid if a TLS
* version of the library is linked with.
* For <i>host</i>, you can specify either an IP address or a host name. For
* instance, to connect to a server running on the local machines with the
* default MQTT port, specify <i>tcp://localhost:1883</i>.
* @param clientId The client identifier passed to the server when the
* client connects to it. It is a null-terminated UTF-8 encoded string.
* @param persistence_type The type of persistence to be used by the client:
......@@ -1173,8 +1187,13 @@ typedef struct
/**
* MQTTAsync_connectOptions defines several settings that control the way the
* client connects to an MQTT server. Default values are set in
* MQTTAsync_connectOptions_initializer.
* client connects to an MQTT server.
*
* Suitable default values are set in the following initializers:
* - MQTTAsync_connectOptions_initializer: for MQTT 3.1.1 non-WebSockets
* - MQTTAsync_connectOptions_initializer5: for MQTT 5.0 non-WebSockets
* - MQTTAsync_connectOptions_initializer_ws: for MQTT 3.1.1 WebSockets
* - MQTTAsync_connectOptions_initializer5_ws: for MQTT 5.0 WebSockets
*/
typedef struct
{
......@@ -1305,15 +1324,15 @@ typedef struct
*/
int MQTTVersion;
/**
* Reconnect automatically in the case of a connection being lost?
* Reconnect automatically in the case of a connection being lost. 0=false, 1=true
*/
int automaticReconnect;
/**
* Minimum retry interval in seconds. Doubled on each failed retry.
* The minimum automatic reconnect retry interval in seconds. Doubled on each failed retry.
*/
int minRetryInterval;
/**
* Maximum retry interval in seconds. The doubling stops here on failed retries.
* The maximum automatic reconnect retry interval in seconds. The doubling stops here on failed retries.
*/
int maxRetryInterval;
/**
......@@ -1361,16 +1380,23 @@ typedef struct
const char* httpsProxy;
} MQTTAsync_connectOptions;
/** Initializer for connect options for MQTT 3.1.1 non-WebSocket connections */
#define MQTTAsync_connectOptions_initializer { {'M', 'Q', 'T', 'C'}, 8, 60, 1, 65535, NULL, NULL, NULL, 30, 0,\
NULL, NULL, NULL, NULL, 0, NULL, MQTTVERSION_DEFAULT, 0, 1, 60, {0, NULL}, 0, NULL, NULL, NULL, NULL, NULL, NULL, NULL}
/** Initializer for connect options for MQTT 5.0 non-WebSocket connections */
#define MQTTAsync_connectOptions_initializer5 { {'M', 'Q', 'T', 'C'}, 8, 60, 0, 65535, NULL, NULL, NULL, 30, 0,\
NULL, NULL, NULL, NULL, 0, NULL, MQTTVERSION_5, 0, 1, 60, {0, NULL}, 1, NULL, NULL, NULL, NULL, NULL, NULL, NULL}
/** Initializer for connect options for MQTT 3.1.1 WebSockets connections.
* The keepalive interval is set to 45 seconds to avoid webserver 60 second inactivity timeouts.
*/
#define MQTTAsync_connectOptions_initializer_ws { {'M', 'Q', 'T', 'C'}, 8, 45, 1, 65535, NULL, NULL, NULL, 30, 0,\
NULL, NULL, NULL, NULL, 0, NULL, MQTTVERSION_DEFAULT, 0, 1, 60, {0, NULL}, 0, NULL, NULL, NULL, NULL, NULL, NULL, NULL}
/** Initializer for connect options for MQTT 5.0 WebSockets connections.
* The keepalive interval is set to 45 seconds to avoid webserver 60 second inactivity timeouts.
*/
#define MQTTAsync_connectOptions_initializer5_ws { {'M', 'Q', 'T', 'C'}, 8, 45, 0, 65535, NULL, NULL, NULL, 30, 0,\
NULL, NULL, NULL, NULL, 0, NULL, MQTTVERSION_5, 0, 1, 60, {0, NULL}, 1, NULL, NULL, NULL, NULL, NULL, NULL, NULL}
......
......@@ -1053,6 +1053,7 @@ static void MQTTAsync_freeCommand1(MQTTAsync_queuedCommand *command)
free(command->key);
}
static void MQTTAsync_freeCommand(MQTTAsync_queuedCommand *command)
{
MQTTAsync_freeCommand1(command);
......@@ -1060,6 +1061,19 @@ static void MQTTAsync_freeCommand(MQTTAsync_queuedCommand *command)
}
void MQTTAsync_writeContinue(SOCKET socket)
{
ListElement* found = NULL;
if ((found = ListFindItem(MQTTAsync_handles, &socket, clientSockCompare)) != NULL)
{
MQTTAsyncs* m = (MQTTAsyncs*)(found->content);
m->c->net.lastSent = MQTTTime_now();
}
}
void MQTTAsync_writeComplete(SOCKET socket, int rc)
{
ListElement* found = NULL;
......@@ -1277,6 +1291,8 @@ static int MQTTAsync_processCommand(void)
if (strncmp(URI_TCP, serverURI, strlen(URI_TCP)) == 0)
serverURI += strlen(URI_TCP);
else if (strncmp(URI_MQTT, serverURI, strlen(URI_MQTT)) == 0)
serverURI += strlen(URI_MQTT);
else if (strncmp(URI_WS, serverURI, strlen(URI_WS)) == 0)
{
serverURI += strlen(URI_WS);
......@@ -1288,6 +1304,11 @@ static int MQTTAsync_processCommand(void)
serverURI += strlen(URI_SSL);
command->client->ssl = 1;
}
else if (strncmp(URI_MQTTS, serverURI, strlen(URI_MQTTS)) == 0)
{
serverURI += strlen(URI_MQTTS);
command->client->ssl = 1;
}
else if (strncmp(URI_WSS, serverURI, strlen(URI_WSS)) == 0)
{
serverURI += strlen(URI_WSS);
......@@ -1448,11 +1469,6 @@ static int MQTTAsync_processCommand(void)
command->client->pending_write = &command->command;
}
}
else
{
command->command.details.pub.payload = NULL; /* this will be freed by the protocol code */
command->command.details.pub.destinationName = NULL; /* this will be freed by the protocol code */
}
free(p); /* should this be done if the write isn't complete? */
}
else if (command->command.type == DISCONNECT)
......@@ -1582,9 +1598,12 @@ exit:
static void nextOrClose(MQTTAsyncs* m, int rc, char* message)
{
int was_connected = m->c->connected;
int more_to_try = 0;
int connectionLost_called = 0;
FUNC_ENTRY;
if (MQTTAsync_checkConn(&m->connect, m))
more_to_try = MQTTAsync_checkConn(&m->connect, m);
if (more_to_try)
{
MQTTAsync_queuedCommand* conn;
......@@ -1593,6 +1612,7 @@ static void nextOrClose(MQTTAsyncs* m, int rc, char* message)
{
Log(TRACE_MIN, -1, "Calling connectionLost for client %s", m->c->clientID);
(*(m->cl))(m->clContext, NULL);
connectionLost_called = 1;
}
/* put the connect command back to the head of the command queue, using the next serverURI */
if ((conn = malloc(sizeof(MQTTAsync_queuedCommand))) == NULL)
......@@ -1613,12 +1633,14 @@ static void nextOrClose(MQTTAsyncs* m, int rc, char* message)
else
conn->command.details.conn.currentURI++;
MQTTAsync_addCommand(conn, sizeof(m->connect));
if (MQTTAsync_addCommand(conn, sizeof(m->connect)) != MQTTASYNC_SUCCESS)
more_to_try = 0; /* go into retry mode if CONNECT command add fails */
}
else
if (!more_to_try)
{
MQTTAsync_closeSession(m->c, MQTTREASONCODE_SUCCESS, NULL);
if (m->cl && was_connected)
if (connectionLost_called == 0 && m->cl && was_connected)
{
Log(TRACE_MIN, -1, "Calling connectionLost for client %s", m->c->clientID);
(*(m->cl))(m->clContext, NULL);
......@@ -1759,6 +1781,7 @@ thread_return_type WINAPI MQTTAsync_sendThread(void* n)
int timeout = 10; /* first time in we have a small timeout. Gets things started more quickly */
FUNC_ENTRY;
Thread_set_name("MQTTAsync_send");
MQTTAsync_lock_mutex(mqttasync_mutex);
sendThread_state = RUNNING;
sendThread_id = Thread_getid();
......@@ -1978,6 +2001,7 @@ thread_return_type WINAPI MQTTAsync_receiveThread(void* n)
long timeout = 10L; /* first time in we have a small timeout. Gets things started more quickly */
FUNC_ENTRY;
Thread_set_name("MQTTAsync_rcv");
MQTTAsync_lock_mutex(mqttasync_mutex);
receiveThread_state = RUNNING;
receiveThread_id = Thread_getid();
......@@ -1993,10 +2017,11 @@ thread_return_type WINAPI MQTTAsync_receiveThread(void* n)
MQTTAsync_lock_mutex(mqttasync_mutex);
if (MQTTAsync_tostop)
break;
timeout = 1000L;
if (sock == 0)
continue;
timeout = 1000L;
/* find client corresponding to socket */
if (ListFindItem(MQTTAsync_handles, &sock, clientSockCompare) == NULL)
{
......@@ -2402,6 +2427,65 @@ static int clientStructCompare(void* a, void* b)
}
/*
* Set destinationName and payload to NULL in all responses
* for a client, so that these memory locations aren't freed twice as they
* are also stored by MQTTProtocol_storePublication.
* @param m the client to process
*/
void MQTTAsync_NULLPublishResponses(MQTTAsyncs* m)
{
FUNC_ENTRY;
if (m->responses)
{
ListElement* cur_response = NULL;
while (ListNextElement(m->responses, &cur_response))
{
MQTTAsync_queuedCommand* command = (MQTTAsync_queuedCommand*)(cur_response->content);
if (command->command.type == PUBLISH)
{
/* these values are going to be freed in RemovePublication */
command->command.details.pub.destinationName = NULL;
command->command.details.pub.payload = NULL;
}
}
}
FUNC_EXIT;
}
/*
* Set destinationName and payload to NULL in all commands
* for a client, so that these memory locations aren't freed twice as they
* are also stored by MQTTProtocol_storePublication.
* @param m the client to process
*/
void MQTTAsync_NULLPublishCommands(MQTTAsyncs* m)
{
ListElement* current = NULL;
ListElement *next = NULL;
FUNC_ENTRY;
current = ListNextElement(MQTTAsync_commands, &next);
ListNextElement(MQTTAsync_commands, &next);
while (current)
{
MQTTAsync_queuedCommand* command = (MQTTAsync_queuedCommand*)(current->content);
if (command->client == m && command->command.type == PUBLISH)
{
/* these values are going to be freed in RemovePublication */
command->command.details.pub.destinationName = NULL;
command->command.details.pub.payload = NULL;
}
current = next;
ListNextElement(MQTTAsync_commands, &next);
}
FUNC_EXIT;
}
/**
* Clean the MQTT session data. This includes the MQTT inflight messages, because
* that is part of the MQTT state that will be cleared by the MQTT broker too.
......@@ -2423,6 +2507,7 @@ static int MQTTAsync_cleanSession(Clients* client)
if ((found = ListFindItem(MQTTAsync_handles, client, clientStructCompare)) != NULL)
{
MQTTAsyncs* m = (MQTTAsyncs*)(found->content);
MQTTAsync_NULLPublishResponses(m);
MQTTAsync_freeResponses(m);
}
else
......@@ -2603,7 +2688,7 @@ static int MQTTAsync_disconnect_internal(MQTTAsync handle, int timeout)
void MQTTProtocol_closeSession(Clients* c, int sendwill)
{
MQTTAsync_disconnect_internal((MQTTAsync)c->context, 0);
MQTTAsync_closeSession(c, MQTTREASONCODE_SUCCESS, NULL);
}
......@@ -2711,6 +2796,8 @@ static int MQTTAsync_connecting(MQTTAsyncs* m)
/* skip URI scheme */
if (strncmp(URI_TCP, serverURI, strlen(URI_TCP)) == 0)
serverURI += strlen(URI_TCP);
else if (strncmp(URI_MQTT, serverURI, strlen(URI_MQTT)) == 0)
serverURI += strlen(URI_MQTT);
else if (strncmp(URI_WS, serverURI, strlen(URI_WS)) == 0)
{
serverURI += strlen(URI_WS);
......@@ -2724,6 +2811,11 @@ static int MQTTAsync_connecting(MQTTAsyncs* m)
serverURI += strlen(URI_SSL);
default_port = SECURE_MQTT_DEFAULT_PORT;
}
else if (strncmp(URI_MQTTS, serverURI, strlen(URI_MQTTS)) == 0)
{
serverURI += strlen(URI_MQTTS);
default_port = SECURE_MQTT_DEFAULT_PORT;
}
else if (strncmp(URI_WSS, serverURI, strlen(URI_WSS)) == 0)
{
serverURI += strlen(URI_WSS);
......@@ -2944,6 +3036,7 @@ static MQTTPacket* MQTTAsync_cycle(SOCKET* sock, unsigned long timeout, int* rc)
ackrc = 0,
mqttversion = 0;
MQTTProperties msgprops = MQTTProperties_initializer;
Publications* pubToRemove = NULL;
/* This block is so that the ack variable is local and isn't accidentally reused */
{
......@@ -2961,11 +3054,11 @@ static MQTTPacket* MQTTAsync_cycle(SOCKET* sock, unsigned long timeout, int* rc)
}
if (pack->header.bits.type == PUBCOMP)
*rc = MQTTProtocol_handlePubcomps(pack, *sock);
*rc = MQTTProtocol_handlePubcomps(pack, *sock, &pubToRemove);
else if (pack->header.bits.type == PUBREC)
*rc = MQTTProtocol_handlePubrecs(pack, *sock);
*rc = MQTTProtocol_handlePubrecs(pack, *sock, &pubToRemove);
else if (pack->header.bits.type == PUBACK)
*rc = MQTTProtocol_handlePubacks(pack, *sock);
*rc = MQTTProtocol_handlePubacks(pack, *sock, &pubToRemove);
if (!m)
Log(LOG_ERROR, -1, "PUBCOMP, PUBACK or PUBREC received for no client, msgid %d", msgid);
if (m && (msgtype != PUBREC || ackrc >= MQTTREASONCODE_UNSPECIFIED_ERROR))
......@@ -3023,6 +3116,16 @@ static MQTTPacket* MQTTAsync_cycle(SOCKET* sock, unsigned long timeout, int* rc)
Log(TRACE_MIN, -1, "Calling publish failure for client %s", m->c->clientID);
(*(command->command.onFailure5))(command->command.context, &data);
}
if (pubToRemove != NULL)
{
MQTTProtocol_removePublication(pubToRemove);
pubToRemove = NULL;
/* removePublication has freed the topic and payload memory, so here we indicate that
* so freeCommand doesn't try to free them again.
*/
command->command.details.pub.destinationName = NULL;
command->command.details.pub.payload = NULL;
}
MQTTAsync_freeCommand(command);
break;
}
......@@ -3030,6 +3133,8 @@ static MQTTPacket* MQTTAsync_cycle(SOCKET* sock, unsigned long timeout, int* rc)
if (mqttversion >= MQTTVERSION_5)
MQTTProperties_free(&msgprops);
}
if (pubToRemove != NULL)
MQTTProtocol_removePublication(pubToRemove);
}
else if (pack->header.bits.type == PUBREL)
*rc = MQTTProtocol_handlePubrels(pack, *sock);
......
......@@ -20,9 +20,10 @@
#include "MQTTPacket.h"
#include "Thread.h"
#define URI_TCP "tcp://"
#define URI_WS "ws://"
#define URI_WSS "wss://"
#define URI_TCP "tcp://"
#define URI_MQTT "mqtt://"
#define URI_WS "ws://"
#define URI_WSS "wss://"
enum MQTTAsync_threadStates
{
......@@ -169,8 +170,11 @@ void MQTTAsync_closeSession(Clients* client, enum MQTTReasonCodes reasonCode, MQ
int MQTTAsync_disconnect1(MQTTAsync handle, const MQTTAsync_disconnectOptions* options, int internal);
int MQTTAsync_assignMsgId(MQTTAsyncs* m);
int MQTTAsync_getNoBufferedMessages(MQTTAsyncs* m);
void MQTTAsync_writeContinue(SOCKET socket);
void MQTTAsync_writeComplete(SOCKET socket, int rc);
void setRetryLoopInterval(int keepalive);
void MQTTAsync_NULLPublishResponses(MQTTAsyncs* m);
void MQTTAsync_NULLPublishCommands(MQTTAsyncs* m);
#if defined(_WIN32) || defined(_WIN64)
#else
......
......@@ -70,14 +70,16 @@
#if defined(OPENSSL)
#include <openssl/ssl.h>
#else
#define URI_SSL "ssl://"
#define URI_SSL "ssl://"
#define URI_MQTTS "mqtts://"
#endif
#include "OsWrapper.h"
#define URI_TCP "tcp://"
#define URI_WS "ws://"
#define URI_WSS "wss://"
#define URI_TCP "tcp://"
#define URI_MQTT "mqtt://"
#define URI_WS "ws://"
#define URI_WSS "wss://"
#include "VersionInfo.h"
#include "WebSocket.h"
......@@ -380,7 +382,7 @@ int MQTTClient_createWithOptions(MQTTClient* handle, const char* serverURI, cons
#endif
FUNC_ENTRY;
if ((rc = Thread_lock_mutex(mqttclient_mutex)) != 0)
goto exit;
goto nounlock_exit;
if (serverURI == NULL || clientId == NULL)
{
......@@ -403,9 +405,11 @@ int MQTTClient_createWithOptions(MQTTClient* handle, const char* serverURI, cons
if (strstr(serverURI, "://") != NULL)
{
if (strncmp(URI_TCP, serverURI, strlen(URI_TCP)) != 0
&& strncmp(URI_MQTT, serverURI, strlen(URI_MQTT)) != 0
&& strncmp(URI_WS, serverURI, strlen(URI_WS)) != 0
#if defined(OPENSSL)
&& strncmp(URI_SSL, serverURI, strlen(URI_SSL)) != 0
&& strncmp(URI_SSL, serverURI, strlen(URI_SSL)) != 0
&& strncmp(URI_MQTTS, serverURI, strlen(URI_MQTTS)) != 0
&& strncmp(URI_WSS, serverURI, strlen(URI_WSS)) != 0
#endif
)
......@@ -448,6 +452,8 @@ int MQTTClient_createWithOptions(MQTTClient* handle, const char* serverURI, cons
m->commandTimeout = 10000L;
if (strncmp(URI_TCP, serverURI, strlen(URI_TCP)) == 0)
serverURI += strlen(URI_TCP);
else if (strncmp(URI_MQTT, serverURI, strlen(URI_MQTT)) == 0)
serverURI += strlen(URI_MQTT);
else if (strncmp(URI_WS, serverURI, strlen(URI_WS)) == 0)
{
serverURI += strlen(URI_WS);
......@@ -461,6 +467,16 @@ int MQTTClient_createWithOptions(MQTTClient* handle, const char* serverURI, cons
#else
rc = MQTTCLIENT_SSL_NOT_SUPPORTED;
goto exit;
#endif
}
else if (strncmp(URI_MQTTS, serverURI, strlen(URI_MQTTS)) == 0)
{
#if defined(OPENSSL)
serverURI += strlen(URI_MQTTS);
m->ssl = 1;
#else
rc = MQTTCLIENT_SSL_NOT_SUPPORTED;
goto exit;
#endif
}
else if (strncmp(URI_WSS, serverURI, strlen(URI_WSS)) == 0)
......@@ -509,6 +525,7 @@ int MQTTClient_createWithOptions(MQTTClient* handle, const char* serverURI, cons
exit:
Thread_unlock_mutex(mqttclient_mutex);
nounlock_exit:
FUNC_EXIT_RC(rc);
return rc;
}
......@@ -800,10 +817,11 @@ static thread_return_type WINAPI MQTTClient_run(void* n)
long timeout = 10L; /* first time in we have a small timeout. Gets things started more quickly */
FUNC_ENTRY;
running = 1;
run_id = Thread_getid();
Thread_set_name("MQTTClient_run");
Thread_lock_mutex(mqttclient_mutex);
run_id = Thread_getid();
running = 1;
while (!tostop)
{
int rc = SOCKET_ERROR;
......@@ -1183,13 +1201,26 @@ static MQTTResponse MQTTClient_connectURIVersion(MQTTClient handle, MQTTClient_c
resp.reasonCode = SOCKET_ERROR;
if (m->ma && !running)
{
int count = 0;
Thread_start(MQTTClient_run, handle);
if (MQTTTime_elapsed(start) >= millisecsTimeout)
{
rc = SOCKET_ERROR;
goto exit;
}
MQTTTime_sleep(100L);
while (!running && ++count < 5)
{
Thread_unlock_mutex(mqttclient_mutex);
MQTTTime_sleep(100L);
Thread_lock_mutex(mqttclient_mutex);
}
if (!running)
{
rc = SOCKET_ERROR;
goto exit;
}
}
Log(TRACE_MIN, -1, "Connecting to serverURI %s with MQTT version %d", serverURI, MQTTVersion);
......@@ -1612,11 +1643,17 @@ static MQTTResponse MQTTClient_connectURI(MQTTClient handle, MQTTClient_connectO
#endif
if (m->c->username)
{
free((void*)m->c->username);
m->c->username = NULL;
}
if (options->username)
m->c->username = MQTTStrdup(options->username);
if (m->c->password)
{
free((void*)m->c->password);
m->c->password = NULL;
}
if (options->password)
{
m->c->password = MQTTStrdup(options->password);
......@@ -1665,7 +1702,7 @@ int MQTTClient_connect(MQTTClient handle, MQTTClient_connectOptions* options)
MQTTClients* m = handle;
MQTTResponse response;
if (m->c->MQTTVersion >= MQTTVERSION_5)
if (m != NULL && m->c != NULL && m->c->MQTTVersion >= MQTTVERSION_5)
return MQTTCLIENT_WRONG_MQTT_VERSION;
response = MQTTClient_connectAll(handle, options, NULL, NULL);
......@@ -1680,7 +1717,7 @@ MQTTResponse MQTTClient_connect5(MQTTClient handle, MQTTClient_connectOptions* o
MQTTClients* m = handle;
MQTTResponse response = MQTTResponse_initializer;
if (m->c->MQTTVersion < MQTTVERSION_5)
if (m != NULL && m->c != NULL && m->c->MQTTVersion < MQTTVERSION_5)
{
response.reasonCode = MQTTCLIENT_WRONG_MQTT_VERSION;
return response;
......@@ -1707,7 +1744,7 @@ MQTTResponse MQTTClient_connectAll(MQTTClient handle, MQTTClient_connectOptions*
goto exit;
}
if (options == NULL)
if (options == NULL || m == NULL || m->c == NULL)
{
rc.reasonCode = MQTTCLIENT_NULL_PARAMETER;
goto exit;
......@@ -1809,6 +1846,8 @@ MQTTResponse MQTTClient_connectAll(MQTTClient handle, MQTTClient_connectOptions*
if (strncmp(URI_TCP, serverURI, strlen(URI_TCP)) == 0)
serverURI += strlen(URI_TCP);
else if (strncmp(URI_MQTT, serverURI, strlen(URI_MQTT)) == 0)
serverURI += strlen(URI_TCP);
else if (strncmp(URI_WS, serverURI, strlen(URI_WS)) == 0)
{
serverURI += strlen(URI_WS);
......@@ -1820,6 +1859,11 @@ MQTTResponse MQTTClient_connectAll(MQTTClient handle, MQTTClient_connectOptions*
serverURI += strlen(URI_SSL);
m->ssl = 1;
}
else if (strncmp(URI_MQTTS, serverURI, strlen(URI_MQTTS)) == 0)
{
serverURI += strlen(URI_MQTTS);
m->ssl = 1;
}
else if (strncmp(URI_WSS, serverURI, strlen(URI_WSS)) == 0)
{
serverURI += strlen(URI_WSS);
......@@ -1920,7 +1964,7 @@ static int MQTTClient_disconnect_internal(MQTTClient handle, int timeout)
*/
void MQTTProtocol_closeSession(Clients* c, int sendwill)
{
MQTTClient_disconnect_internal((MQTTClient)c->context, 0);
MQTTClient_closeSession(c, MQTTREASONCODE_SUCCESS, NULL);
}
......@@ -2059,12 +2103,19 @@ MQTTResponse MQTTClient_subscribeMany5(MQTTClient handle, int count, char* const
}
else
{
ListElement* current = NULL;
i = 0;
while (ListNextElement(sub->qoss, &current))
ListElement *current = NULL;
/* if the returned count is greater than requested, it's an error*/
if (sub->qoss->count > count)
rc = MQTTCLIENT_FAILURE;
else
{
int* reqqos = (int*)(current->content);
qos[i++] = *reqqos;
i = 0;
while (ListNextElement(sub->qoss, &current))
{
int *reqqos = (int*) (current->content);
qos[i++] = *reqqos;
}
}
resp.reasonCode = rc;
}
......@@ -2096,7 +2147,7 @@ int MQTTClient_subscribeMany(MQTTClient handle, int count, char* const* topic, i
MQTTClients* m = handle;
MQTTResponse response = MQTTResponse_initializer;
if (m->c->MQTTVersion >= MQTTVERSION_5)
if (m != NULL && m->c != NULL && m->c->MQTTVersion >= MQTTVERSION_5)
response.reasonCode = MQTTCLIENT_WRONG_MQTT_VERSION;
else
response = MQTTClient_subscribeMany5(handle, count, topic, qos, NULL, NULL);
......@@ -2240,7 +2291,13 @@ exit:
int MQTTClient_unsubscribeMany(MQTTClient handle, int count, char* const* topic)
{
MQTTResponse response = MQTTClient_unsubscribeMany5(handle, count, topic, NULL);
MQTTClients* m = handle;
MQTTResponse response = MQTTResponse_initializer;
if (m != NULL && m->c != NULL && m->c->MQTTVersion >= MQTTVERSION_5)
response.reasonCode = MQTTCLIENT_WRONG_MQTT_VERSION;
else
response = MQTTClient_unsubscribeMany5(handle, count, topic, NULL);
return response.reasonCode;
}
......@@ -2457,7 +2514,7 @@ int MQTTClient_publishMessage(MQTTClient handle, const char* topicName, MQTTClie
if (strncmp(message->struct_id, "MQTM", 4) != 0 ||
(message->struct_version != 0 && message->struct_version != 1))
rc.reasonCode = MQTTCLIENT_BAD_STRUCTURE;
else if (m->c->MQTTVersion >= MQTTVERSION_5)
else if (m != NULL && m->c != NULL && m->c->MQTTVersion >= MQTTVERSION_5)
rc.reasonCode = MQTTCLIENT_WRONG_MQTT_VERSION;
else
rc = MQTTClient_publishMessage5(handle, topicName, message, deliveryToken);
......@@ -2544,7 +2601,7 @@ static MQTTPacket* MQTTClient_cycle(SOCKET* sock, ELAPSED_TIME_TYPE timeout, int
(*(m->published))(m->published_context, msgid, pack->header.bits.type, &ack.properties, ack.rc);
}
*rc = (pack->header.bits.type == PUBCOMP) ?
MQTTProtocol_handlePubcomps(pack, *sock) : MQTTProtocol_handlePubacks(pack, *sock);
MQTTProtocol_handlePubcomps(pack, *sock, NULL) : MQTTProtocol_handlePubacks(pack, *sock, NULL);
if (m && m->dc)
{
Log(TRACE_MIN, -1, "Calling deliveryComplete for client %s, msgid %d", m->c->clientID, msgid);
......@@ -2561,7 +2618,7 @@ static MQTTPacket* MQTTClient_cycle(SOCKET* sock, ELAPSED_TIME_TYPE timeout, int
(*(m->published))(m->published_context, pubrec->msgId, pack->header.bits.type,
&pubrec->properties, pubrec->rc);
}
*rc = MQTTProtocol_handlePubrecs(pack, *sock);
*rc = MQTTProtocol_handlePubrecs(pack, *sock, NULL);
}
else if (pack->header.bits.type == PUBREL)
*rc = MQTTProtocol_handlePubrels(pack, *sock);
......@@ -2585,6 +2642,7 @@ static MQTTPacket* MQTTClient_waitfor(MQTTClient handle, int packet_type, int* r
MQTTPacket* pack = NULL;
MQTTClients* m = handle;
START_TIME_TYPE start = MQTTTime_start_clock();
int is_running = 0; /* local copy of running */
FUNC_ENTRY;
if (((MQTTClients*)handle) == NULL || timeout <= 0L)
......@@ -2593,7 +2651,11 @@ static MQTTPacket* MQTTClient_waitfor(MQTTClient handle, int packet_type, int* r
goto exit;
}
if (running)
Thread_lock_mutex(mqttclient_mutex);
is_running = running;
Thread_unlock_mutex(mqttclient_mutex);
if (is_running)
{
if (packet_type == CONNECT)
{
......
......@@ -40,7 +40,7 @@
*
* @endcond
* @cond MQTTClient_main
* @mainpage MQTT Client library for C
* @mainpage MQTT Client library for C (MQTTClient)
* &copy; Copyright 2009, 2022 IBM Corp., Ian Craggs and others
*
* @brief An MQTT client library in C.
......@@ -179,9 +179,13 @@
*/
#define MQTTCLIENT_BAD_MQTT_VERSION -11
/**
* Return code: protocol prefix in serverURI should be tcp://, ssl://, ws:// or wss://
* The TLS enabled prefixes (ssl, wss) are only valid if a TLS version of the library
* is linked with.
* Return code: protocol prefix in serverURI should be:
* @li @em tcp:// or @em mqtt:// - Insecure TCP
* @li @em ssl:// or @em mqtts:// - Encrypted SSL/TLS
* @li @em ws:// - Insecure websockets
* @li @em wss:// - Secure web sockets
* The TLS enabled prefixes (ssl, mqtts, wss) are only valid if a TLS
* version of the library is linked with.
*/
#define MQTTCLIENT_BAD_PROTOCOL -14
/**
......@@ -494,13 +498,21 @@ LIBMQTT_API int MQTTClient_setPublished(MQTTClient handle, void* context, MQTTCl
* this function.
* @param serverURI A null-terminated string specifying the server to
* which the client will connect. It takes the form <i>protocol://host:port</i>.
* Currently, <i>protocol</i> must be <i>tcp</i>, <i>ssl</i>, <i>ws</i> or <i>wss</i>.
* The TLS enabled prefixes (ssl, wss) are only valid if a TLS version of the library
* is linked with.
* For <i>host</i>, you can
* specify either an IP address or a host name. For instance, to connect to
* a server running on the local machines with the default MQTT port, specify
* <i>tcp://localhost:1883</i>.
* Currently, <i>protocol</i> must be:
* <br>
* @em tcp:// or @em mqtt:// - Insecure TCP
* <br>
* @em ssl:// or @em mqtts:// - Encrypted SSL/TLS
* <br>
* @em ws:// - Insecure websockets
* <br>
* @em wss:// - Secure web sockets
* <br>
* The TLS enabled prefixes (ssl, mqtts, wss) are only valid if a TLS
* version of the library is linked with.
* For <i>host</i>, you can specify either an IP address or a host name. For
* instance, to connect to a server running on the local machines with the
* default MQTT port, specify <i>tcp://localhost:1883</i>.
* @param clientId The client identifier passed to the server when the
* client connects to it. It is a null-terminated UTF-8 encoded string.
* @param persistence_type The type of persistence to be used by the client:
......@@ -802,6 +814,12 @@ LIBMQTT_API MQTTClient_nameValue* MQTTClient_getVersionInfo(void);
* values to 0 (NULL for pointers). A #keepAliveInterval setting of 0 prevents
* correct operation of the client and so you <b>must</b> at least set a value
* for #keepAliveInterval.
*
* Suitable default values are set in the following initializers:
* - MQTTClient_connectOptions_initializer: for MQTT 3.1.1 non-WebSockets
* - MQTTClient_connectOptions_initializer5: for MQTT 5.0 non-WebSockets
* - MQTTClient_connectOptions_initializer_ws: for MQTT 3.1.1 WebSockets
* - MQTTClient_connectOptions_initializer5_ws: for MQTT 5.0 WebSockets
*/
typedef struct
{
......@@ -960,15 +978,23 @@ typedef struct
const char* httpsProxy;
} MQTTClient_connectOptions;
/** Initializer for connect options for MQTT 3.1.1 non-WebSocket connections */
#define MQTTClient_connectOptions_initializer { {'M', 'Q', 'T', 'C'}, 8, 60, 1, 1, NULL, NULL, NULL, 30, 0, NULL,\
0, NULL, MQTTVERSION_DEFAULT, {NULL, 0, 0}, {0, NULL}, -1, 0, NULL, NULL, NULL}
/** Initializer for connect options for MQTT 5.0 non-WebSocket connections */
#define MQTTClient_connectOptions_initializer5 { {'M', 'Q', 'T', 'C'}, 8, 60, 0, 1, NULL, NULL, NULL, 30, 0, NULL,\
0, NULL, MQTTVERSION_5, {NULL, 0, 0}, {0, NULL}, -1, 1, NULL, NULL, NULL}
/** Initializer for connect options for MQTT 3.1.1 WebSockets connections.
* The keepalive interval is set to 45 seconds to avoid webserver 60 second inactivity timeouts.
*/
#define MQTTClient_connectOptions_initializer_ws { {'M', 'Q', 'T', 'C'}, 8, 45, 1, 1, NULL, NULL, NULL, 30, 0, NULL,\
0, NULL, MQTTVERSION_DEFAULT, {NULL, 0, 0}, {0, NULL}, -1, 0, NULL, NULL, NULL}
/** Initializer for connect options for MQTT 5.0 WebSockets connections.
* The keepalive interval is set to 45 seconds to avoid webserver 60 second inactivity timeouts.
*/
#define MQTTClient_connectOptions_initializer5_ws { {'M', 'Q', 'T', 'C'}, 8, 45, 0, 1, NULL, NULL, NULL, 30, 0, NULL,\
0, NULL, MQTTVERSION_5, {NULL, 0, 0}, {0, NULL}, -1, 1, NULL, NULL, NULL}
......
/*******************************************************************************
* Copyright (c) 2009, 2021 IBM Corp. and Ian Craggs
* Copyright (c) 2009, 2022 IBM Corp. and Ian Craggs
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v2.0
......@@ -132,7 +132,10 @@ void* MQTTPacket_Factory(int MQTTVersion, networkHandles* net, int* error)
}
if (actual_len < remaining_length)
{
*error = TCPSOCKET_INTERRUPTED;
net->lastReceived = MQTTTime_now();
}
else
{
ptype = header.bits.type;
......
......@@ -297,10 +297,16 @@ void MQTTProtocol_removePublication(Publications* p)
FUNC_ENTRY;
if (p && --(p->refcount) == 0)
{
free(p->payload);
p->payload = NULL;
free(p->topic);
p->topic = NULL;
if (p->payload)
{
free(p->payload);
p->payload = NULL;
}
if (p->topic)
{
free(p->topic);
p->topic = NULL;
}
ListRemove(&(state.publications), p);
}
FUNC_EXIT;
......@@ -427,7 +433,7 @@ exit:
* @param sock the socket on which the packet was received
* @return completion code
*/
int MQTTProtocol_handlePubacks(void* pack, SOCKET sock)
int MQTTProtocol_handlePubacks(void* pack, SOCKET sock, Publications** pubToRemove)
{
Puback* puback = (Puback*)pack;
Clients* client = NULL;
......@@ -453,7 +459,10 @@ int MQTTProtocol_handlePubacks(void* pack, SOCKET sock)
(m->MQTTVersion >= MQTTVERSION_5) ? PERSISTENCE_V5_PUBLISH_SENT : PERSISTENCE_PUBLISH_SENT,
m->qos, puback->msgId);
#endif
MQTTProtocol_removePublication(m->publish);
if (pubToRemove != NULL)
*pubToRemove = m->publish;
else
MQTTProtocol_removePublication(m->publish);
if (m->MQTTVersion >= MQTTVERSION_5)
MQTTProperties_free(&m->properties);
ListRemove(client->outboundMsgs, m);
......@@ -473,7 +482,7 @@ int MQTTProtocol_handlePubacks(void* pack, SOCKET sock)
* @param sock the socket on which the packet was received
* @return completion code
*/
int MQTTProtocol_handlePubrecs(void* pack, SOCKET sock)
int MQTTProtocol_handlePubrecs(void* pack, SOCKET sock, Publications** pubToRemove)
{
Pubrec* pubrec = (Pubrec*)pack;
Clients* client = NULL;
......@@ -515,7 +524,10 @@ int MQTTProtocol_handlePubrecs(void* pack, SOCKET sock)
(pubrec->MQTTVersion >= MQTTVERSION_5) ? PERSISTENCE_V5_PUBLISH_SENT : PERSISTENCE_PUBLISH_SENT,
m->qos, pubrec->msgId);
#endif
MQTTProtocol_removePublication(m->publish);
if (pubToRemove != NULL)
*pubToRemove = m->publish;
else
MQTTProtocol_removePublication(m->publish);
if (m->MQTTVersion >= MQTTVERSION_5)
MQTTProperties_free(&m->properties);
ListRemove(client->outboundMsgs, m);
......@@ -627,7 +639,7 @@ int MQTTProtocol_handlePubrels(void* pack, SOCKET sock)
* @param sock the socket on which the packet was received
* @return completion code
*/
int MQTTProtocol_handlePubcomps(void* pack, SOCKET sock)
int MQTTProtocol_handlePubcomps(void* pack, SOCKET sock, Publications** pubToRemove)
{
Pubcomp* pubcomp = (Pubcomp*)pack;
Clients* client = NULL;
......@@ -662,7 +674,10 @@ int MQTTProtocol_handlePubcomps(void* pack, SOCKET sock)
if (rc != 0)
Log(LOG_ERROR, -1, "Error removing PUBCOMP for client id %s msgid %d from persistence", client->clientID, pubcomp->msgId);
#endif
MQTTProtocol_removePublication(m->publish);
if (pubToRemove != NULL)
*pubToRemove = m->publish;
else
MQTTProtocol_removePublication(m->publish);
if (m->MQTTVersion >= MQTTVERSION_5)
MQTTProperties_free(&m->properties);
ListRemove(client->outboundMsgs, m);
......@@ -698,22 +713,23 @@ void MQTTProtocol_keepalive(START_TIME_TYPE now)
if (client->ping_outstanding == 1)
{
if (MQTTTime_difftime(now, client->net.lastPing) >= (DIFF_TIME_TYPE)(client->keepAliveInterval * 1000))
if (MQTTTime_difftime(now, client->net.lastPing) >= (DIFF_TIME_TYPE)(client->keepAliveInterval * 1500) &&
MQTTTime_difftime(now, client->net.lastReceived) >= (DIFF_TIME_TYPE)(client->keepAliveInterval * 1500))
{
Log(TRACE_PROTOCOL, -1, "PINGRESP not received in keepalive interval for client %s on socket %d, disconnecting", client->clientID, client->net.socket);
MQTTProtocol_closeSession(client, 1);
}
}
else if (client->ping_due == 1 &&
(MQTTTime_difftime(now, client->ping_due_time) >= (DIFF_TIME_TYPE)(client->keepAliveInterval * 1000)))
(MQTTTime_difftime(now, client->ping_due_time) >= (DIFF_TIME_TYPE)(client->keepAliveInterval * 1500)))
{
/* ping still outstanding after keep alive interval, so close session */
Log(TRACE_PROTOCOL, -1, "PINGREQ still outstanding for client %s on socket %d, disconnecting", client->clientID, client->net.socket);
MQTTProtocol_closeSession(client, 1);
}
else if (MQTTTime_difftime(now, client->net.lastSent) >= (DIFF_TIME_TYPE)(client->keepAliveInterval * 1000) ||
MQTTTime_difftime(now, client->net.lastReceived) >= (DIFF_TIME_TYPE)(client->keepAliveInterval * 1000))
else if (MQTTTime_difftime(now, client->net.lastSent) >= (DIFF_TIME_TYPE)(client->keepAliveInterval * 1000) &&
MQTTTime_difftime(now, client->net.lastReceived) >= (DIFF_TIME_TYPE)(client->keepAliveInterval * 1000))
{
if (Socket_noPendingWrites(client->net.socket))
{
......@@ -936,8 +952,13 @@ void MQTTProtocol_freeClient(Clients* client)
if (client->sslopts->CApath)
free((void*)client->sslopts->CApath);
}
if (client->sslopts->struct_version >= 5)
{
if (client->sslopts->protos)
free((void*)client->sslopts->protos);
}
free(client->sslopts);
client->sslopts = NULL;
client->sslopts = NULL;
}
#endif
/* don't free the client structure itself... this is done elsewhere */
......
......@@ -40,10 +40,10 @@ void MQTTProtocol_removePublication(Publications* p);
void Protocol_processPublication(Publish* publish, Clients* client, int allocatePayload);
int MQTTProtocol_handlePublishes(void* pack, SOCKET sock);
int MQTTProtocol_handlePubacks(void* pack, SOCKET sock);
int MQTTProtocol_handlePubrecs(void* pack, SOCKET sock);
int MQTTProtocol_handlePubacks(void* pack, SOCKET sock, Publications** pubToRemove);
int MQTTProtocol_handlePubrecs(void* pack, SOCKET sock, Publications** pubToRemove);
int MQTTProtocol_handlePubrels(void* pack, SOCKET sock);
int MQTTProtocol_handlePubcomps(void* pack, SOCKET sock);
int MQTTProtocol_handlePubcomps(void* pack, SOCKET sock, Publications** pubToRemove);
void MQTTProtocol_closeSession(Clients* c, int sendwill);
void MQTTProtocol_keepalive(START_TIME_TYPE);
......
......@@ -30,7 +30,8 @@
#include "SocketBuffer.h"
#include "Clients.h"
#define URI_SSL "ssl://"
#define URI_SSL "ssl://"
#define URI_MQTTS "mqtts://"
/** if we should handle openssl initialization (bool_value == 1) or depend on it to be initalized externally (bool_value == 0) */
void SSLSocket_handleOpensslInit(int bool_value);
......
......@@ -46,7 +46,7 @@
#if defined(USE_SELECT)
int isReady(int socket, fd_set* read_set, fd_set* write_set);
int Socket_continueWrites(fd_set* pwset, int* socket, mutex_type mutex);
int Socket_continueWrites(fd_set* pwset, SOCKET* socket, mutex_type mutex);
#else
int isReady(int index);
int Socket_continueWrites(SOCKET* socket, mutex_type mutex);
......@@ -153,10 +153,12 @@ void Socket_outInitialize(void)
memcpy((void*)&(mod_s.rset_saved), (void*)&(mod_s.rset), sizeof(mod_s.rset_saved));
#else
mod_s.nfds = 0;
mod_s.fds = NULL;
mod_s.fds_read = NULL;
mod_s.fds_write = NULL;
mod_s.saved.cur_fd = -1;
mod_s.saved.fds = NULL;
mod_s.saved.fds_write = NULL;
mod_s.saved.fds_read = NULL;
mod_s.saved.nfds = 0;
#endif
FUNC_EXIT;
......@@ -174,10 +176,14 @@ void Socket_outTerminate(void)
#if defined(USE_SELECT)
ListFree(mod_s.clientsds);
#else
if (mod_s.fds)
free(mod_s.fds);
if (mod_s.saved.fds)
free(mod_s.saved.fds);
if (mod_s.fds_read)
free(mod_s.fds_read);
if (mod_s.fds_write)
free(mod_s.fds_write);
if (mod_s.saved.fds_write)
free(mod_s.saved.fds_write);
if (mod_s.saved.fds_read)
free(mod_s.saved.fds_read);
#endif
SocketBuffer_terminate();
#if defined(_WIN32) || defined(_WIN64)
......@@ -192,7 +198,7 @@ void Socket_outTerminate(void)
* Add a socket to the list of socket to check with select
* @param newSd the new socket to add
*/
int Socket_addSocket(int newSd)
int Socket_addSocket(SOCKET newSd)
{
int rc = 0;
......@@ -206,7 +212,7 @@ int Socket_addSocket(int newSd)
}
else
{
int* pnewSd = (int*)malloc(sizeof(newSd));
SOCKET* pnewSd = (SOCKET*)malloc(sizeof(newSd));
if (!pnewSd)
{
......@@ -221,7 +227,7 @@ int Socket_addSocket(int newSd)
goto exit;
}
FD_SET(newSd, &(mod_s.rset_saved));
mod_s.maxfdp1 = max(mod_s.maxfdp1, newSd + 1);
mod_s.maxfdp1 = max(mod_s.maxfdp1, (int)newSd + 1);
rc = Socket_setnonblocking(newSd);
if (rc == SOCKET_ERROR)
Log(LOG_ERROR, -1, "addSocket: setnonblocking");
......@@ -263,25 +269,38 @@ int Socket_addSocket(SOCKET newSd)
FUNC_ENTRY;
mod_s.nfds++;
if (mod_s.fds)
mod_s.fds = realloc(mod_s.fds, mod_s.nfds * sizeof(mod_s.fds[0]));
if (mod_s.fds_read)
mod_s.fds_read = realloc(mod_s.fds_read, mod_s.nfds * sizeof(mod_s.fds_read[0]));
else
mod_s.fds_read = malloc(mod_s.nfds * sizeof(mod_s.fds_read[0]));
if (!mod_s.fds_read)
{
rc = PAHO_MEMORY_ERROR;
goto exit;
}
if (mod_s.fds_write)
mod_s.fds_write = realloc(mod_s.fds_write, mod_s.nfds * sizeof(mod_s.fds_write[0]));
else
mod_s.fds = malloc(mod_s.nfds * sizeof(mod_s.fds[0]));
if (!mod_s.fds)
mod_s.fds_write = malloc(mod_s.nfds * sizeof(mod_s.fds_write[0]));
if (!mod_s.fds_read)
{
rc = PAHO_MEMORY_ERROR;
goto exit;
}
mod_s.fds[mod_s.nfds - 1].fd = newSd;
mod_s.fds_read[mod_s.nfds - 1].fd = newSd;
mod_s.fds_write[mod_s.nfds - 1].fd = newSd;
#if defined(_WIN32) || defined(_WIN64)
mod_s.fds[mod_s.nfds - 1].events = POLLIN | POLLOUT;
mod_s.fds_read[mod_s.nfds - 1].events = POLLIN;
mod_s.fds_write[mod_s.nfds - 1].events = POLLOUT;
#else
mod_s.fds[mod_s.nfds - 1].events = POLLIN | POLLOUT | POLLNVAL;
mod_s.fds_read[mod_s.nfds - 1].events = POLLIN | POLLNVAL;
mod_s.fds_write[mod_s.nfds - 1].events = POLLOUT;
#endif
/* sort the poll fds array by socket number */
qsort(mod_s.fds, (size_t)mod_s.nfds, sizeof(mod_s.fds[0]), cmpfds);
qsort(mod_s.fds_read, (size_t)mod_s.nfds, sizeof(mod_s.fds_read[0]), cmpfds);
qsort(mod_s.fds_write, (size_t)mod_s.nfds, sizeof(mod_s.fds_write[0]), cmpfds);
rc = Socket_setnonblocking(newSd);
if (rc == SOCKET_ERROR)
......@@ -325,19 +344,20 @@ int isReady(int socket, fd_set* read_set, fd_set* write_set)
int isReady(int index)
{
int rc = 1;
SOCKET* socket = &mod_s.saved.fds[index].fd;
SOCKET* socket = &mod_s.saved.fds_write[index].fd;
FUNC_ENTRY;
if ((mod_s.saved.fds[index].revents & POLLHUP) || (mod_s.saved.fds[index].revents & POLLNVAL))
if ((mod_s.saved.fds_read[index].revents & POLLHUP) || (mod_s.saved.fds_read[index].revents & POLLNVAL))
; /* signal work to be done if there is an error on the socket */
else if (ListFindItem(mod_s.connect_pending, socket, intcompare) &&
(mod_s.saved.fds[index].revents & POLLOUT))
(mod_s.saved.fds_write[index].revents & POLLOUT))
ListRemoveItem(mod_s.connect_pending, socket, intcompare);
else
rc = (mod_s.saved.fds[index].revents & POLLIN) &&
(mod_s.saved.fds[index].revents & POLLOUT) &&
rc = (mod_s.saved.fds_read[index].revents & POLLIN) &&
(mod_s.saved.fds_write[index].revents & POLLOUT) &&
Socket_noPendingWrites(*socket);
FUNC_EXIT_RC(rc);
return rc;
}
......@@ -353,9 +373,9 @@ int isReady(int index)
* @param rc a value other than 0 indicates an error of the returned socket
* @return the socket next ready, or 0 if none is ready
*/
int Socket_getReadySocket(int more_work, int timeout, mutex_type mutex, int* rc)
SOCKET Socket_getReadySocket(int more_work, int timeout, mutex_type mutex, int* rc)
{
int sock = 0;
SOCKET sock = 0;
*rc = 0;
int timeout_ms = 1000;
......@@ -487,15 +507,22 @@ SOCKET Socket_getReadySocket(int more_work, int timeout, mutex_type mutex, int*
if (mod_s.saved.cur_fd == -1)
{
int rc1 = 0;
if (mod_s.nfds != mod_s.saved.nfds)
{
mod_s.saved.nfds = mod_s.nfds;
if (mod_s.saved.fds)
mod_s.saved.fds = realloc(mod_s.saved.fds, mod_s.nfds * sizeof(struct pollfd));
if (mod_s.saved.fds_read)
mod_s.saved.fds_read = realloc(mod_s.saved.fds_read, mod_s.nfds * sizeof(struct pollfd));
else
mod_s.saved.fds_read = malloc(mod_s.nfds * sizeof(struct pollfd));
if (mod_s.saved.fds_write)
mod_s.saved.fds_write = realloc(mod_s.saved.fds_write, mod_s.nfds * sizeof(struct pollfd));
else
mod_s.saved.fds = malloc(mod_s.nfds * sizeof(struct pollfd));
mod_s.saved.fds_write = malloc(mod_s.nfds * sizeof(struct pollfd));
}
memcpy(mod_s.saved.fds, mod_s.fds, mod_s.nfds * sizeof(struct pollfd));
memcpy(mod_s.saved.fds_read, mod_s.fds_read, mod_s.nfds * sizeof(struct pollfd));
memcpy(mod_s.saved.fds_write, mod_s.fds_write, mod_s.nfds * sizeof(struct pollfd));
if (mod_s.saved.nfds == 0)
{
......@@ -503,9 +530,17 @@ SOCKET Socket_getReadySocket(int more_work, int timeout, mutex_type mutex, int*
goto exit; /* no work to do */
}
/* Check pending write set for writeable sockets */
rc1 = poll(mod_s.saved.fds_write, mod_s.saved.nfds, 0);
if (rc1 > 0 && Socket_continueWrites(&sock, mutex) == SOCKET_ERROR)
{
*rc = SOCKET_ERROR;
goto exit;
}
/* Prevent performance issue by unlocking the socket_mutex while waiting for a ready socket. */
Thread_unlock_mutex(mutex);
*rc = poll(mod_s.saved.fds, mod_s.saved.nfds, timeout_ms);
*rc = poll(mod_s.saved.fds_read, mod_s.saved.nfds, timeout_ms);
Thread_lock_mutex(mutex);
if (*rc == SOCKET_ERROR)
{
......@@ -514,13 +549,7 @@ SOCKET Socket_getReadySocket(int more_work, int timeout, mutex_type mutex, int*
}
Log(TRACE_MAX, -1, "Return code %d from poll", *rc);
if (Socket_continueWrites(&sock, mutex) == SOCKET_ERROR)
{
*rc = SOCKET_ERROR;
goto exit;
}
if (*rc == 0)
if (rc1 == 0 && *rc == 0)
{
sock = 0;
goto exit; /* no work to do */
......@@ -540,7 +569,7 @@ SOCKET Socket_getReadySocket(int more_work, int timeout, mutex_type mutex, int*
sock = 0;
else
{
sock = mod_s.saved.fds[mod_s.saved.cur_fd].fd;
sock = mod_s.saved.fds_read[mod_s.saved.cur_fd].fd;
mod_s.saved.cur_fd = (mod_s.saved.cur_fd == mod_s.saved.nfds - 1) ? -1 : mod_s.saved.cur_fd + 1;
}
exit:
......@@ -915,25 +944,57 @@ int Socket_close(SOCKET socket)
ListRemoveItem(mod_s.connect_pending, &socket, intcompare);
ListRemoveItem(mod_s.write_pending, &socket, intcompare);
fd = bsearch(&socket, mod_s.fds, (size_t)mod_s.nfds, sizeof(mod_s.fds[0]), cmpsockfds);
if (mod_s.nfds == 0)
goto exit;
fd = bsearch(&socket, mod_s.fds_read, (size_t)mod_s.nfds, sizeof(mod_s.fds_read[0]), cmpsockfds);
if (fd)
{
struct pollfd* last_fd = &mod_s.fds[mod_s.nfds - 1];
struct pollfd* last_fd = &mod_s.fds_read[mod_s.nfds - 1];
if (--mod_s.nfds == 0)
{
free(mod_s.fds);
mod_s.fds = NULL;
free(mod_s.fds_read);
mod_s.fds_read = NULL;
}
else
{
if (fd != last_fd)
{
/* shift array to remove the socket in question */
memmove(fd, fd + 1, (mod_s.nfds - (fd - mod_s.fds_read)) * sizeof(mod_s.fds_read[0]));
}
mod_s.fds_read = realloc(mod_s.fds_read, sizeof(mod_s.fds_read[0]) * mod_s.nfds);
if (mod_s.fds_read == NULL)
{
rc = PAHO_MEMORY_ERROR;
goto exit;
}
}
Log(TRACE_MIN, -1, "Removed socket %d", socket);
}
else
Log(LOG_ERROR, -1, "Failed to remove socket %d", socket);
fd = bsearch(&socket, mod_s.fds_write, (size_t)(mod_s.nfds+1), sizeof(mod_s.fds_write[0]), cmpsockfds);
if (fd)
{
struct pollfd* last_fd = &mod_s.fds_write[mod_s.nfds];
if (mod_s.nfds == 0)
{
free(mod_s.fds_write);
mod_s.fds_write = NULL;
}
else
{
if (fd != last_fd)
{
/* shift array to remove the socket in question */
memmove(fd, fd + 1, (mod_s.nfds - (fd - mod_s.fds)) * sizeof(mod_s.fds[0]));
memmove(fd, fd + 1, (mod_s.nfds - (fd - mod_s.fds_write)) * sizeof(mod_s.fds_write[0]));
}
mod_s.fds = realloc(mod_s.fds, sizeof(mod_s.fds[0]) * mod_s.nfds);
if (mod_s.fds == NULL)
mod_s.fds_write = realloc(mod_s.fds_write, sizeof(mod_s.fds_write[0]) * mod_s.nfds);
if (mod_s.fds_write == NULL)
{
rc = PAHO_MEMORY_ERROR;
goto exit;
......@@ -1142,6 +1203,12 @@ exit:
return rc;
}
static Socket_writeContinue* writecontinue = NULL;
void Socket_setWriteContinueCallback(Socket_writeContinue* mywritecontinue)
{
writecontinue = mywritecontinue;
}
static Socket_writeComplete* writecomplete = NULL;
......@@ -1285,7 +1352,7 @@ exit:
* @param sock in case of a socket error contains the affected socket
* @return completion code, 0 or SOCKET_ERROR
*/
int Socket_continueWrites(fd_set* pwset, int* sock, mutex_type mutex)
int Socket_continueWrites(fd_set* pwset, SOCKET* sock, mutex_type mutex)
#else
/**
* Continue any outstanding socket writes
......@@ -1311,7 +1378,7 @@ int Socket_continueWrites(SOCKET* sock, mutex_type mutex)
struct pollfd* fd;
/* find the socket in the fds structure */
fd = bsearch(&socket, mod_s.saved.fds, (size_t)mod_s.saved.nfds, sizeof(mod_s.saved.fds[0]), cmpsockfds);
fd = bsearch(&socket, mod_s.saved.fds_write, (size_t)mod_s.saved.nfds, sizeof(mod_s.saved.fds_write[0]), cmpsockfds);
if ((fd->revents & POLLOUT) && ((rc = Socket_continueWrite(socket)) != 0))
#endif
......@@ -1341,6 +1408,9 @@ int Socket_continueWrites(SOCKET* sock, mutex_type mutex)
else
ListNextElement(mod_s.write_pending, &curpending);
if (rc == 0)
(*writecontinue)(socket);
if (rc == SOCKET_ERROR)
{
*sock = socket;
......
......@@ -123,12 +123,14 @@ typedef struct
fd_set pending_wset; /**< socket pending write set for select */
#else
unsigned int nfds; /**< no of file descriptors for poll */
struct pollfd* fds; /**< poll read file descriptors */
struct pollfd* fds_read; /**< poll read file descriptors */
struct pollfd* fds_write;
struct {
int cur_fd; /**< index into the fds_saved array */
unsigned int nfds; /**< number of fds in the fds_saved array */
struct pollfd* fds;
struct pollfd* fds_write;
struct pollfd* fds_read;
} saved;
#endif
} Sockets;
......@@ -154,6 +156,9 @@ char* Socket_getpeer(SOCKET sock);
void Socket_addPendingWrite(SOCKET socket);
void Socket_clearPendingWrite(SOCKET socket);
typedef void Socket_writeContinue(SOCKET socket);
void Socket_setWriteContinueCallback(Socket_writeContinue*);
typedef void Socket_writeComplete(SOCKET socket, int rc);
void Socket_setWriteCompleteCallback(Socket_writeComplete*);
......
/*******************************************************************************
* Copyright (c) 2009, 2021 IBM Corp. and Ian Craggs
* Copyright (c) 2009, 2022 IBM Corp. and Ian Craggs
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v2.0
......@@ -80,6 +80,32 @@ void Thread_start(thread_fn fn, void* parameter)
}
int Thread_set_name(const char* thread_name)
{
int rc = 0;
FUNC_ENTRY;
#if defined(_WIN32) || defined(_WIN64)
#if defined(_MSC_VER) && _MSC_VER >= 1920
rc = (int)SetThreadDescription(GetCurrentThread(), (PCWSTR)thread_name);
#endif
#elif defined(OSX)
// pthread_setname_np __API_AVAILABLE(macos(10.6), ios(3.2))
#if defined(__APPLE__) && __MAC_OS_X_VERSION_MIN_REQUIRED >= MAC_OS_X_VERSION_10_6
rc = pthread_setname_np(thread_name);
#endif
#else
#if defined(__GNUC__) && defined(__linux__)
#if __GLIBC__ >= 2 && __GLIBC_MINOR__ >= 12
rc = pthread_setname_np(Thread_getid(), thread_name);
#endif
#endif
#endif
FUNC_EXIT_RC(rc);
return rc;
}
/**
* Create a new mutex
* @param rc return code: 0 for success, negative otherwise
......
/*******************************************************************************
* Copyright (c) 2009, 2020 IBM Corp.
* Copyright (c) 2009, 2022 IBM Corp.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v2.0
......@@ -20,6 +20,15 @@
#if !defined(THREAD_H)
#define THREAD_H
#if !defined(_WIN32) && !defined(_WIN64)
#if defined(__GNUC__) && defined(__linux__)
#if !defined(_GNU_SOURCE)
// for pthread_setname
#define _GNU_SOURCE
#endif
#endif
#endif
#include "MQTTExportDeclarations.h"
#include "MQTTClient.h"
......@@ -60,6 +69,7 @@
#endif
LIBMQTT_API void Thread_start(thread_fn, void*);
int Thread_set_name(const char* thread_name);
LIBMQTT_API mutex_type Thread_create_mutex(int*);
LIBMQTT_API int Thread_lock_mutex(mutex_type);
......
......@@ -44,7 +44,7 @@
// Better not to flood a public broker. Test against localhost.
#define ADDRESS "tcp://localhost:1883"
#define ADDRESS "mqtt://localhost:1883"
#define CLIENTID "ExampleClientTimePub"
#define TOPIC "data/time"
......
......@@ -1025,7 +1025,7 @@ IF (PAHO_WITH_SSL)
ADD_TEST(
NAME test5-6-multiple-connections-static
COMMAND test5-static "--test_no" "9" "--hostname" ${MQTT_SSL_HOSTNAME} "--client_key" "${CERTDIR}/client.pem" "--server_key" "${CERTDIR}/test-root-ca.crt" --verbose
COMMAND test5-static "--test_no" "9" "--hostname" ${MQTT_SSL_HOSTNAME} "--client_key" "${CERTDIR}/client.pem" "--server_key" "${CERTDIR}/test-root-ca.crt"
)
ADD_TEST(
......@@ -1035,12 +1035,12 @@ IF (PAHO_WITH_SSL)
ADD_TEST(
NAME test5-7-big-messages-static
COMMAND test5-static "--test_no" "10" "--hostname" ${MQTT_SSL_HOSTNAME} "--client_key" "${CERTDIR}/client.pem" "--server_key" "${CERTDIR}/test-root-ca.crt"
COMMAND test5-static "--test_no" "10" "--hostname" ${MQTT_SSL_HOSTNAME} "--client_key" "${CERTDIR}/client.pem" "--server_key" "${CERTDIR}/test-root-ca.crt"
)
ADD_TEST(
NAME test5-7-ws-big-messages-static
COMMAND test5-static "--test_no" "10" "--ws" "--hostname" ${MQTT_SSL_HOSTNAME} "--client_key" "${CERTDIR}/client.pem" "--server_key" "${CERTDIR}/test-root-ca.crt" --verbose
COMMAND test5-static "--test_no" "10" "--ws" "--hostname" ${MQTT_SSL_HOSTNAME} "--client_key" "${CERTDIR}/client.pem" "--server_key" "${CERTDIR}/test-root-ca.crt"
)
ADD_TEST(
......@@ -1221,12 +1221,12 @@ IF (PAHO_WITH_SSL)
ADD_TEST(
NAME test5-7-big-messages
COMMAND test5 "--test_no" "10" "--hostname" ${MQTT_SSL_HOSTNAME} "--client_key" "${CERTDIR}/client.pem" "--server_key" "${CERTDIR}/test-root-ca.crt"
COMMAND test5 "--test_no" "10" "--hostname" ${MQTT_SSL_HOSTNAME} "--client_key" "${CERTDIR}/client.pem" "--server_key" "${CERTDIR}/test-root-ca.crt"
)
ADD_TEST(
NAME test5-7-ws-big-messages
COMMAND test5 "--test_no" "10" "--ws" "--hostname" ${MQTT_SSL_HOSTNAME} "--client_key" "${CERTDIR}/client.pem" "--server_key" "${CERTDIR}/test-root-ca.crt" --verbose
COMMAND test5 "--test_no" "10" "--ws" "--hostname" ${MQTT_SSL_HOSTNAME} "--client_key" "${CERTDIR}/client.pem" "--server_key" "${CERTDIR}/test-root-ca.crt"
)
ADD_TEST(
......@@ -1399,7 +1399,7 @@ IF (PAHO_BUILD_STATIC)
ADD_TEST(
NAME test8-6-blocked-acks-static
COMMAND test8-static "--test_no" "8" "--connection" ${MQTT_TEST_BROKER} "--size" "500000" --verbose
COMMAND test8-static "--test_no" "8" "--connection" ${MQTT_TEST_BROKER} "--size" "500000"
)
SET_TESTS_PROPERTIES(
......@@ -1463,7 +1463,7 @@ IF (PAHO_BUILD_SHARED)
ADD_TEST(
NAME test8-6-blocked-acks
COMMAND test8 "--test_no" "8" "--connection" ${MQTT_TEST_BROKER} "--size" "500000" --verbose
COMMAND test8 "--test_no" "8" "--connection" ${MQTT_TEST_BROKER} "--size" "500000"
)
SET_TESTS_PROPERTIES(
......@@ -1549,7 +1549,7 @@ IF (PAHO_BUILD_STATIC)
ADD_TEST(
NAME test9-2ws-offline-buffering-send-disconnected-serverURIs-static
COMMAND test9-static "--test_no" "2" "--connection" ${MQTT_WS_TEST_BROKER} "--proxy_connection" ${MQTT_WS_TEST_PROXY} --verbose
COMMAND test9-static "--test_no" "2" "--connection" ${MQTT_WS_TEST_BROKER} "--proxy_connection" ${MQTT_WS_TEST_PROXY}
)
ADD_TEST(
......
......@@ -89,11 +89,11 @@ struct Options
} options =
{
"ssl://localhost:18883",
"ssl://localhost:18884",
"mqtts://localhost:18884",
"ssl://localhost:18887",
"ssl://localhost:18885",
"mqtts://localhost:18885",
"ssl://localhost:18886",
"ssl://localhost:18888",
"mqtts://localhost:18888",
NULL,
0,
"../../../test/ssl/client.pem",
......
......@@ -74,11 +74,11 @@ struct Options
} options =
{
"ssl://localhost:18883",
"ssl://localhost:18884",
"mqtts://localhost:18884",
"ssl://localhost:18887",
"ssl://localhost:18885",
"mqtts://localhost:18885",
"ssl://localhost:18886",
"ssl://localhost:18888",
"mqtts://localhost:18888",
NULL, // "../../../test/ssl/client.pem",
NULL,
NULL, // "../../../test/ssl/test-root-ca.crt",
......@@ -2178,6 +2178,7 @@ int test7MessageArrived(void* context, char* topicName, int topicLen,
opts.context = tc;
rc = MQTTAsync_sendMessage(tc->client, tc->topic, &pubmsg, &opts);
assert("Publish successful", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
}
else if (message_count < options.message_count)
{
......@@ -2192,6 +2193,7 @@ int test7MessageArrived(void* context, char* topicName, int topicLen,
opts.onFailure = test7OnPublishFailure;
opts.context = tc;
rc = MQTTAsync_sendMessage(tc->client, tc->topic, &pubmsg, &opts);
assert("Publish successful", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
}
else
{
......@@ -2237,6 +2239,7 @@ void test7OnSubscribe(void* context, MQTTAsync_successData* response)
rc = MQTTAsync_send(tc->client, tc->topic, pubmsg.payloadlen, pubmsg.payload,
pubmsg.qos, pubmsg.retained, &opts);
assert("Publish successful", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
}
void test7OnConnect(void* context, MQTTAsync_successData* response)
......
......@@ -920,7 +920,7 @@ int test5a(struct Options options)
MQTTAsync_connectOptions opts = MQTTAsync_connectOptions_initializer;
int rc = 0;
char* test_topic = "C client test5a";
char* serverURIs[3] = {"tcp://localhost:1880", "tcp://localhost:1881", "tcp://localhost:1882"};
char* serverURIs[3] = {"tcp://localhost:1880", "mqtt://localhost:1881", "tcp://localhost:1882"};
failures = 0;
MyLog(LOGA_INFO, "Starting test 5a - All HA connections out of service");
......@@ -982,7 +982,7 @@ int test5b(struct Options options)
MQTTAsync_connectOptions opts = MQTTAsync_connectOptions_initializer;
int rc = 0;
char* test_topic = "C client test5b";
char* serverURIs[3] = {"tcp://localhost:1880", "tcp://localhost:1881", options.connection};
char* serverURIs[3] = {"tcp://localhost:1880", "mqtt://localhost:1881", options.connection};
failures = 0;
MyLog(LOGA_INFO, "Starting test 5b - All HA connections out of service except the last one");
......
10
\ No newline at end of file
11
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册