提交 005d7228 编写于 作者: 麦壳饼's avatar 麦壳饼

移除

上级 48d9b058
<?xml version="1.0" encoding="utf-8"?>
<Project ToolsVersion="15.0" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
<Import Project="$(MSBuildExtensionsPath)\$(MSBuildToolsVersion)\Microsoft.Common.props" Condition="Exists('$(MSBuildExtensionsPath)\$(MSBuildToolsVersion)\Microsoft.Common.props')" />
<PropertyGroup>
<Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration>
<Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform>
<ProjectGuid>{EE801BD6-5757-4A55-BC94-7FBA6E98F393}</ProjectGuid>
<OutputType>Exe</OutputType>
<RootNamespace>CoAPClient</RootNamespace>
<AssemblyName>CoAPClient</AssemblyName>
<TargetFrameworkVersion>v4.0</TargetFrameworkVersion>
<FileAlignment>512</FileAlignment>
<Deterministic>true</Deterministic>
</PropertyGroup>
<PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' ">
<PlatformTarget>AnyCPU</PlatformTarget>
<DebugSymbols>true</DebugSymbols>
<DebugType>full</DebugType>
<Optimize>false</Optimize>
<OutputPath>bin\Debug\</OutputPath>
<DefineConstants>DEBUG;TRACE</DefineConstants>
<ErrorReport>prompt</ErrorReport>
<WarningLevel>4</WarningLevel>
</PropertyGroup>
<PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|AnyCPU' ">
<PlatformTarget>AnyCPU</PlatformTarget>
<DebugType>pdbonly</DebugType>
<Optimize>true</Optimize>
<OutputPath>bin\Release\</OutputPath>
<DefineConstants>TRACE</DefineConstants>
<ErrorReport>prompt</ErrorReport>
<WarningLevel>4</WarningLevel>
</PropertyGroup>
<ItemGroup>
<Reference Include="System" />
<Reference Include="System.Core" />
<Reference Include="System.Xml.Linq" />
<Reference Include="System.Data.DataSetExtensions" />
<Reference Include="Microsoft.CSharp" />
<Reference Include="System.Data" />
<Reference Include="System.Xml" />
</ItemGroup>
<ItemGroup>
<Compile Include="Program.cs" />
<Compile Include="Properties\AssemblyInfo.cs" />
</ItemGroup>
<ItemGroup>
<None Include="readme.md" />
</ItemGroup>
<ItemGroup>
<PackageReference Include="CoAP">
<Version>1.1.0</Version>
</PackageReference>
</ItemGroup>
<Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" />
</Project>
\ No newline at end of file

using CoAP;
using CoAP.Util;
using System;
using System.Collections.Generic;
namespace CoAPClient
{
internal class Program
{
private static void Main(string[] args)
{
string method = null;
Uri uri = null;
string payload = null;
bool loop = false;
bool byEvent = true;
if (args.Length == 0)
PrintUsage();
Int32 index = 0;
foreach (String arg in args)
{
if (arg[0] == '-')
{
if (arg.Equals("-l"))
loop = true;
if (arg.Equals("-e"))
byEvent = true;
else
Console.WriteLine("Unknown option: " + arg);
}
else
{
switch (index)
{
case 0:
method = arg.ToUpper();
break;
case 1:
try
{
uri = new Uri(arg);
}
catch (Exception ex)
{
Console.WriteLine("Failed parsing URI: " + ex.Message);
Environment.Exit(1);
}
break;
case 2:
payload = arg;
break;
default:
Console.WriteLine("Unexpected argument: " + arg);
break;
}
index++;
}
}
if (method == null || uri == null)
PrintUsage();
Request request = NewRequest(method);
if (request == null)
{
Console.WriteLine("Unknown method: " + method);
Environment.Exit(1);
}
if ("OBSERVE".Equals(method))
{
request.MarkObserve();
loop = true;
}
else if ("DISCOVER".Equals(method) &&
(String.IsNullOrEmpty(uri.AbsolutePath) || uri.AbsolutePath.Equals("/")))
{
uri = new Uri(uri, "/.well-known/core");
}
request.URI = uri;
request.SetPayload(payload, MediaType.TextPlain);
// uncomment the next line if you want to specify a draft to use
// request.EndPoint = CoAP.Net.EndPointManager.Draft13;
Console.WriteLine(Utils.ToString(request));
try
{
if (byEvent)
{
request.Respond += delegate (Object sender, ResponseEventArgs e)
{
Response response = e.Response;
if (response == null)
{
Console.WriteLine("Request timeout");
}
else
{
Console.WriteLine(Utils.ToString(response));
Console.WriteLine("Time (ms): " + response.RTT);
}
if (!loop)
Environment.Exit(0);
};
request.Send();
while (true)
{
Console.ReadKey();
}
}
else
{
// uncomment the next line if you need retransmission disabled.
// request.AckTimeout = -1;
request.Send();
do
{
Console.WriteLine("Receiving response...");
Response response = null;
response = request.WaitForResponse();
if (response == null)
{
Console.WriteLine("Request timeout");
break;
}
else
{
Console.WriteLine(Utils.ToString(response));
Console.WriteLine("Time elapsed (ms): " + response.RTT);
if (response.ContentType == MediaType.ApplicationLinkFormat)
{
IEnumerable<WebLink> links = LinkFormat.Parse(response.PayloadString);
if (links == null)
{
Console.WriteLine("Failed parsing link format");
Environment.Exit(1);
}
else
{
Console.WriteLine("Discovered resources:");
foreach (var link in links)
{
Console.WriteLine(link);
}
}
}
}
} while (loop);
}
}
catch (Exception ex)
{
Console.WriteLine("Failed executing request: " + ex.Message);
Console.WriteLine(ex);
Environment.Exit(1);
}
}
private static Request NewRequest(String method)
{
switch (method)
{
case "POST":
return Request.NewPost();
case "PUT":
return Request.NewPut();
case "DELETE":
return Request.NewDelete();
case "GET":
case "DISCOVER":
case "OBSERVE":
return Request.NewGet();
default:
return null;
}
}
private static void PrintUsage()
{
Console.WriteLine("CoAP.NET Client");
Console.WriteLine();
Console.WriteLine("Usage: CoAPClient [-e] [-l] method uri [payload]");
Console.WriteLine(" method : { GET, POST, PUT, DELETE, DISCOVER, OBSERVE }");
Console.WriteLine(" uri : The CoAP URI of the remote endpoint or resource.");
Console.WriteLine(" payload : The data to send with the request.");
Console.WriteLine("Options:");
Console.WriteLine(" -e : Receives responses by the Responded event.");
Console.WriteLine(" -l : Loops for multiple responses.");
Console.WriteLine(" (automatic for OBSERVE and separate responses)");
Console.WriteLine();
Console.WriteLine("Examples:");
Console.WriteLine(" CoAPClient DISCOVER coap://localhost");
Console.WriteLine(" CoAPClient POST coap://localhost/storage data");
Environment.Exit(0);
}
}
}
\ No newline at end of file
using System.Reflection;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
// 有关程序集的一般信息由以下
// 控制。更改这些特性值可修改
// 与程序集关联的信息。
[assembly: AssemblyTitle("CoAPClient")]
[assembly: AssemblyDescription("")]
[assembly: AssemblyConfiguration("")]
[assembly: AssemblyCompany("")]
[assembly: AssemblyProduct("CoAPClient")]
[assembly: AssemblyCopyright("Copyright © 2019")]
[assembly: AssemblyTrademark("")]
[assembly: AssemblyCulture("")]
// 将 ComVisible 设置为 false 会使此程序集中的类型
//对 COM 组件不可见。如果需要从 COM 访问此程序集中的类型
//请将此类型的 ComVisible 特性设置为 true。
[assembly: ComVisible(false)]
// 如果此项目向 COM 公开,则下列 GUID 用于类型库的 ID
[assembly: Guid("ee801bd6-5757-4a55-bc94-7fba6e98f393")]
// 程序集的版本信息由下列四个值组成:
//
// 主版本
// 次版本
// 生成号
// 修订号
//
//可以指定所有这些值,也可以使用“生成号”和“修订号”的默认值
//通过使用 "*",如下所示:
// [assembly: AssemblyVersion("1.0.*")]
[assembly: AssemblyVersion("1.0.0.0")]
[assembly: AssemblyFileVersion("1.0.0.0")]
Example:
Telemetry:
CoAPClient.exe POST coap://<_server_>/Telemetry?<_token_> <_playload_>
CoAPClient.exe POST coap://localhost/Telemetry?3cb97cd31fbc40b08d12ec47a6fad622 {\"aaa\":\"bbb\"}
Attributes:
CoAPClient.exe POST coap://<_server_>/Attributes?<_token_> <_playload_>
CoAPClient.exe POST coap://localhost/Attributes?3cb97cd31fbc40b08d12ec47a6fad622 {\"aaa\":\"bbb\"}
URI format:
Json:
POST coap://localhost/Attributes?3cb97cd31fbc40b08d12ec47a6fad622
Xml document:
POST coap://localhost/Attributes?3cb97cd31fbc40b08d12ec47a6fad622&xml&keyname
binary:
POST coap://localhost/Attributes?3cb97cd31fbc40b08d12ec47a6fad622&binary&keyname
\ No newline at end of file
<?xml version="1.0" encoding="utf-8"?>
<Project DefaultTargets="Build" ToolsVersion="15.0" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
<ItemGroup Label="ProjectConfigurations">
<ProjectConfiguration Include="Debug|ARM">
<Configuration>Debug</Configuration>
<Platform>ARM</Platform>
</ProjectConfiguration>
<ProjectConfiguration Include="Release|ARM">
<Configuration>Release</Configuration>
<Platform>ARM</Platform>
</ProjectConfiguration>
<ProjectConfiguration Include="Debug|ARM64">
<Configuration>Debug</Configuration>
<Platform>ARM64</Platform>
</ProjectConfiguration>
<ProjectConfiguration Include="Release|ARM64">
<Configuration>Release</Configuration>
<Platform>ARM64</Platform>
</ProjectConfiguration>
<ProjectConfiguration Include="Debug|x86">
<Configuration>Debug</Configuration>
<Platform>x86</Platform>
</ProjectConfiguration>
<ProjectConfiguration Include="Release|x86">
<Configuration>Release</Configuration>
<Platform>x86</Platform>
</ProjectConfiguration>
<ProjectConfiguration Include="Debug|x64">
<Configuration>Debug</Configuration>
<Platform>x64</Platform>
</ProjectConfiguration>
<ProjectConfiguration Include="Release|x64">
<Configuration>Release</Configuration>
<Platform>x64</Platform>
</ProjectConfiguration>
</ItemGroup>
<PropertyGroup Label="Globals">
<ProjectGuid>{dd36544c-2e7c-4388-b34f-ff705e141e4a}</ProjectGuid>
<Keyword>Linux</Keyword>
<RootNamespace>MQTT_C_Client</RootNamespace>
<MinimumVisualStudioVersion>15.0</MinimumVisualStudioVersion>
<ApplicationType>Linux</ApplicationType>
<ApplicationTypeRevision>1.0</ApplicationTypeRevision>
<TargetLinuxPlatform>Generic</TargetLinuxPlatform>
<LinuxProjectType>{D51BCBC9-82E9-4017-911E-C93873C4EA2B}</LinuxProjectType>
</PropertyGroup>
<Import Project="$(VCTargetsPath)\Microsoft.Cpp.Default.props" />
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|ARM'" Label="Configuration">
<UseDebugLibraries>true</UseDebugLibraries>
</PropertyGroup>
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Release|ARM'" Label="Configuration">
<UseDebugLibraries>false</UseDebugLibraries>
</PropertyGroup>
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|x86'" Label="Configuration">
<UseDebugLibraries>true</UseDebugLibraries>
</PropertyGroup>
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Release|x86'" Label="Configuration">
<UseDebugLibraries>false</UseDebugLibraries>
</PropertyGroup>
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|x64'" Label="Configuration">
<UseDebugLibraries>true</UseDebugLibraries>
</PropertyGroup>
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Release|x64'" Label="Configuration">
<UseDebugLibraries>false</UseDebugLibraries>
</PropertyGroup>
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Release|ARM64'" Label="Configuration">
<UseDebugLibraries>false</UseDebugLibraries>
</PropertyGroup>
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|ARM64'" Label="Configuration">
<UseDebugLibraries>true</UseDebugLibraries>
</PropertyGroup>
<Import Project="$(VCTargetsPath)\Microsoft.Cpp.props" />
<ImportGroup Label="ExtensionSettings" />
<ImportGroup Label="Shared" />
<ImportGroup Label="PropertySheets" />
<PropertyGroup Label="UserMacros" />
<ItemGroup>
<ClCompile Include="main.c" />
<ClCompile Include="mqtt.c" />
<ClCompile Include="mqtt_pal.c" />
</ItemGroup>
<ItemGroup>
<ClInclude Include="mqtt.h" />
<ClInclude Include="mqtt_pal.h" />
<ClInclude Include="templates\bio_sockets.h" />
<ClInclude Include="templates\openssl_sockets.h" />
<ClInclude Include="templates\posix_sockets.h" />
</ItemGroup>
<ItemGroup>
<None Include="readme.md" />
</ItemGroup>
<ItemDefinitionGroup Condition="'$(Configuration)|$(Platform)'=='Debug|x64'">
<Link>
<LibraryDependencies>pthread;%(LibraryDependencies)</LibraryDependencies>
</Link>
<ClCompile>
<CLanguageStandard>gnu99</CLanguageStandard>
</ClCompile>
</ItemDefinitionGroup>
<ItemDefinitionGroup Condition="'$(Configuration)|$(Platform)'=='Release|x64'">
<Link>
<LibraryDependencies>pthread;%(LibraryDependencies)</LibraryDependencies>
</Link>
<ClCompile>
<CLanguageStandard>c11</CLanguageStandard>
<AdditionalOptions>-Wextra -Wall -std=gnu99 -Iinclude -Wno-unused-parameter -Wno-unused-variable %(AdditionalOptions)</AdditionalOptions>
</ClCompile>
</ItemDefinitionGroup>
<Import Project="$(VCTargetsPath)\Microsoft.Cpp.targets" />
<ImportGroup Label="ExtensionTargets" />
</Project>
\ No newline at end of file
<?xml version="1.0" encoding="utf-8"?>
<Project ToolsVersion="4.0" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
<ItemGroup>
<ClCompile Include="main.c" />
<ClCompile Include="mqtt.c" />
<ClCompile Include="mqtt_pal.c">
<Filter>include</Filter>
</ClCompile>
</ItemGroup>
<ItemGroup>
<ClInclude Include="templates\bio_sockets.h">
<Filter>include</Filter>
</ClInclude>
<ClInclude Include="mqtt.h">
<Filter>include</Filter>
</ClInclude>
<ClInclude Include="mqtt_pal.h">
<Filter>include</Filter>
</ClInclude>
<ClInclude Include="templates\openssl_sockets.h">
<Filter>include</Filter>
</ClInclude>
<ClInclude Include="templates\posix_sockets.h">
<Filter>include</Filter>
</ClInclude>
</ItemGroup>
<ItemGroup>
<Filter Include="include">
<UniqueIdentifier>{237da9ec-b626-49d7-ad63-da64c71aada0}</UniqueIdentifier>
</Filter>
</ItemGroup>
<ItemGroup>
<None Include="readme.md" />
</ItemGroup>
</Project>
\ No newline at end of file
/**
* @file
* A simple program to that publishes the current time whenever ENTER is pressed.
*/
#include <unistd.h>
#include <stdlib.h>
#include <stdio.h>
#include "mqtt.h"
#include "templates/posix_sockets.h"
/**
* @brief The function that would be called whenever a PUBLISH is received.
*
* @note This function is not used in this example.
*/
void publish_callback(void** unused, struct mqtt_response_publish* published);
/**
* @brief The client's refresher. This function triggers back-end routines to
* handle ingress/egress traffic to the broker.
*
* @note All this function needs to do is call \ref __mqtt_recv and
* \ref __mqtt_send every so often. I've picked 100 ms meaning that
* client ingress/egress traffic will be handled every 100 ms.
*/
void* client_refresher(void* client);
/**
* @brief Safelty closes the \p sockfd and cancels the \p client_daemon before \c exit.
*/
void exit_example(int status, int sockfd, pthread_t* client_daemon);
#define IOTSHARP_CLIENTID "clientid89f866fced124f6e902515705f78148a"
#define IOTSHARP_ACCESSTOKEN "3cb97cd31fbc40b08d12ec47a6fad622"
/**
* A simple program to that publishes the current time whenever ENTER is pressed.
*/
int main(int argc, const char* argv[])
{
const char* addr;
const char* port;
const char* topic;
/* get address (argv[1] if present) */
if (argc > 1) {
addr = argv[1];
}
else {
addr = "127.0.0.1";
}
/* get port number (argv[2] if present) */
if (argc > 2) {
port = argv[2];
}
else {
port = "1883";
}
/* get the topic name to publish */
if (argc > 3) {
topic = argv[3];
}
else {
topic = "/devices/me/telemetry";
}
/* open the non-blocking TCP socket (connecting to the broker) */
int sockfd = open_nb_socket(addr, port);
if (sockfd == -1) {
perror("Failed to open socket: ");
exit_example(EXIT_FAILURE, sockfd, NULL);
}
/* setup a client */
struct mqtt_client client;
uint8_t sendbuf[2048]; /* sendbuf should be large enough to hold multiple whole mqtt messages */
uint8_t recvbuf[1024]; /* recvbuf should be large enough any whole mqtt message expected to be received */
mqtt_init(&client, sockfd, sendbuf, sizeof(sendbuf), recvbuf, sizeof(recvbuf), publish_callback);
mqtt_connect(&client, IOTSHARP_CLIENTID, NULL, NULL, 0, IOTSHARP_ACCESSTOKEN, NULL, 0, 400);
/* check that we don't have any errors */
if (client.error != MQTT_OK) {
fprintf(stderr, "error: %s\n", mqtt_error_str(client.error));
exit_example(EXIT_FAILURE, sockfd, NULL);
}
/* start a thread to refresh the client (handle egress and ingree client traffic) */
pthread_t client_daemon;
if (pthread_create(&client_daemon, NULL, client_refresher, &client)) {
fprintf(stderr, "Failed to start client daemon.\n");
exit_example(EXIT_FAILURE, sockfd, NULL);
}
/* start publishing the time */
printf("%s is ready to begin publishing the time.\n", argv[0]);
printf("Press ENTER to publish the current time.\n");
printf("Press CTRL-D (or any other key) to exit.\n\n");
while (fgetc(stdin) == '\n') {
/* get the current time */
time_t timer;
time(&timer);
struct tm* tm_info = localtime(&timer);
char timebuf[26];
strftime(timebuf, 26, "%Y-%m-%d %H:%M:%S", tm_info);
/* print a message */
char application_message[256];
snprintf(application_message, sizeof(application_message), "{\"mydatetimei\":\"%s\"}", timebuf);
printf("%s published : \"%s\"", argv[0], application_message);
/* publish the time */
mqtt_publish(&client, topic, application_message, strlen(application_message) + 1, MQTT_PUBLISH_QOS_0);
/* check for errors */
if (client.error != MQTT_OK) {
fprintf(stderr, "error: %s\n", mqtt_error_str(client.error));
exit_example(EXIT_FAILURE, sockfd, &client_daemon);
}
}
/* disconnect */
printf("\n%s disconnecting from %s\n", argv[0], addr);
sleep(1);
/* exit */
exit_example(EXIT_SUCCESS, sockfd, &client_daemon);
}
void exit_example(int status, int sockfd, pthread_t* client_daemon)
{
if (sockfd != -1) close(sockfd);
if (client_daemon != NULL) pthread_cancel(*client_daemon);
exit(status);
}
void publish_callback(void** unused, struct mqtt_response_publish* published)
{
/* not used in this example */
}
void* client_refresher(void* client)
{
while (1)
{
mqtt_sync((struct mqtt_client*) client);
usleep(100000U);
}
return NULL;
}
\ No newline at end of file
/*
MIT License
Copyright(c) 2018 Liam Bindle
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files(the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions :
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
*/
#include "mqtt.h"
/**
* @file
* @brief Implements the functionality of MQTT-C.
* @note The only files that are included are mqtt.h and mqtt_pal.h.
*
* @cond Doxygen_Suppress
*/
enum MQTTErrors mqtt_sync(struct mqtt_client *client) {
/* Recover from any errors */
enum MQTTErrors err;
MQTT_PAL_MUTEX_LOCK(&client->mutex);
if (client->error != MQTT_OK && client->reconnect_callback != NULL) {
client->reconnect_callback(client, &client->reconnect_state);
/* unlocked during CONNECT */
} else {
MQTT_PAL_MUTEX_UNLOCK(&client->mutex);
}
/* Call inspector callback if necessary */
if (client->inspector_callback != NULL) {
MQTT_PAL_MUTEX_LOCK(&client->mutex);
err = client->inspector_callback(client);
MQTT_PAL_MUTEX_UNLOCK(&client->mutex);
if (err != MQTT_OK) return err;
}
/* Call receive */
err = __mqtt_recv(client);
if (err != MQTT_OK) return err;
/* Call send */
err = __mqtt_send(client);
return err;
}
uint16_t __mqtt_next_pid(struct mqtt_client *client) {
int pid_exists = 0;
if (client->pid_lfsr == 0) {
client->pid_lfsr = 163u;
}
/* LFSR taps taken from: https://en.wikipedia.org/wiki/Linear-feedback_shift_register */
do {
struct mqtt_queued_message *curr;
unsigned lsb = client->pid_lfsr & 1;
(client->pid_lfsr) >>= 1;
if (lsb) {
client->pid_lfsr ^= 0xB400u;
}
/* check that the PID is unique */
pid_exists = 0;
for(curr = mqtt_mq_get(&(client->mq), 0); curr >= client->mq.queue_tail; --curr) {
if (curr->packet_id == client->pid_lfsr) {
pid_exists = 1;
break;
}
}
} while(pid_exists);
return client->pid_lfsr;
}
enum MQTTErrors mqtt_init(struct mqtt_client *client,
mqtt_pal_socket_handle sockfd,
uint8_t *sendbuf, size_t sendbufsz,
uint8_t *recvbuf, size_t recvbufsz,
void (*publish_response_callback)(void** state,struct mqtt_response_publish *publish))
{
if (client == NULL || sendbuf == NULL || recvbuf == NULL) {
return MQTT_ERROR_NULLPTR;
}
/* initialize mutex */
MQTT_PAL_MUTEX_INIT(&client->mutex);
MQTT_PAL_MUTEX_LOCK(&client->mutex); /* unlocked during CONNECT */
client->socketfd = sockfd;
mqtt_mq_init(&client->mq, sendbuf, sendbufsz);
client->recv_buffer.mem_start = recvbuf;
client->recv_buffer.mem_size = recvbufsz;
client->recv_buffer.curr = client->recv_buffer.mem_start;
client->recv_buffer.curr_sz = client->recv_buffer.mem_size;
client->error = MQTT_ERROR_CONNECT_NOT_CALLED;
client->response_timeout = 30;
client->number_of_timeouts = 0;
client->number_of_keep_alives = 0;
client->typical_response_time = -1.0;
client->publish_response_callback = publish_response_callback;
client->pid_lfsr = 0;
client->inspector_callback = NULL;
client->reconnect_callback = NULL;
client->reconnect_state = NULL;
return MQTT_OK;
}
void mqtt_init_reconnect(struct mqtt_client *client,
void (*reconnect)(struct mqtt_client *, void**),
void *reconnect_state,
void (*publish_response_callback)(void** state, struct mqtt_response_publish *publish))
{
/* initialize mutex */
MQTT_PAL_MUTEX_INIT(&client->mutex);
client->socketfd = (mqtt_pal_socket_handle) -1;
mqtt_mq_init(&client->mq, NULL, 0);
client->recv_buffer.mem_start = NULL;
client->recv_buffer.mem_size = 0;
client->recv_buffer.curr = NULL;
client->recv_buffer.curr_sz = 0;
client->error = MQTT_ERROR_INITIAL_RECONNECT;
client->response_timeout = 30;
client->number_of_timeouts = 0;
client->number_of_keep_alives = 0;
client->typical_response_time = -1.0;
client->publish_response_callback = publish_response_callback;
client->inspector_callback = NULL;
client->reconnect_callback = reconnect;
client->reconnect_state = reconnect_state;
}
void mqtt_reinit(struct mqtt_client* client,
mqtt_pal_socket_handle socketfd,
uint8_t *sendbuf, size_t sendbufsz,
uint8_t *recvbuf, size_t recvbufsz)
{
client->error = MQTT_ERROR_CONNECT_NOT_CALLED;
client->socketfd = socketfd;
mqtt_mq_init(&client->mq, sendbuf, sendbufsz);
client->recv_buffer.mem_start = recvbuf;
client->recv_buffer.mem_size = recvbufsz;
client->recv_buffer.curr = client->recv_buffer.mem_start;
client->recv_buffer.curr_sz = client->recv_buffer.mem_size;
}
/**
* A macro function that:
* 1) Checks that the client isn't in an error state.
* 2) Attempts to pack to client's message queue.
* a) handles errors
* b) if mq buffer is too small, cleans it and tries again
* 3) Upon successful pack, registers the new message.
*/
#define MQTT_CLIENT_TRY_PACK(tmp, msg, client, pack_call, release) \
if (client->error < 0) { \
if (release) MQTT_PAL_MUTEX_UNLOCK(&client->mutex); \
return client->error; \
} \
tmp = pack_call; \
if (tmp < 0) { \
client->error = tmp; \
if (release) MQTT_PAL_MUTEX_UNLOCK(&client->mutex); \
return tmp; \
} else if (tmp == 0) { \
mqtt_mq_clean(&client->mq); \
tmp = pack_call; \
if (tmp < 0) { \
client->error = tmp; \
if (release) MQTT_PAL_MUTEX_UNLOCK(&client->mutex); \
return tmp; \
} else if(tmp == 0) { \
client->error = MQTT_ERROR_SEND_BUFFER_IS_FULL; \
if (release) MQTT_PAL_MUTEX_UNLOCK(&client->mutex); \
return MQTT_ERROR_SEND_BUFFER_IS_FULL; \
} \
} \
msg = mqtt_mq_register(&client->mq, tmp); \
enum MQTTErrors mqtt_connect(struct mqtt_client *client,
const char* client_id,
const char* will_topic,
const void* will_message,
size_t will_message_size,
const char* user_name,
const char* password,
uint8_t connect_flags,
uint16_t keep_alive)
{
ssize_t rv;
struct mqtt_queued_message *msg;
/* Note: Current thread already has mutex locked. */
/* update the client's state */
client->keep_alive = keep_alive;
if (client->error == MQTT_ERROR_CONNECT_NOT_CALLED) {
client->error = MQTT_OK;
}
/* try to pack the message */
MQTT_CLIENT_TRY_PACK(rv, msg, client,
mqtt_pack_connection_request(
client->mq.curr, client->mq.curr_sz,
client_id, will_topic, will_message,
will_message_size,user_name, password,
connect_flags, keep_alive
),
1
);
/* save the control type of the message */
msg->control_type = MQTT_CONTROL_CONNECT;
MQTT_PAL_MUTEX_UNLOCK(&client->mutex);
return MQTT_OK;
}
enum MQTTErrors mqtt_publish(struct mqtt_client *client,
const char* topic_name,
void* application_message,
size_t application_message_size,
uint8_t publish_flags)
{
struct mqtt_queued_message *msg;
ssize_t rv;
uint16_t packet_id;
MQTT_PAL_MUTEX_LOCK(&client->mutex);
packet_id = __mqtt_next_pid(client);
/* try to pack the message */
MQTT_CLIENT_TRY_PACK(
rv, msg, client,
mqtt_pack_publish_request(
client->mq.curr, client->mq.curr_sz,
topic_name,
packet_id,
application_message,
application_message_size,
publish_flags
),
1
);
/* save the control type and packet id of the message */
msg->control_type = MQTT_CONTROL_PUBLISH;
msg->packet_id = packet_id;
MQTT_PAL_MUTEX_UNLOCK(&client->mutex);
return MQTT_OK;
}
ssize_t __mqtt_puback(struct mqtt_client *client, uint16_t packet_id) {
ssize_t rv;
struct mqtt_queued_message *msg;
/* try to pack the message */
MQTT_CLIENT_TRY_PACK(
rv, msg, client,
mqtt_pack_pubxxx_request(
client->mq.curr, client->mq.curr_sz,
MQTT_CONTROL_PUBACK,
packet_id
),
0
);
/* save the control type and packet id of the message */
msg->control_type = MQTT_CONTROL_PUBACK;
msg->packet_id = packet_id;
return MQTT_OK;
}
ssize_t __mqtt_pubrec(struct mqtt_client *client, uint16_t packet_id) {
ssize_t rv;
struct mqtt_queued_message *msg;
/* try to pack the message */
MQTT_CLIENT_TRY_PACK(
rv, msg, client,
mqtt_pack_pubxxx_request(
client->mq.curr, client->mq.curr_sz,
MQTT_CONTROL_PUBREC,
packet_id
),
0
);
/* save the control type and packet id of the message */
msg->control_type = MQTT_CONTROL_PUBREC;
msg->packet_id = packet_id;
return MQTT_OK;
}
ssize_t __mqtt_pubrel(struct mqtt_client *client, uint16_t packet_id) {
ssize_t rv;
struct mqtt_queued_message *msg;
/* try to pack the message */
MQTT_CLIENT_TRY_PACK(
rv, msg, client,
mqtt_pack_pubxxx_request(
client->mq.curr, client->mq.curr_sz,
MQTT_CONTROL_PUBREL,
packet_id
),
0
);
/* save the control type and packet id of the message */
msg->control_type = MQTT_CONTROL_PUBREL;
msg->packet_id = packet_id;
return MQTT_OK;
}
ssize_t __mqtt_pubcomp(struct mqtt_client *client, uint16_t packet_id) {
ssize_t rv;
struct mqtt_queued_message *msg;
/* try to pack the message */
MQTT_CLIENT_TRY_PACK(
rv, msg, client,
mqtt_pack_pubxxx_request(
client->mq.curr, client->mq.curr_sz,
MQTT_CONTROL_PUBCOMP,
packet_id
),
0
);
/* save the control type and packet id of the message */
msg->control_type = MQTT_CONTROL_PUBCOMP;
msg->packet_id = packet_id;
return MQTT_OK;
}
enum MQTTErrors mqtt_subscribe(struct mqtt_client *client,
const char* topic_name,
int max_qos_level)
{
ssize_t rv;
uint16_t packet_id;
struct mqtt_queued_message *msg;
MQTT_PAL_MUTEX_LOCK(&client->mutex);
packet_id = __mqtt_next_pid(client);
/* try to pack the message */
MQTT_CLIENT_TRY_PACK(
rv, msg, client,
mqtt_pack_subscribe_request(
client->mq.curr, client->mq.curr_sz,
packet_id,
topic_name,
max_qos_level,
(const char*)NULL
),
1
);
/* save the control type and packet id of the message */
msg->control_type = MQTT_CONTROL_SUBSCRIBE;
msg->packet_id = packet_id;
MQTT_PAL_MUTEX_UNLOCK(&client->mutex);
return MQTT_OK;
}
enum MQTTErrors mqtt_unsubscribe(struct mqtt_client *client,
const char* topic_name)
{
uint16_t packet_id = __mqtt_next_pid(client);
ssize_t rv;
struct mqtt_queued_message *msg;
MQTT_PAL_MUTEX_LOCK(&client->mutex);
/* try to pack the message */
MQTT_CLIENT_TRY_PACK(
rv, msg, client,
mqtt_pack_unsubscribe_request(
client->mq.curr, client->mq.curr_sz,
packet_id,
topic_name,
(const char*)NULL
),
1
);
/* save the control type and packet id of the message */
msg->control_type = MQTT_CONTROL_UNSUBSCRIBE;
msg->packet_id = packet_id;
MQTT_PAL_MUTEX_UNLOCK(&client->mutex);
return MQTT_OK;
}
enum MQTTErrors mqtt_ping(struct mqtt_client *client) {
enum MQTTErrors rv;
MQTT_PAL_MUTEX_LOCK(&client->mutex);
rv = __mqtt_ping(client);
MQTT_PAL_MUTEX_UNLOCK(&client->mutex);
return rv;
}
enum MQTTErrors __mqtt_ping(struct mqtt_client *client)
{
ssize_t rv;
struct mqtt_queued_message *msg;
/* try to pack the message */
MQTT_CLIENT_TRY_PACK(
rv, msg, client,
mqtt_pack_ping_request(
client->mq.curr, client->mq.curr_sz
),
0
);
/* save the control type and packet id of the message */
msg->control_type = MQTT_CONTROL_PINGREQ;
return MQTT_OK;
}
enum MQTTErrors mqtt_disconnect(struct mqtt_client *client)
{
ssize_t rv;
struct mqtt_queued_message *msg;
MQTT_PAL_MUTEX_LOCK(&client->mutex);
/* try to pack the message */
MQTT_CLIENT_TRY_PACK(
rv, msg, client,
mqtt_pack_disconnect(
client->mq.curr, client->mq.curr_sz
),
1
);
/* save the control type and packet id of the message */
msg->control_type = MQTT_CONTROL_DISCONNECT;
MQTT_PAL_MUTEX_UNLOCK(&client->mutex);
return MQTT_OK;
}
ssize_t __mqtt_send(struct mqtt_client *client)
{
uint8_t inspected;
ssize_t len;
int inflight_qos2 = 0;
int i = 0;
MQTT_PAL_MUTEX_LOCK(&client->mutex);
if (client->error < 0 && client->error != MQTT_ERROR_SEND_BUFFER_IS_FULL) {
MQTT_PAL_MUTEX_UNLOCK(&client->mutex);
return client->error;
}
/* loop through all messages in the queue */
len = mqtt_mq_length(&client->mq);
for(; i < len; ++i) {
struct mqtt_queued_message *msg = mqtt_mq_get(&client->mq, i);
int resend = 0;
if (msg->state == MQTT_QUEUED_UNSENT) {
/* message has not been sent to lets send it */
resend = 1;
} else if (msg->state == MQTT_QUEUED_AWAITING_ACK) {
/* check for timeout */
if (MQTT_PAL_TIME() > msg->time_sent + client->response_timeout) {
resend = 1;
client->number_of_timeouts += 1;
}
}
/* only send QoS 2 message if there are no inflight QoS 2 PUBLISH messages */
if (msg->control_type == MQTT_CONTROL_PUBLISH
&& (msg->state == MQTT_QUEUED_UNSENT || msg->state == MQTT_QUEUED_AWAITING_ACK))
{
inspected = 0x03 & ((msg->start[0]) >> 1); /* qos */
if (inspected == 2) {
if (inflight_qos2) {
resend = 0;
}
inflight_qos2 = 1;
}
}
/* goto next message if we don't need to send */
if (!resend) {
continue;
}
/* we're sending the message */
{
ssize_t tmp = mqtt_pal_sendall(client->socketfd, msg->start, msg->size, 0);
if (tmp < 0) {
client->error = tmp;
MQTT_PAL_MUTEX_UNLOCK(&client->mutex);
return tmp;
}
}
/* update timeout watcher */
client->time_of_last_send = MQTT_PAL_TIME();
msg->time_sent = client->time_of_last_send;
/*
Determine the state to put the message in.
Control Types:
MQTT_CONTROL_CONNECT -> awaiting
MQTT_CONTROL_CONNACK -> n/a
MQTT_CONTROL_PUBLISH -> qos == 0 ? complete : awaiting
MQTT_CONTROL_PUBACK -> complete
MQTT_CONTROL_PUBREC -> awaiting
MQTT_CONTROL_PUBREL -> awaiting
MQTT_CONTROL_PUBCOMP -> complete
MQTT_CONTROL_SUBSCRIBE -> awaiting
MQTT_CONTROL_SUBACK -> n/a
MQTT_CONTROL_UNSUBSCRIBE -> awaiting
MQTT_CONTROL_UNSUBACK -> n/a
MQTT_CONTROL_PINGREQ -> awaiting
MQTT_CONTROL_PINGRESP -> n/a
MQTT_CONTROL_DISCONNECT -> complete
*/
switch (msg->control_type) {
case MQTT_CONTROL_PUBACK:
case MQTT_CONTROL_PUBCOMP:
case MQTT_CONTROL_DISCONNECT:
msg->state = MQTT_QUEUED_COMPLETE;
break;
case MQTT_CONTROL_PUBLISH:
inspected = 0x03 & ((msg->start[0]) >> 1); /* qos */
if (inspected == 0) {
msg->state = MQTT_QUEUED_COMPLETE;
} else if (inspected == 1) {
msg->state = MQTT_QUEUED_AWAITING_ACK;
/*set DUP flag for subsequent sends */
msg->start[1] |= MQTT_PUBLISH_DUP;
} else {
msg->state = MQTT_QUEUED_AWAITING_ACK;
}
break;
case MQTT_CONTROL_CONNECT:
case MQTT_CONTROL_PUBREC:
case MQTT_CONTROL_PUBREL:
case MQTT_CONTROL_SUBSCRIBE:
case MQTT_CONTROL_UNSUBSCRIBE:
case MQTT_CONTROL_PINGREQ:
msg->state = MQTT_QUEUED_AWAITING_ACK;
break;
default:
client->error = MQTT_ERROR_MALFORMED_REQUEST;
MQTT_PAL_MUTEX_UNLOCK(&client->mutex);
return MQTT_ERROR_MALFORMED_REQUEST;
}
}
/* check for keep-alive */
{
mqtt_pal_time_t keep_alive_timeout = client->time_of_last_send + (mqtt_pal_time_t)((float)(client->keep_alive) * 0.75);
if (MQTT_PAL_TIME() > keep_alive_timeout) {
ssize_t rv = __mqtt_ping(client);
if (rv != MQTT_OK) {
client->error = rv;
MQTT_PAL_MUTEX_UNLOCK(&client->mutex);
return rv;
}
}
}
MQTT_PAL_MUTEX_UNLOCK(&client->mutex);
return MQTT_OK;
}
ssize_t __mqtt_recv(struct mqtt_client *client)
{
struct mqtt_response response;
MQTT_PAL_MUTEX_LOCK(&client->mutex);
/* read until there is nothing left to read */
while(1) {
/* read in as many bytes as possible */
ssize_t rv, consumed;
struct mqtt_queued_message *msg = NULL;
rv = mqtt_pal_recvall(client->socketfd, client->recv_buffer.curr, client->recv_buffer.curr_sz, 0);
if (rv < 0) {
/* an error occurred */
client->error = rv;
MQTT_PAL_MUTEX_UNLOCK(&client->mutex);
return rv;
} else {
client->recv_buffer.curr += rv;
client->recv_buffer.curr_sz -= rv;
}
/* attempt to parse */
consumed = mqtt_unpack_response(&response, client->recv_buffer.mem_start, client->recv_buffer.curr - client->recv_buffer.mem_start);
if (consumed < 0) {
client->error = consumed;
MQTT_PAL_MUTEX_UNLOCK(&client->mutex);
return consumed;
} else if (consumed == 0) {
/* if curr_sz is 0 then the buffer is too small to ever fit the message */
if (client->recv_buffer.curr_sz == 0) {
client->error = MQTT_ERROR_RECV_BUFFER_TOO_SMALL;
MQTT_PAL_MUTEX_UNLOCK(&client->mutex);
return MQTT_ERROR_RECV_BUFFER_TOO_SMALL;
}
/* just need to wait for the rest of the data */
MQTT_PAL_MUTEX_UNLOCK(&client->mutex);
return MQTT_OK;
}
/* response was unpacked successfully */
/*
The switch statement below manages how the client responds to messages from the broker.
Control Types (that we expect to receive from the broker):
MQTT_CONTROL_CONNACK:
-> release associated CONNECT
-> handle response
MQTT_CONTROL_PUBLISH:
-> stage response, none if qos==0, PUBACK if qos==1, PUBREC if qos==2
-> call publish callback
MQTT_CONTROL_PUBACK:
-> release associated PUBLISH
MQTT_CONTROL_PUBREC:
-> release PUBLISH
-> stage PUBREL
MQTT_CONTROL_PUBREL:
-> release associated PUBREC
-> stage PUBCOMP
MQTT_CONTROL_PUBCOMP:
-> release PUBREL
MQTT_CONTROL_SUBACK:
-> release SUBSCRIBE
-> handle response
MQTT_CONTROL_UNSUBACK:
-> release UNSUBSCRIBE
MQTT_CONTROL_PINGRESP:
-> release PINGREQ
*/
switch (response.fixed_header.control_type) {
case MQTT_CONTROL_CONNACK:
/* release associated CONNECT */
msg = mqtt_mq_find(&client->mq, MQTT_CONTROL_CONNECT, NULL);
if (msg == NULL) {
client->error = MQTT_ERROR_ACK_OF_UNKNOWN;
MQTT_PAL_MUTEX_UNLOCK(&client->mutex);
return MQTT_ERROR_ACK_OF_UNKNOWN;
}
msg->state = MQTT_QUEUED_COMPLETE;
/* initialize typical response time */
client->typical_response_time = (double) (MQTT_PAL_TIME() - msg->time_sent);
/* check that connection was successful */
if (response.decoded.connack.return_code != MQTT_CONNACK_ACCEPTED) {
client->error = MQTT_ERROR_CONNECTION_REFUSED;
MQTT_PAL_MUTEX_UNLOCK(&client->mutex);
return MQTT_ERROR_CONNECTION_REFUSED;
}
break;
case MQTT_CONTROL_PUBLISH:
/* stage response, none if qos==0, PUBACK if qos==1, PUBREC if qos==2 */
if (response.decoded.publish.qos_level == 1) {
rv = __mqtt_puback(client, response.decoded.publish.packet_id);
if (rv != MQTT_OK) {
client->error = rv;
MQTT_PAL_MUTEX_UNLOCK(&client->mutex);
return rv;
}
} else if (response.decoded.publish.qos_level == 2) {
/* check if this is a duplicate */
if (mqtt_mq_find(&client->mq, MQTT_CONTROL_PUBREC, &response.decoded.publish.packet_id) != NULL) {
break;
}
rv = __mqtt_pubrec(client, response.decoded.publish.packet_id);
if (rv != MQTT_OK) {
client->error = rv;
MQTT_PAL_MUTEX_UNLOCK(&client->mutex);
return rv;
}
}
/* call publish callback */
client->publish_response_callback(&client->publish_response_callback_state, &response.decoded.publish);
break;
case MQTT_CONTROL_PUBACK:
/* release associated PUBLISH */
msg = mqtt_mq_find(&client->mq, MQTT_CONTROL_PUBLISH, &response.decoded.puback.packet_id);
if (msg == NULL) {
client->error = MQTT_ERROR_ACK_OF_UNKNOWN;
MQTT_PAL_MUTEX_UNLOCK(&client->mutex);
return MQTT_ERROR_ACK_OF_UNKNOWN;
}
msg->state = MQTT_QUEUED_COMPLETE;
/* update response time */
client->typical_response_time = 0.875 * (client->typical_response_time) + 0.125 * (double) (MQTT_PAL_TIME() - msg->time_sent);
break;
case MQTT_CONTROL_PUBREC:
/* check if this is a duplicate */
if (mqtt_mq_find(&client->mq, MQTT_CONTROL_PUBREL, &response.decoded.pubrec.packet_id) != NULL) {
break;
}
/* release associated PUBLISH */
msg = mqtt_mq_find(&client->mq, MQTT_CONTROL_PUBLISH, &response.decoded.pubrec.packet_id);
if (msg == NULL) {
client->error = MQTT_ERROR_ACK_OF_UNKNOWN;
MQTT_PAL_MUTEX_UNLOCK(&client->mutex);
return MQTT_ERROR_ACK_OF_UNKNOWN;
}
msg->state = MQTT_QUEUED_COMPLETE;
/* update response time */
client->typical_response_time = 0.875 * (client->typical_response_time) + 0.125 * (double) (MQTT_PAL_TIME() - msg->time_sent);
/* stage PUBREL */
rv = __mqtt_pubrel(client, response.decoded.pubrec.packet_id);
if (rv != MQTT_OK) {
client->error = rv;
MQTT_PAL_MUTEX_UNLOCK(&client->mutex);
return rv;
}
break;
case MQTT_CONTROL_PUBREL:
/* release associated PUBREC */
msg = mqtt_mq_find(&client->mq, MQTT_CONTROL_PUBREC, &response.decoded.pubrel.packet_id);
if (msg == NULL) {
client->error = MQTT_ERROR_ACK_OF_UNKNOWN;
MQTT_PAL_MUTEX_UNLOCK(&client->mutex);
return MQTT_ERROR_ACK_OF_UNKNOWN;
}
msg->state = MQTT_QUEUED_COMPLETE;
/* update response time */
client->typical_response_time = 0.875 * (client->typical_response_time) + 0.125 * (double) (MQTT_PAL_TIME() - msg->time_sent);
/* stage PUBCOMP */
rv = __mqtt_pubcomp(client, response.decoded.pubrec.packet_id);
if (rv != MQTT_OK) {
client->error = rv;
MQTT_PAL_MUTEX_UNLOCK(&client->mutex);
return rv;
}
break;
case MQTT_CONTROL_PUBCOMP:
/* release associated PUBREL */
msg = mqtt_mq_find(&client->mq, MQTT_CONTROL_PUBREL, &response.decoded.pubcomp.packet_id);
if (msg == NULL) {
client->error = MQTT_ERROR_ACK_OF_UNKNOWN;
MQTT_PAL_MUTEX_UNLOCK(&client->mutex);
return MQTT_ERROR_ACK_OF_UNKNOWN;
}
msg->state = MQTT_QUEUED_COMPLETE;
/* update response time */
client->typical_response_time = 0.875 * (client->typical_response_time) + 0.125 * (double) (MQTT_PAL_TIME() - msg->time_sent);
break;
case MQTT_CONTROL_SUBACK:
/* release associated SUBSCRIBE */
msg = mqtt_mq_find(&client->mq, MQTT_CONTROL_SUBSCRIBE, &response.decoded.suback.packet_id);
if (msg == NULL) {
client->error = MQTT_ERROR_ACK_OF_UNKNOWN;
MQTT_PAL_MUTEX_UNLOCK(&client->mutex);
return MQTT_ERROR_ACK_OF_UNKNOWN;
}
msg->state = MQTT_QUEUED_COMPLETE;
/* update response time */
client->typical_response_time = 0.875 * (client->typical_response_time) + 0.125 * (double) (MQTT_PAL_TIME() - msg->time_sent);
/* check that subscription was successful (not currently only one subscribe at a time) */
if (response.decoded.suback.return_codes[0] == MQTT_SUBACK_FAILURE) {
client->error = MQTT_ERROR_SUBSCRIBE_FAILED;
MQTT_PAL_MUTEX_UNLOCK(&client->mutex);
return MQTT_ERROR_SUBSCRIBE_FAILED;
}
break;
case MQTT_CONTROL_UNSUBACK:
/* release associated UNSUBSCRIBE */
msg = mqtt_mq_find(&client->mq, MQTT_CONTROL_UNSUBSCRIBE, &response.decoded.unsuback.packet_id);
if (msg == NULL) {
client->error = MQTT_ERROR_ACK_OF_UNKNOWN;
MQTT_PAL_MUTEX_UNLOCK(&client->mutex);
return MQTT_ERROR_ACK_OF_UNKNOWN;
}
msg->state = MQTT_QUEUED_COMPLETE;
/* update response time */
client->typical_response_time = 0.875 * (client->typical_response_time) + 0.125 * (double) (MQTT_PAL_TIME() - msg->time_sent);
break;
case MQTT_CONTROL_PINGRESP:
/* release associated PINGREQ */
msg = mqtt_mq_find(&client->mq, MQTT_CONTROL_PINGREQ, NULL);
if (msg == NULL) {
client->error = MQTT_ERROR_ACK_OF_UNKNOWN;
MQTT_PAL_MUTEX_UNLOCK(&client->mutex);
return MQTT_ERROR_ACK_OF_UNKNOWN;
}
msg->state = MQTT_QUEUED_COMPLETE;
/* update response time */
client->typical_response_time = 0.875 * (client->typical_response_time) + 0.125 * (double) (MQTT_PAL_TIME() - msg->time_sent);
break;
default:
client->error = MQTT_ERROR_MALFORMED_RESPONSE;
MQTT_PAL_MUTEX_UNLOCK(&client->mutex);
return MQTT_ERROR_MALFORMED_RESPONSE;
}
{
/* we've handled the response, now clean the buffer */
void* dest = (unsigned char*)client->recv_buffer.mem_start;
void* src = (unsigned char*)client->recv_buffer.mem_start + consumed;
size_t n = client->recv_buffer.curr - client->recv_buffer.mem_start - consumed;
memmove(dest, src, n);
client->recv_buffer.curr -= consumed;
client->recv_buffer.curr_sz += consumed;
}
}
/* never hit (always return once there's nothing left. */
MQTT_PAL_MUTEX_UNLOCK(&client->mutex);
return MQTT_OK;
}
/* FIXED HEADER */
#define MQTT_BITFIELD_RULE_VIOLOATION(bitfield, rule_value, rule_mask) ((bitfield ^ rule_value) & rule_mask)
struct {
const uint8_t control_type_is_valid[16];
const uint8_t required_flags[16];
const uint8_t mask_required_flags[16];
} mqtt_fixed_header_rules = {
{ /* boolean value, true if type is valid */
0x00, /* MQTT_CONTROL_RESERVED */
0x01, /* MQTT_CONTROL_CONNECT */
0x01, /* MQTT_CONTROL_CONNACK */
0x01, /* MQTT_CONTROL_PUBLISH */
0x01, /* MQTT_CONTROL_PUBACK */
0x01, /* MQTT_CONTROL_PUBREC */
0x01, /* MQTT_CONTROL_PUBREL */
0x01, /* MQTT_CONTROL_PUBCOMP */
0x01, /* MQTT_CONTROL_SUBSCRIBE */
0x01, /* MQTT_CONTROL_SUBACK */
0x01, /* MQTT_CONTROL_UNSUBSCRIBE */
0x01, /* MQTT_CONTROL_UNSUBACK */
0x01, /* MQTT_CONTROL_PINGREQ */
0x01, /* MQTT_CONTROL_PINGRESP */
0x01, /* MQTT_CONTROL_DISCONNECT */
0x00 /* MQTT_CONTROL_RESERVED */
},
{ /* flags that must be set for the associated control type */
0x00, /* MQTT_CONTROL_RESERVED */
0x00, /* MQTT_CONTROL_CONNECT */
0x00, /* MQTT_CONTROL_CONNACK */
0x00, /* MQTT_CONTROL_PUBLISH */
0x00, /* MQTT_CONTROL_PUBACK */
0x00, /* MQTT_CONTROL_PUBREC */
0x02, /* MQTT_CONTROL_PUBREL */
0x00, /* MQTT_CONTROL_PUBCOMP */
0x02, /* MQTT_CONTROL_SUBSCRIBE */
0x00, /* MQTT_CONTROL_SUBACK */
0x02, /* MQTT_CONTROL_UNSUBSCRIBE */
0x00, /* MQTT_CONTROL_UNSUBACK */
0x00, /* MQTT_CONTROL_PINGREQ */
0x00, /* MQTT_CONTROL_PINGRESP */
0x00, /* MQTT_CONTROL_DISCONNECT */
0x00 /* MQTT_CONTROL_RESERVED */
},
{ /* mask of flags that must be specific values for the associated control type*/
0x00, /* MQTT_CONTROL_RESERVED */
0x0F, /* MQTT_CONTROL_CONNECT */
0x0F, /* MQTT_CONTROL_CONNACK */
0x00, /* MQTT_CONTROL_PUBLISH */
0x0F, /* MQTT_CONTROL_PUBACK */
0x0F, /* MQTT_CONTROL_PUBREC */
0x0F, /* MQTT_CONTROL_PUBREL */
0x0F, /* MQTT_CONTROL_PUBCOMP */
0x0F, /* MQTT_CONTROL_SUBSCRIBE */
0x0F, /* MQTT_CONTROL_SUBACK */
0x0F, /* MQTT_CONTROL_UNSUBSCRIBE */
0x0F, /* MQTT_CONTROL_UNSUBACK */
0x0F, /* MQTT_CONTROL_PINGREQ */
0x0F, /* MQTT_CONTROL_PINGRESP */
0x0F, /* MQTT_CONTROL_DISCONNECT */
0x00 /* MQTT_CONTROL_RESERVED */
}
};
ssize_t mqtt_fixed_header_rule_violation(const struct mqtt_fixed_header *fixed_header) {
uint8_t control_type;
uint8_t control_flags;
uint8_t required_flags;
uint8_t mask_required_flags;
/* get value and rules */
control_type = fixed_header->control_type;
control_flags = fixed_header->control_flags;
required_flags = mqtt_fixed_header_rules.required_flags[control_type];
mask_required_flags = mqtt_fixed_header_rules.mask_required_flags[control_type];
/* check for valid type */
if (!mqtt_fixed_header_rules.control_type_is_valid[control_type]) {
return MQTT_ERROR_CONTROL_FORBIDDEN_TYPE;
}
/* check that flags are appropriate */
if(MQTT_BITFIELD_RULE_VIOLOATION(control_flags, required_flags, mask_required_flags)) {
return MQTT_ERROR_CONTROL_INVALID_FLAGS;
}
return 0;
}
ssize_t mqtt_unpack_fixed_header(struct mqtt_response *response, const uint8_t *buf, size_t bufsz) {
struct mqtt_fixed_header *fixed_header;
const uint8_t *start = buf;
int lshift;
ssize_t errcode;
/* check for null pointers or empty buffer */
if (response == NULL || buf == NULL) {
return MQTT_ERROR_NULLPTR;
}
fixed_header = &(response->fixed_header);
/* check that bufsz is not zero */
if (bufsz == 0) return 0;
/* parse control type and flags */
fixed_header->control_type = *buf >> 4;
fixed_header->control_flags = *buf & 0x0F;
/* parse remaining size */
fixed_header->remaining_length = 0;
lshift = 0;
do {
/* MQTT spec (2.2.3) says the maximum length is 28 bits */
if(lshift == 28)
return MQTT_ERROR_INVALID_REMAINING_LENGTH;
/* consume byte and assert at least 1 byte left */
--bufsz;
++buf;
if (bufsz == 0) return 0;
/* parse next byte*/
fixed_header->remaining_length += (*buf & 0x7F) << lshift;
lshift += 7;
} while(*buf & 0x80); /* while continue bit is set */
/* consume last byte */
--bufsz;
++buf;
/* check that the fixed header is valid */
errcode = mqtt_fixed_header_rule_violation(fixed_header);
if (errcode) {
return errcode;
}
/* check that the buffer size if GT remaining length */
if (bufsz < fixed_header->remaining_length) {
return 0;
}
/* return how many bytes were consumed */
return buf - start;
}
ssize_t mqtt_pack_fixed_header(uint8_t *buf, size_t bufsz, const struct mqtt_fixed_header *fixed_header) {
const uint8_t *start = buf;
ssize_t errcode;
uint32_t remaining_length;
/* check for null pointers or empty buffer */
if (fixed_header == NULL || buf == NULL) {
return MQTT_ERROR_NULLPTR;
}
/* check that the fixed header is valid */
errcode = mqtt_fixed_header_rule_violation(fixed_header);
if (errcode) {
return errcode;
}
/* check that bufsz is not zero */
if (bufsz == 0) return 0;
/* pack control type and flags */
*buf = (((uint8_t) fixed_header->control_type) << 4) & 0xF0;
*buf |= ((uint8_t) fixed_header->control_flags) & 0x0F;
remaining_length = fixed_header->remaining_length;
/* MQTT spec (2.2.3) says maximum remaining length is 2^28-1 */
if(remaining_length >= 256*1024*1024)
return MQTT_ERROR_INVALID_REMAINING_LENGTH;
do {
/* consume byte and assert at least 1 byte left */
--bufsz;
++buf;
if (bufsz == 0) return 0;
/* pack next byte */
*buf = remaining_length & 0x7F;
if(remaining_length > 127) *buf |= 0x80;
remaining_length = remaining_length >> 7;
} while(*buf & 0x80);
/* consume last byte */
--bufsz;
++buf;
/* check that there's still enough space in buffer for packet */
if (bufsz < fixed_header->remaining_length) {
return 0;
}
/* return how many bytes were consumed */
return buf - start;
}
/* CONNECT */
ssize_t mqtt_pack_connection_request(uint8_t* buf, size_t bufsz,
const char* client_id,
const char* will_topic,
const void* will_message,
size_t will_message_size,
const char* user_name,
const char* password,
uint8_t connect_flags,
uint16_t keep_alive)
{
struct mqtt_fixed_header fixed_header;
size_t remaining_length;
const uint8_t *const start = buf;
ssize_t rv;
/* pack the fixed headr */
fixed_header.control_type = MQTT_CONTROL_CONNECT;
fixed_header.control_flags = 0x00;
/* calculate remaining length and build connect_flags at the same time */
connect_flags = connect_flags & ~MQTT_CONNECT_RESERVED;
remaining_length = 10; /* size of variable header */
if (client_id == NULL) {
/* client_id is a mandatory parameter */
return MQTT_ERROR_CONNECT_NULL_CLIENT_ID;
} else {
/* mqtt_string length is strlen + 2 */
remaining_length += __mqtt_packed_cstrlen(client_id);
}
if (will_topic != NULL) {
uint8_t temp;
/* there is a will */
connect_flags |= MQTT_CONNECT_WILL_FLAG;
remaining_length += __mqtt_packed_cstrlen(will_topic);
if (will_message == NULL) {
/* if there's a will there MUST be a will message */
return MQTT_ERROR_CONNECT_NULL_WILL_MESSAGE;
}
remaining_length += 2 + will_message_size; /* size of will_message */
/* assert that the will QOS is valid (i.e. not 3) */
temp = connect_flags & 0x18; /* mask to QOS */
if (temp == 0x18) {
/* bitwise equality with QoS 3 (invalid)*/
return MQTT_ERROR_CONNECT_FORBIDDEN_WILL_QOS;
}
} else {
/* there is no will so set all will flags to zero */
connect_flags &= ~MQTT_CONNECT_WILL_FLAG;
connect_flags &= ~0x18;
connect_flags &= ~MQTT_CONNECT_WILL_RETAIN;
}
if (user_name != NULL) {
/* a user name is present */
connect_flags |= MQTT_CONNECT_USER_NAME;
remaining_length += __mqtt_packed_cstrlen(user_name);
} else {
connect_flags &= ~MQTT_CONNECT_USER_NAME;
}
if (password != NULL) {
/* a password is present */
connect_flags |= MQTT_CONNECT_PASSWORD;
remaining_length += __mqtt_packed_cstrlen(password);
} else {
connect_flags &= ~MQTT_CONNECT_PASSWORD;
}
/* fixed header length is now calculated*/
fixed_header.remaining_length = remaining_length;
/* pack fixed header and perform error checks */
rv = mqtt_pack_fixed_header(buf, bufsz, &fixed_header);
if (rv <= 0) {
/* something went wrong */
return rv;
}
buf += rv;
bufsz -= rv;
/* check that the buffer has enough space to fit the remaining length */
if (bufsz < fixed_header.remaining_length) {
return 0;
}
/* pack the variable header */
*buf++ = 0x00;
*buf++ = 0x04;
*buf++ = (uint8_t) 'M';
*buf++ = (uint8_t) 'Q';
*buf++ = (uint8_t) 'T';
*buf++ = (uint8_t) 'T';
*buf++ = MQTT_PROTOCOL_LEVEL;
*buf++ = connect_flags;
buf += __mqtt_pack_uint16(buf, keep_alive);
/* pack the payload */
buf += __mqtt_pack_str(buf, client_id);
if (connect_flags & MQTT_CONNECT_WILL_FLAG) {
buf += __mqtt_pack_str(buf, will_topic);
buf += __mqtt_pack_uint16(buf, will_message_size);
memcpy(buf, will_message, will_message_size);
buf += will_message_size;
}
if (connect_flags & MQTT_CONNECT_USER_NAME) {
buf += __mqtt_pack_str(buf, user_name);
}
if (connect_flags & MQTT_CONNECT_PASSWORD) {
buf += __mqtt_pack_str(buf, password);
}
/* return the number of bytes that were consumed */
return buf - start;
}
/* CONNACK */
ssize_t mqtt_unpack_connack_response(struct mqtt_response *mqtt_response, const uint8_t *buf) {
const uint8_t *const start = buf;
struct mqtt_response_connack *response;
/* check that remaining length is 2 */
if (mqtt_response->fixed_header.remaining_length != 2) {
return MQTT_ERROR_MALFORMED_RESPONSE;
}
response = &(mqtt_response->decoded.connack);
/* unpack */
if (*buf & 0xFE) {
/* only bit 1 can be set */
return MQTT_ERROR_CONNACK_FORBIDDEN_FLAGS;
} else {
response->session_present_flag = *buf++;
}
if (*buf > 5u) {
/* only bit 1 can be set */
return MQTT_ERROR_CONNACK_FORBIDDEN_CODE;
} else {
response->return_code = (enum MQTTConnackReturnCode) *buf++;
}
return buf - start;
}
/* DISCONNECT */
ssize_t mqtt_pack_disconnect(uint8_t *buf, size_t bufsz) {
struct mqtt_fixed_header fixed_header;
fixed_header.control_type = MQTT_CONTROL_DISCONNECT;
fixed_header.control_flags = 0;
fixed_header.remaining_length = 0;
return mqtt_pack_fixed_header(buf, bufsz, &fixed_header);
}
/* PING */
ssize_t mqtt_pack_ping_request(uint8_t *buf, size_t bufsz) {
struct mqtt_fixed_header fixed_header;
fixed_header.control_type = MQTT_CONTROL_PINGREQ;
fixed_header.control_flags = 0;
fixed_header.remaining_length = 0;
return mqtt_pack_fixed_header(buf, bufsz, &fixed_header);
}
/* PUBLISH */
ssize_t mqtt_pack_publish_request(uint8_t *buf, size_t bufsz,
const char* topic_name,
uint16_t packet_id,
void* application_message,
size_t application_message_size,
uint8_t publish_flags)
{
const uint8_t *const start = buf;
ssize_t rv;
struct mqtt_fixed_header fixed_header;
uint16_t remaining_length;
uint8_t inspected_qos;
/* check for null pointers */
if(buf == NULL || topic_name == NULL) {
return MQTT_ERROR_NULLPTR;
}
/* inspect QoS level */
inspected_qos = (publish_flags & 0x06) >> 1; /* mask */
/* build the fixed header */
fixed_header.control_type = MQTT_CONTROL_PUBLISH;
/* calculate remaining length */
remaining_length = __mqtt_packed_cstrlen(topic_name);
if (inspected_qos > 0) {
remaining_length += 2;
}
remaining_length += application_message_size;
fixed_header.remaining_length = remaining_length;
/* force dup to 0 if qos is 0 */
if (inspected_qos == 0) {
publish_flags &= ~MQTT_PUBLISH_DUP;
}
/* make sure that qos is not 3 */
if (inspected_qos == 3) {
return MQTT_ERROR_PUBLISH_FORBIDDEN_QOS;
}
fixed_header.control_flags = publish_flags;
/* pack fixed header */
rv = mqtt_pack_fixed_header(buf, bufsz, &fixed_header);
if (rv <= 0) {
/* something went wrong */
return rv;
}
buf += rv;
bufsz -= rv;
/* check that buffer is big enough */
if (bufsz < remaining_length) {
return 0;
}
/* pack variable header */
buf += __mqtt_pack_str(buf, topic_name);
if (inspected_qos > 0) {
buf += __mqtt_pack_uint16(buf, packet_id);
}
/* pack payload */
memcpy(buf, application_message, application_message_size);
buf += application_message_size;
return buf - start;
}
ssize_t mqtt_unpack_publish_response(struct mqtt_response *mqtt_response, const uint8_t *buf)
{
const uint8_t *const start = buf;
struct mqtt_fixed_header *fixed_header;
struct mqtt_response_publish *response;
fixed_header = &(mqtt_response->fixed_header);
response = &(mqtt_response->decoded.publish);
/* get flags */
response->dup_flag = (fixed_header->control_flags & MQTT_PUBLISH_DUP) >> 3;
response->qos_level = (fixed_header->control_flags & 0x06) >> 1;
response->retain_flag = fixed_header->control_flags & MQTT_PUBLISH_RETAIN;
/* make sure that remaining length is valid */
if (mqtt_response->fixed_header.remaining_length < 4) {
return MQTT_ERROR_MALFORMED_RESPONSE;
}
/* parse variable header */
response->topic_name_size = __mqtt_unpack_uint16(buf);
buf += 2;
response->topic_name = buf;
buf += response->topic_name_size;
if (response->qos_level > 0) {
response->packet_id = __mqtt_unpack_uint16(buf);
buf += 2;
}
/* get payload */
response->application_message = buf;
if (response->qos_level == 0) {
response->application_message_size = fixed_header->remaining_length - response->topic_name_size - 2;
} else {
response->application_message_size = fixed_header->remaining_length - response->topic_name_size - 4;
}
buf += response->application_message_size;
/* return number of bytes consumed */
return buf - start;
}
/* PUBXXX */
ssize_t mqtt_pack_pubxxx_request(uint8_t *buf, size_t bufsz,
enum MQTTControlPacketType control_type,
uint16_t packet_id)
{
const uint8_t *const start = buf;
struct mqtt_fixed_header fixed_header;
ssize_t rv;
if (buf == NULL) {
return MQTT_ERROR_NULLPTR;
}
/* pack fixed header */
fixed_header.control_type = control_type;
if (control_type == MQTT_CONTROL_PUBREL) {
fixed_header.control_flags = 0x02;
} else {
fixed_header.control_flags = 0;
}
fixed_header.remaining_length = 2;
rv = mqtt_pack_fixed_header(buf, bufsz, &fixed_header);
if (rv <= 0) {
return rv;
}
buf += rv;
bufsz -= rv;
if (bufsz < fixed_header.remaining_length) {
return 0;
}
buf += __mqtt_pack_uint16(buf, packet_id);
return buf - start;
}
ssize_t mqtt_unpack_pubxxx_response(struct mqtt_response *mqtt_response, const uint8_t *buf)
{
const uint8_t *const start = buf;
uint16_t packet_id;
/* assert remaining length is correct */
if (mqtt_response->fixed_header.remaining_length != 2) {
return MQTT_ERROR_MALFORMED_RESPONSE;
}
/* parse packet_id */
packet_id = __mqtt_unpack_uint16(buf);
buf += 2;
if (mqtt_response->fixed_header.control_type == MQTT_CONTROL_PUBACK) {
mqtt_response->decoded.puback.packet_id = packet_id;
} else if (mqtt_response->fixed_header.control_type == MQTT_CONTROL_PUBREC) {
mqtt_response->decoded.pubrec.packet_id = packet_id;
} else if (mqtt_response->fixed_header.control_type == MQTT_CONTROL_PUBREL) {
mqtt_response->decoded.pubrel.packet_id = packet_id;
} else {
mqtt_response->decoded.pubcomp.packet_id = packet_id;
}
return buf - start;
}
/* SUBACK */
ssize_t mqtt_unpack_suback_response (struct mqtt_response *mqtt_response, const uint8_t *buf) {
const uint8_t *const start = buf;
uint32_t remaining_length = mqtt_response->fixed_header.remaining_length;
/* assert remaining length is at least 3 (for packet id and at least 1 topic) */
if (remaining_length < 3) {
return MQTT_ERROR_MALFORMED_RESPONSE;
}
/* unpack packet_id */
mqtt_response->decoded.suback.packet_id = __mqtt_unpack_uint16(buf);
buf += 2;
remaining_length -= 2;
/* unpack return codes */
mqtt_response->decoded.suback.num_return_codes = (size_t) remaining_length;
mqtt_response->decoded.suback.return_codes = buf;
buf += remaining_length;
return buf - start;
}
/* SUBSCRIBE */
ssize_t mqtt_pack_subscribe_request(uint8_t *buf, size_t bufsz, unsigned int packet_id, ...) {
va_list args;
const uint8_t *const start = buf;
ssize_t rv;
struct mqtt_fixed_header fixed_header;
unsigned int num_subs = 0;
unsigned int i;
const char *topic[MQTT_SUBSCRIBE_REQUEST_MAX_NUM_TOPICS];
uint8_t max_qos[MQTT_SUBSCRIBE_REQUEST_MAX_NUM_TOPICS];
/* parse all subscriptions */
va_start(args, packet_id);
while(1) {
topic[num_subs] = va_arg(args, const char*);
if (topic[num_subs] == NULL) {
/* end of list */
break;
}
max_qos[num_subs] = (uint8_t) va_arg(args, unsigned int);
++num_subs;
if (num_subs >= MQTT_SUBSCRIBE_REQUEST_MAX_NUM_TOPICS) {
return MQTT_ERROR_SUBSCRIBE_TOO_MANY_TOPICS;
}
}
va_end(args);
/* build the fixed header */
fixed_header.control_type = MQTT_CONTROL_SUBSCRIBE;
fixed_header.control_flags = 2u;
fixed_header.remaining_length = 2u; /* size of variable header */
for(i = 0; i < num_subs; ++i) {
/* payload is topic name + max qos (1 byte) */
fixed_header.remaining_length += __mqtt_packed_cstrlen(topic[i]) + 1;
}
/* pack the fixed header */
rv = mqtt_pack_fixed_header(buf, bufsz, &fixed_header);
if (rv <= 0) {
return rv;
}
buf += rv;
bufsz -= rv;
/* check that the buffer has enough space */
if (bufsz < fixed_header.remaining_length) {
return 0;
}
/* pack variable header */
buf += __mqtt_pack_uint16(buf, packet_id);
/* pack payload */
for(i = 0; i < num_subs; ++i) {
buf += __mqtt_pack_str(buf, topic[i]);
*buf++ = max_qos[i];
}
return buf - start;
}
/* UNSUBACK */
ssize_t mqtt_unpack_unsuback_response(struct mqtt_response *mqtt_response, const uint8_t *buf)
{
const uint8_t *const start = buf;
if (mqtt_response->fixed_header.remaining_length != 2) {
return MQTT_ERROR_MALFORMED_RESPONSE;
}
/* parse packet_id */
mqtt_response->decoded.unsuback.packet_id = __mqtt_unpack_uint16(buf);
buf += 2;
return buf - start;
}
/* UNSUBSCRIBE */
ssize_t mqtt_pack_unsubscribe_request(uint8_t *buf, size_t bufsz, unsigned int packet_id, ...) {
va_list args;
const uint8_t *const start = buf;
ssize_t rv;
struct mqtt_fixed_header fixed_header;
unsigned int num_subs = 0;
unsigned int i;
const char *topic[MQTT_UNSUBSCRIBE_REQUEST_MAX_NUM_TOPICS];
/* parse all subscriptions */
va_start(args, packet_id);
while(1) {
topic[num_subs] = va_arg(args, const char*);
if (topic[num_subs] == NULL) {
/* end of list */
break;
}
++num_subs;
if (num_subs >= MQTT_UNSUBSCRIBE_REQUEST_MAX_NUM_TOPICS) {
return MQTT_ERROR_UNSUBSCRIBE_TOO_MANY_TOPICS;
}
}
va_end(args);
/* build the fixed header */
fixed_header.control_type = MQTT_CONTROL_UNSUBSCRIBE;
fixed_header.control_flags = 2u;
fixed_header.remaining_length = 2u; /* size of variable header */
for(i = 0; i < num_subs; ++i) {
/* payload is topic name */
fixed_header.remaining_length += __mqtt_packed_cstrlen(topic[i]);
}
/* pack the fixed header */
rv = mqtt_pack_fixed_header(buf, bufsz, &fixed_header);
if (rv <= 0) {
return rv;
}
buf += rv;
bufsz -= rv;
/* check that the buffer has enough space */
if (bufsz < fixed_header.remaining_length) {
return 0;
}
/* pack variable header */
buf += __mqtt_pack_uint16(buf, packet_id);
/* pack payload */
for(i = 0; i < num_subs; ++i) {
buf += __mqtt_pack_str(buf, topic[i]);
}
return buf - start;
}
/* MESSAGE QUEUE */
void mqtt_mq_init(struct mqtt_message_queue *mq, void *buf, size_t bufsz)
{
mq->mem_start = buf;
mq->mem_end = (unsigned char*)buf + bufsz;
mq->curr = buf;
mq->queue_tail = mq->mem_end;
mq->curr_sz = mqtt_mq_currsz(mq);
}
struct mqtt_queued_message* mqtt_mq_register(struct mqtt_message_queue *mq, size_t nbytes)
{
/* make queued message header */
--(mq->queue_tail);
mq->queue_tail->start = mq->curr;
mq->queue_tail->size = nbytes;
mq->queue_tail->state = MQTT_QUEUED_UNSENT;
/* move curr and recalculate curr_sz */
mq->curr += nbytes;
mq->curr_sz = mqtt_mq_currsz(mq);
return mq->queue_tail;
}
void mqtt_mq_clean(struct mqtt_message_queue *mq) {
struct mqtt_queued_message *new_head;
for(new_head = mqtt_mq_get(mq, 0); new_head >= mq->queue_tail; --new_head) {
if (new_head->state != MQTT_QUEUED_COMPLETE) break;
}
/* check if everything can be removed */
if (new_head < mq->queue_tail) {
mq->curr = mq->mem_start;
mq->queue_tail = mq->mem_end;
mq->curr_sz = mqtt_mq_currsz(mq);
return;
} else if (new_head == mqtt_mq_get(mq, 0)) {
/* do nothing */
return;
}
/* move buffered data */
{
size_t n = mq->curr - new_head->start;
size_t removing = new_head->start - (uint8_t*) mq->mem_start;
memmove(mq->mem_start, new_head->start, n);
mq->curr = (unsigned char*)mq->mem_start + n;
/* move queue */
{
ssize_t new_tail_idx = new_head - mq->queue_tail;
memmove(mqtt_mq_get(mq, new_tail_idx), mq->queue_tail, sizeof(struct mqtt_queued_message) * (new_tail_idx + 1));
mq->queue_tail = mqtt_mq_get(mq, new_tail_idx);
{
/* bump back start's */
ssize_t i = 0;
for(; i < new_tail_idx + 1; ++i) {
mqtt_mq_get(mq, i)->start -= removing;
}
}
}
}
/* get curr_sz */
mq->curr_sz = mqtt_mq_currsz(mq);
}
struct mqtt_queued_message* mqtt_mq_find(struct mqtt_message_queue *mq, enum MQTTControlPacketType control_type, uint16_t *packet_id)
{
struct mqtt_queued_message *curr;
for(curr = mqtt_mq_get(mq, 0); curr >= mq->queue_tail; --curr) {
if (curr->control_type == control_type) {
if ((packet_id == NULL && curr->state != MQTT_QUEUED_COMPLETE) ||
(packet_id != NULL && *packet_id == curr->packet_id)) {
return curr;
}
}
}
return NULL;
}
/* RESPONSE UNPACKING */
ssize_t mqtt_unpack_response(struct mqtt_response* response, const uint8_t *buf, size_t bufsz) {
const uint8_t *const start = buf;
ssize_t rv = mqtt_unpack_fixed_header(response, buf, bufsz);
if (rv <= 0) return rv;
else buf += rv;
switch(response->fixed_header.control_type) {
case MQTT_CONTROL_CONNACK:
rv = mqtt_unpack_connack_response(response, buf);
break;
case MQTT_CONTROL_PUBLISH:
rv = mqtt_unpack_publish_response(response, buf);
break;
case MQTT_CONTROL_PUBACK:
rv = mqtt_unpack_pubxxx_response(response, buf);
break;
case MQTT_CONTROL_PUBREC:
rv = mqtt_unpack_pubxxx_response(response, buf);
break;
case MQTT_CONTROL_PUBREL:
rv = mqtt_unpack_pubxxx_response(response, buf);
break;
case MQTT_CONTROL_PUBCOMP:
rv = mqtt_unpack_pubxxx_response(response, buf);
break;
case MQTT_CONTROL_SUBACK:
rv = mqtt_unpack_suback_response(response, buf);
break;
case MQTT_CONTROL_UNSUBACK:
rv = mqtt_unpack_unsuback_response(response, buf);
break;
case MQTT_CONTROL_PINGRESP:
return rv;
default:
return MQTT_ERROR_RESPONSE_INVALID_CONTROL_TYPE;
}
if (rv < 0) return rv;
buf += rv;
return buf - start;
}
/* EXTRA DETAILS */
ssize_t __mqtt_pack_uint16(uint8_t *buf, uint16_t integer)
{
uint16_t integer_htons = MQTT_PAL_HTONS(integer);
memcpy(buf, &integer_htons, 2);
return 2;
}
uint16_t __mqtt_unpack_uint16(const uint8_t *buf)
{
uint16_t integer_htons;
memcpy(&integer_htons, buf, 2);
return MQTT_PAL_NTOHS(integer_htons);
}
ssize_t __mqtt_pack_str(uint8_t *buf, const char* str) {
uint16_t length = strlen(str);
int i = 0;
/* pack string length */
buf += __mqtt_pack_uint16(buf, length);
/* pack string */
for(; i < length; ++i) {
*(buf++) = str[i];
}
/* return number of bytes consumed */
return length + 2;
}
static const char *MQTT_ERRORS_STR[] = {
"MQTT_UNKNOWN_ERROR",
__ALL_MQTT_ERRORS(GENERATE_STRING)
};
const char* mqtt_error_str(enum MQTTErrors error) {
int offset = error - MQTT_ERROR_UNKNOWN;
if (offset >= 0) {
return MQTT_ERRORS_STR[offset];
} else if (error == 0) {
return "MQTT_ERROR: Buffer too small.";
} else if (error > 0) {
return "MQTT_OK";
} else {
return MQTT_ERRORS_STR[0];
}
}
/** @endcond*/
#ifndef __MQTT_H__
#define __MQTT_H__
/*
MIT License
Copyright(c) 2018 Liam Bindle
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files(the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions :
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
*/
#include "mqtt_pal.h"
/**
* @file
* @brief Declares all the MQTT-C functions and datastructures.
*
* @note You should <code>\#include <mqtt.h></code>.
*
* @example simple_publisher.c
* A simple program to that publishes the current time whenever ENTER is pressed.
*
* Usage:
* \code{.sh}
* ./bin/simple_publisher [address [port [topic]]]
* \endcode
*
* Where \c address is the address of the MQTT broker, \c port is the port number the
* MQTT broker is running on, and \c topic is the name of the topic to publish with. Note
* that all these arguments are optional and the defaults are \c address = \c "test.mosquitto.org",
* \c port = \c "1883", and \c topic = "datetime".
*
* @example simple_subscriber.c
* A simple program that subscribes to a single topic and prints all updates that are received.
*
* Usage:
* \code{.sh}
* ./bin/simple_subscriber [address [port [topic]]]
* \endcode
*
* Where \c address is the address of the MQTT broker, \c port is the port number the
* MQTT broker is running on, and \c topic is the name of the topic subscribe to. Note
* that all these arguments are optional and the defaults are \c address = \c "test.mosquitto.org",
* \c port = \c "1883", and \c topic = "datetime".
*
* @example reconnect_subscriber.c
* Same program as \ref simple_subscriber.c, but using the automatic reconnect functionality.
*
* @example bio_publisher.c
* Same program as \ref simple_publisher.c, but uses a unencrypted BIO socket.
*
* @example openssl_publisher.c
* Same program as \ref simple_publisher.c, but over an encrypted connection using OpenSSL.
*
* Usage:
* \code{.sh}
* ./bin/openssl_publisher ca_file [address [port [topic]]]
* \endcode
*
*
* @defgroup api API
* @brief Documentation of everything you need to know to use the MQTT-C client.
*
* This module contains everything you need to know to use MQTT-C in your application.
* For usage examples see:
* - @ref simple_publisher.c
* - @ref simple_subscriber.c
* - @ref reconnect_subscriber.c
* - @ref bio_publisher.c
* - @ref openssl_publisher.c
*
* @note MQTT-C can be used in both single-threaded and multi-threaded applications. All
* the functions in \ref api are thread-safe.
*
* @defgroup packers Control Packet Serialization
* @brief Developer documentation of the functions and datastructures used for serializing MQTT
* control packets.
*
* @defgroup unpackers Control Packet Deserialization
* @brief Developer documentation of the functions and datastructures used for deserializing MQTT
* control packets.
*
* @defgroup details Utilities
* @brief Developer documentation for the utilities used to implement the MQTT-C client.
*
* @note To deserialize a packet from a buffer use \ref mqtt_unpack_response (it's the only
* function you need).
*/
/**
* @brief An enumeration of the MQTT control packet types.
* @ingroup unpackers
*
* @see <a href="http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718021">
* MQTT v3.1.1: MQTT Control Packet Types
* </a>
*/
enum MQTTControlPacketType {
MQTT_CONTROL_CONNECT=1u,
MQTT_CONTROL_CONNACK=2u,
MQTT_CONTROL_PUBLISH=3u,
MQTT_CONTROL_PUBACK=4u,
MQTT_CONTROL_PUBREC=5u,
MQTT_CONTROL_PUBREL=6u,
MQTT_CONTROL_PUBCOMP=7u,
MQTT_CONTROL_SUBSCRIBE=8u,
MQTT_CONTROL_SUBACK=9u,
MQTT_CONTROL_UNSUBSCRIBE=10u,
MQTT_CONTROL_UNSUBACK=11u,
MQTT_CONTROL_PINGREQ=12u,
MQTT_CONTROL_PINGRESP=13u,
MQTT_CONTROL_DISCONNECT=14u
};
/**
* @brief The fixed header of an MQTT control packet.
* @ingroup unpackers
*
* @see <a href="http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718020">
* MQTT v3.1.1: Fixed Header
* </a>
*/
struct mqtt_fixed_header {
/** The type of packet. */
enum MQTTControlPacketType control_type;
/** The packets control flags.*/
uint32_t control_flags: 4;
/** The remaining size of the packet in bytes (i.e. the size of variable header and payload).*/
uint32_t remaining_length;
};
/**
* @brief The protocol identifier for MQTT v3.1.1.
* @ingroup packers
*
* @see <a href="http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718030">
* MQTT v3.1.1: CONNECT Variable Header.
* </a>
*/
#define MQTT_PROTOCOL_LEVEL 0x04
/**
* @brief A macro used to declare the enum MQTTErrors and associated
* error messages (the members of the num) at the same time.
*/
#define __ALL_MQTT_ERRORS(MQTT_ERROR) \
MQTT_ERROR(MQTT_ERROR_NULLPTR) \
MQTT_ERROR(MQTT_ERROR_CONTROL_FORBIDDEN_TYPE) \
MQTT_ERROR(MQTT_ERROR_CONTROL_INVALID_FLAGS) \
MQTT_ERROR(MQTT_ERROR_CONTROL_WRONG_TYPE) \
MQTT_ERROR(MQTT_ERROR_CONNECT_NULL_CLIENT_ID) \
MQTT_ERROR(MQTT_ERROR_CONNECT_NULL_WILL_MESSAGE) \
MQTT_ERROR(MQTT_ERROR_CONNECT_FORBIDDEN_WILL_QOS) \
MQTT_ERROR(MQTT_ERROR_CONNACK_FORBIDDEN_FLAGS) \
MQTT_ERROR(MQTT_ERROR_CONNACK_FORBIDDEN_CODE) \
MQTT_ERROR(MQTT_ERROR_PUBLISH_FORBIDDEN_QOS) \
MQTT_ERROR(MQTT_ERROR_SUBSCRIBE_TOO_MANY_TOPICS) \
MQTT_ERROR(MQTT_ERROR_MALFORMED_RESPONSE) \
MQTT_ERROR(MQTT_ERROR_UNSUBSCRIBE_TOO_MANY_TOPICS) \
MQTT_ERROR(MQTT_ERROR_RESPONSE_INVALID_CONTROL_TYPE) \
MQTT_ERROR(MQTT_ERROR_CONNECT_NOT_CALLED) \
MQTT_ERROR(MQTT_ERROR_SEND_BUFFER_IS_FULL) \
MQTT_ERROR(MQTT_ERROR_SOCKET_ERROR) \
MQTT_ERROR(MQTT_ERROR_MALFORMED_REQUEST) \
MQTT_ERROR(MQTT_ERROR_RECV_BUFFER_TOO_SMALL) \
MQTT_ERROR(MQTT_ERROR_ACK_OF_UNKNOWN) \
MQTT_ERROR(MQTT_ERROR_NOT_IMPLEMENTED) \
MQTT_ERROR(MQTT_ERROR_CONNECTION_REFUSED) \
MQTT_ERROR(MQTT_ERROR_SUBSCRIBE_FAILED) \
MQTT_ERROR(MQTT_ERROR_CONNECTION_CLOSED) \
MQTT_ERROR(MQTT_ERROR_INITIAL_RECONNECT) \
MQTT_ERROR(MQTT_ERROR_INVALID_REMAINING_LENGTH)
/* todo: add more connection refused errors */
/**
* @brief A macro used to generate the enum MQTTErrors from
* \ref __ALL_MQTT_ERRORS
* @see __ALL_MQTT_ERRORS
*/
#define GENERATE_ENUM(ENUM) ENUM,
/**
* @brief A macro used to generate the error messages associated with
* MQTTErrors from \ref __ALL_MQTT_ERRORS
* @see __ALL_MQTT_ERRORS
*/
#define GENERATE_STRING(STRING) #STRING,
/**
* @brief An enumeration of error codes. Error messages can be retrieved by calling \ref mqtt_error_str.
* @ingroup api
*
* @see mqtt_error_str
*/
enum MQTTErrors {
MQTT_ERROR_UNKNOWN=INT_MIN,
__ALL_MQTT_ERRORS(GENERATE_ENUM)
MQTT_OK = 1
};
/**
* @brief Returns an error message for error code, \p error.
* @ingroup api
*
* @param[in] error the error code.
*
* @returns The associated error message.
*/
const char* mqtt_error_str(enum MQTTErrors error);
/**
* @brief Pack a MQTT 16 bit integer, given a native 16 bit integer .
*
* @param[out] buf the buffer that the MQTT integer will be written to.
* @param[in] integer the native integer to be written to \p buf.
*
* @warning This function provides no error checking.
*
* @returns 2
*/
ssize_t __mqtt_pack_uint16(uint8_t *buf, uint16_t integer);
/**
* @brief Unpack a MQTT 16 bit integer to a native 16 bit integer.
*
* @param[in] buf the buffer that the MQTT integer will be read from.
*
* @warning This function provides no error checking and does not modify \p buf.
*
* @returns The native integer
*/
uint16_t __mqtt_unpack_uint16(const uint8_t *buf);
/**
* @brief Pack a MQTT string, given a c-string \p str.
*
* @param[out] buf the buffer that the MQTT string will be written to.
* @param[in] str the c-string to be written to \p buf.
*
* @warning This function provides no error checking.
*
* @returns strlen(str) + 2
*/
ssize_t __mqtt_pack_str(uint8_t *buf, const char* str);
/** @brief A macro to get the MQTT string length from a c-string. */
#define __mqtt_packed_cstrlen(x) (2 + strlen(x))
/* RESPONSES */
/**
* @brief An enumeration of the return codes returned in a CONNACK packet.
* @ingroup unpackers
*
* @see <a href="http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Table_3.1_-">
* MQTT v3.1.1: CONNACK return codes.
* </a>
*/
enum MQTTConnackReturnCode {
MQTT_CONNACK_ACCEPTED = 0u,
MQTT_CONNACK_REFUSED_PROTOCOL_VERSION = 1u,
MQTT_CONNACK_REFUSED_IDENTIFIER_REJECTED = 2u,
MQTT_CONNACK_REFUSED_SERVER_UNAVAILABLE = 3u,
MQTT_CONNACK_REFUSED_BAD_USER_NAME_OR_PASSWORD = 4u,
MQTT_CONNACK_REFUSED_NOT_AUTHORIZED = 5u
};
/**
* @brief A connection response datastructure.
* @ingroup unpackers
*
* @see <a href="http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718033">
* MQTT v3.1.1: CONNACK - Acknowledgement connection response.
* </a>
*/
struct mqtt_response_connack {
/**
* @brief Allows client and broker to check if they have a consistent view about whether there is
* already a stored session state.
*/
uint8_t session_present_flag;
/**
* @brief The return code of the connection request.
*
* @see MQTTConnackReturnCode
*/
enum MQTTConnackReturnCode return_code;
};
/**
* @brief A publish packet received from the broker.
* @ingroup unpackers
*
* A publish packet is received from the broker when a client publishes to a topic that the
* \em {local client} is subscribed to.
*
* @see <a href="http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718037">
* MQTT v3.1.1: PUBLISH - Publish Message.
* </a>
*/
struct mqtt_response_publish {
/**
* @brief The DUP flag. DUP flag is 0 if its the first attempt to send this publish packet. A DUP flag
* of 1 means that this might be a re-delivery of the packet.
*/
uint8_t dup_flag;
/**
* @brief The quality of service level.
*
* @see <a href="http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Table_3.11_-">
* MQTT v3.1.1: QoS Definitions
* </a>
*/
uint8_t qos_level;
/** @brief The retain flag of this publish message. */
uint8_t retain_flag;
/** @brief Size of the topic name (number of characters). */
uint16_t topic_name_size;
/**
* @brief The topic name.
* @note topic_name is not null terminated. Therefore topic_name_size must be used to get the
* string length.
*/
const void* topic_name;
/** @brief The publish message's packet ID. */
uint16_t packet_id;
/** @brief The publish message's application message.*/
const void* application_message;
/** @brief The size of the application message in bytes. */
size_t application_message_size;
};
/**
* @brief A publish acknowledgement for messages that were published with QoS level 1.
* @ingroup unpackers
*
* @see <a href="http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718043">
* MQTT v3.1.1: PUBACK - Publish Acknowledgement.
* </a>
*
*/
struct mqtt_response_puback {
/** @brief The published messages packet ID. */
uint16_t packet_id;
};
/**
* @brief The response packet to a PUBLISH packet with QoS level 2.
* @ingroup unpackers
*
* @see <a href="http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718048">
* MQTT v3.1.1: PUBREC - Publish Received.
* </a>
*
*/
struct mqtt_response_pubrec {
/** @brief The published messages packet ID. */
uint16_t packet_id;
};
/**
* @brief The response to a PUBREC packet.
* @ingroup unpackers
*
* @see <a href="http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718053">
* MQTT v3.1.1: PUBREL - Publish Release.
* </a>
*
*/
struct mqtt_response_pubrel {
/** @brief The published messages packet ID. */
uint16_t packet_id;
};
/**
* @brief The response to a PUBREL packet.
* @ingroup unpackers
*
* @see <a href="http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718058">
* MQTT v3.1.1: PUBCOMP - Publish Complete.
* </a>
*
*/
struct mqtt_response_pubcomp {
/** T@brief he published messages packet ID. */
uint16_t packet_id;
};
/**
* @brief An enumeration of subscription acknowledgement return codes.
* @ingroup unpackers
*
* @see <a href="http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Figure_3.26_-">
* MQTT v3.1.1: SUBACK Return Codes.
* </a>
*/
enum MQTTSubackReturnCodes {
MQTT_SUBACK_SUCCESS_MAX_QOS_0 = 0u,
MQTT_SUBACK_SUCCESS_MAX_QOS_1 = 1u,
MQTT_SUBACK_SUCCESS_MAX_QOS_2 = 2u,
MQTT_SUBACK_FAILURE = 128u
};
/**
* @brief The response to a subscription request.
* @ingroup unpackers
*
* @see <a href="http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718068">
* MQTT v3.1.1: SUBACK - Subscription Acknowledgement.
* </a>
*/
struct mqtt_response_suback {
/** @brief The published messages packet ID. */
uint16_t packet_id;
/**
* Array of return codes corresponding to the requested subscribe topics.
*
* @see MQTTSubackReturnCodes
*/
const uint8_t *return_codes;
/** The number of return codes. */
size_t num_return_codes;
};
/**
* @brief The brokers response to a UNSUBSCRIBE request.
* @ingroup unpackers
*
* @see <a href="http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718077">
* MQTT v3.1.1: UNSUBACK - Unsubscribe Acknowledgement.
* </a>
*/
struct mqtt_response_unsuback {
/** @brief The published messages packet ID. */
uint16_t packet_id;
};
/**
* @brief The response to a ping request.
* @ingroup unpackers
*
* @note This response contains no members.
*
* @see <a href="http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718086">
* MQTT v3.1.1: PINGRESP - Ping Response.
* </a>
*/
struct mqtt_response_pingresp {
int dummy;
};
/**
* @brief A struct used to deserialize/interpret an incoming packet from the broker.
* @ingroup unpackers
*/
struct mqtt_response {
/** @brief The mqtt_fixed_header of the deserialized packet. */
struct mqtt_fixed_header fixed_header;
/**
* @brief A union of the possible responses from the broker.
*
* @note The fixed_header contains the control type. This control type corresponds to the
* member of this union that should be accessed. For example if
* fixed_header#control_type == \c MQTT_CONTROL_PUBLISH then
* decoded#publish should be accessed.
*/
union {
struct mqtt_response_connack connack;
struct mqtt_response_publish publish;
struct mqtt_response_puback puback;
struct mqtt_response_pubrec pubrec;
struct mqtt_response_pubrel pubrel;
struct mqtt_response_pubcomp pubcomp;
struct mqtt_response_suback suback;
struct mqtt_response_unsuback unsuback;
struct mqtt_response_pingresp pingresp;
} decoded;
};
/**
* @brief Deserialize the contents of \p buf into an mqtt_fixed_header object.
* @ingroup unpackers
*
* @note This function performs complete error checking and a positive return value
* means the entire mqtt_response can be deserialized from \p buf.
*
* @param[out] response the response who's \ref mqtt_response.fixed_header will be initialized.
* @param[in] buf the buffer.
* @param[in] bufsz the total number of bytes in the buffer.
*
* @returns The number of bytes that were consumed, or 0 if the buffer does not contain enough
* bytes to parse the packet, or a negative value if there was a protocol violation.
*/
ssize_t mqtt_unpack_fixed_header(struct mqtt_response *response, const uint8_t *buf, size_t bufsz);
/**
* @brief Deserialize a CONNACK response from \p buf.
* @ingroup unpackers
*
* @pre \ref mqtt_unpack_fixed_header must have returned a positive value and the control packet type
* must be \c MQTT_CONTROL_CONNACK.
*
* @param[out] mqtt_response the mqtt_response that will be initialized.
* @param[in] buf the buffer that contains the variable header and payload of the packet. The
* first byte of \p buf should be the first byte of the variable header.
*
* @relates mqtt_response_connack
*
* @returns The number of bytes that were consumed, or 0 if the buffer does not contain enough
* bytes to parse the packet, or a negative value if there was a protocol violation.
*/
ssize_t mqtt_unpack_connack_response (struct mqtt_response *mqtt_response, const uint8_t *buf);
/**
* @brief Deserialize a publish response from \p buf.
* @ingroup unpackers
*
* @pre \ref mqtt_unpack_fixed_header must have returned a positive value and the mqtt_response must
* have a control type of \c MQTT_CONTROL_PUBLISH.
*
* @param[out] mqtt_response the response that is initialized from the contents of \p buf.
* @param[in] buf the buffer with the incoming data.
*
* @relates mqtt_response_publish
*
* @returns The number of bytes that were consumed, or 0 if the buffer does not contain enough
* bytes to parse the packet, or a negative value if there was a protocol violation.
*/
ssize_t mqtt_unpack_publish_response (struct mqtt_response *mqtt_response, const uint8_t *buf);
/**
* @brief Deserialize a PUBACK/PUBREC/PUBREL/PUBCOMP packet from \p buf.
* @ingroup unpackers
*
* @pre \ref mqtt_unpack_fixed_header must have returned a positive value and the mqtt_response must
* have a control type of \c MQTT_CONTROL_PUBACK, \c MQTT_CONTROL_PUBREC, \c MQTT_CONTROL_PUBREL
* or \c MQTT_CONTROL_PUBCOMP.
*
* @param[out] mqtt_response the response that is initialized from the contents of \p buf.
* @param[in] buf the buffer with the incoming data.
*
* @relates mqtt_response_puback mqtt_response_pubrec mqtt_response_pubrel mqtt_response_pubcomp
*
* @returns The number of bytes that were consumed, or 0 if the buffer does not contain enough
* bytes to parse the packet, or a negative value if there was a protocol violation.
*/
ssize_t mqtt_unpack_pubxxx_response(struct mqtt_response *mqtt_response, const uint8_t *buf);
/**
* @brief Deserialize a SUBACK packet from \p buf.
* @ingroup unpacker
*
* @pre \ref mqtt_unpack_fixed_header must have returned a positive value and the mqtt_response must
* have a control type of \c MQTT_CONTROL_SUBACK.
*
* @param[out] mqtt_response the response that is initialized from the contents of \p buf.
* @param[in] buf the buffer with the incoming data.
*
* @relates mqtt_response_suback
*
* @returns The number of bytes that were consumed, or 0 if the buffer does not contain enough
* bytes to parse the packet, or a negative value if there was a protocol violation.
*/
ssize_t mqtt_unpack_suback_response(struct mqtt_response *mqtt_response, const uint8_t *buf);
/**
* @brief Deserialize an UNSUBACK packet from \p buf.
* @ingroup unpacker
*
* @pre \ref mqtt_unpack_fixed_header must have returned a positive value and the mqtt_response must
* have a control type of \c MQTT_CONTROL_UNSUBACK.
*
* @param[out] mqtt_response the response that is initialized from the contents of \p buf.
* @param[in] buf the buffer with the incoming data.
*
* @relates mqtt_response_unsuback
*
* @returns The number of bytes that were consumed, or 0 if the buffer does not contain enough
* bytes to parse the packet, or a negative value if there was a protocol violation.
*/
ssize_t mqtt_unpack_unsuback_response(struct mqtt_response *mqtt_response, const uint8_t *buf);
/**
* @brief Deserialize a packet from the broker.
* @ingroup unpackers
*
* @param[out] response the mqtt_response that will be initialize from \p buf.
* @param[in] buf the incoming data buffer.
* @param[in] bufsz the number of bytes available in the buffer.
*
* @relates mqtt_response
*
* @returns The number of bytes consumed on success, zero \p buf does not contain enough bytes
* to deserialize the packet, a negative value if a protocol violation was encountered.
*/
ssize_t mqtt_unpack_response(struct mqtt_response* response, const uint8_t *buf, size_t bufsz);
/* REQUESTS */
/**
* @brief Serialize an mqtt_fixed_header and write it to \p buf.
* @ingroup packers
*
* @note This function performs complete error checking and a positive return value
* guarantees the entire packet will fit into the given buffer.
*
* @param[out] buf the buffer to write to.
* @param[in] bufsz the maximum number of bytes that can be put in to \p buf.
* @param[in] fixed_header the fixed header that will be serialized.
*
* @returns The number of bytes written to \p buf, or 0 if \p buf is too small, or a
* negative value if there was a protocol violation.
*/
ssize_t mqtt_pack_fixed_header(uint8_t *buf, size_t bufsz, const struct mqtt_fixed_header *fixed_header);
/**
* @brief An enumeration of CONNECT packet flags.
* @ingroup packers
*
* @see <a href="http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718030">
* MQTT v3.1.1: CONNECT Variable Header.
* </a>
*/
enum MQTTConnectFlags {
MQTT_CONNECT_RESERVED = 1u,
MQTT_CONNECT_CLEAN_SESSION = 2u,
MQTT_CONNECT_WILL_FLAG = 4u,
MQTT_CONNECT_WILL_QOS_0 = (0u & 0x03) << 3,
MQTT_CONNECT_WILL_QOS_1 = (1u & 0x03) << 3,
MQTT_CONNECT_WILL_QOS_2 = (2u & 0x03) << 3,
MQTT_CONNECT_WILL_RETAIN = 32u,
MQTT_CONNECT_PASSWORD = 64u,
MQTT_CONNECT_USER_NAME = 128u
};
/**
* @brief Serialize a connection request into a buffer.
* @ingroup packers
*
* @param[out] buf the buffer to pack the connection request packet into.
* @param[in] bufsz the number of bytes left in \p buf.
* @param[in] client_id the ID that identifies the local client. \p client_id is a required
* parameter.
* @param[in] will_topic the topic under which the local client's will message will be published.
* Set to \c NULL for no will message. If \p will_topic is not \c NULL a
* \p will_message must also be provided.
* @param[in] will_message the will message to be published upon a unsuccessful disconnection of
* the local client. Set to \c NULL if \p will_topic is \c NULL.
* \p will_message must \em not be \c NULL if \p will_topic is not
* \c NULL.
* @param[in] will_message_size The size of \p will_message in bytes.
* @param[in] user_name the username to be used to connect to the broker with. Set to \c NULL if
* no username is required.
* @param[in] password the password to be used to connect to the broker with. Set to \c NULL if
* no password is required.
* @param[in] connect_flags additional MQTTConnectFlags to be set. The only flags that need to be
* set manually are \c MQTT_CONNECT_CLEAN_SESSION,
* \c MQTT_CONNECT_WILL_QOS_X (for \c X &isin; {0, 1, 2}), and
* \c MQTT_CONNECT_WILL_RETAIN. Set to 0 if no additional flags are
* required.
* @param[in] keep_alive the keep alive time in seconds. It is the responsibility of the clinet
* to ensure packets are sent to the server \em {at least} this frequently.
*
* @note If there is a \p will_topic and no additional \p connect_flags are given, then by
* default \p will_message will be published at QoS level 0.
*
* @see <a href="http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718028">
* MQTT v3.1.1: CONNECT - Client Requests a Connection to a Server.
* </a>
*
* @returns The number of bytes put into \p buf, 0 if \p buf is too small to fit the CONNECT
* packet, a negative value if there was a protocol violation.
*/
ssize_t mqtt_pack_connection_request(uint8_t* buf, size_t bufsz,
const char* client_id,
const char* will_topic,
const void* will_message,
size_t will_message_size,
const char* user_name,
const char* password,
uint8_t connect_flags,
uint16_t keep_alive);
/**
* @brief An enumeration of the PUBLISH flags.
* @ingroup packers
*
* @see <a href="http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718037">
* MQTT v3.1.1: PUBLISH - Publish Message.
* </a>
*/
enum MQTTPublishFlags {
MQTT_PUBLISH_DUP = 8u,
MQTT_PUBLISH_QOS_0 = ((0u << 1) & 0x06),
MQTT_PUBLISH_QOS_1 = ((1u << 1) & 0x06),
MQTT_PUBLISH_QOS_2 = ((2u << 1) & 0x06),
MQTT_PUBLISH_QOS_MASK = ((3u << 1) & 0x06),
MQTT_PUBLISH_RETAIN = 0x01
};
/**
* @brief Serialize a PUBLISH request and put it in \p buf.
* @ingroup packers
*
* @param[out] buf the buffer to put the PUBLISH packet in.
* @param[in] bufsz the maximum number of bytes that can be put into \p buf.
* @param[in] topic_name the topic to publish \p application_message under.
* @param[in] packet_id this packets packet ID.
* @param[in] application_message the application message to be published.
* @param[in] application_message_size the size of \p application_message in bytes.
* @param[in] publish_flags The flags to publish \p application_message with. These include
* the \c MQTT_PUBLISH_DUP flag, \c MQTT_PUBLISH_QOS_X (\c X &isin;
* {0, 1, 2}), and \c MQTT_PUBLISH_RETAIN flag.
*
* @note The default QoS is level 0.
*
* @see <a href="http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718037">
* MQTT v3.1.1: PUBLISH - Publish Message.
* </a>
*
* @returns The number of bytes put into \p buf, 0 if \p buf is too small to fit the PUBLISH
* packet, a negative value if there was a protocol violation.
*/
ssize_t mqtt_pack_publish_request(uint8_t *buf, size_t bufsz,
const char* topic_name,
uint16_t packet_id,
void* application_message,
size_t application_message_size,
uint8_t publish_flags);
/**
* @brief Serialize a PUBACK, PUBREC, PUBREL, or PUBCOMP packet and put it in \p buf.
* @ingroup packers
*
* @param[out] buf the buffer to put the PUBXXX packet in.
* @param[in] bufsz the maximum number of bytes that can be put into \p buf.
* @param[in] control_type the type of packet. Must be one of: \c MQTT_CONTROL_PUBACK,
* \c MQTT_CONTROL_PUBREC, \c MQTT_CONTROL_PUBREL,
* or \c MQTT_CONTROL_PUBCOMP.
* @param[in] packet_id the packet ID of the packet being acknowledged.
*
*
* @see <a href="http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718043">
* MQTT v3.1.1: PUBACK - Publish Acknowledgement.
* </a>
* @see <a href="http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718048">
* MQTT v3.1.1: PUBREC - Publish Received.
* </a>
* @see <a href="http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718053">
* MQTT v3.1.1: PUBREL - Publish Released.
* </a>
* @see <a href="http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718058">
* MQTT v3.1.1: PUBCOMP - Publish Complete.
* </a>
*
* @returns The number of bytes put into \p buf, 0 if \p buf is too small to fit the PUBXXX
* packet, a negative value if there was a protocol violation.
*/
ssize_t mqtt_pack_pubxxx_request(uint8_t *buf, size_t bufsz,
enum MQTTControlPacketType control_type,
uint16_t packet_id);
/**
* @brief The maximum number topics that can be subscribed to in a single call to
* mqtt_pack_subscribe_request.
* @ingroup packers
*
* @see mqtt_pack_subscribe_request
*/
#define MQTT_SUBSCRIBE_REQUEST_MAX_NUM_TOPICS 8
/**
* @brief Serialize a SUBSCRIBE packet and put it in \p buf.
* @ingroup packers
*
* @param[out] buf the buffer to put the SUBSCRIBE packet in.
* @param[in] bufsz the maximum number of bytes that can be put into \p buf.
* @param[in] packet_id the packet ID to be used.
* @param[in] ... \c NULL terminated list of (\c {const char *topic_name}, \c {int max_qos_level})
* pairs.
*
* @note The variadic arguments, \p ..., \em must be followed by a \c NULL. For example:
* @code
* ssize_t n = mqtt_pack_subscribe_request(buf, bufsz, 1234, "topic_1", 0, "topic_2", 2, NULL);
* @endcode
*
* @see <a href="http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718063">
* MQTT v3.1.1: SUBSCRIBE - Subscribe to Topics.
* </a>
*
* @returns The number of bytes put into \p buf, 0 if \p buf is too small to fit the SUBSCRIBE
* packet, a negative value if there was a protocol violation.
*/
ssize_t mqtt_pack_subscribe_request(uint8_t *buf, size_t bufsz,
unsigned int packet_id,
...); /* null terminated */
/**
* @brief The maximum number topics that can be subscribed to in a single call to
* mqtt_pack_unsubscribe_request.
* @ingroup packers
*
* @see mqtt_pack_unsubscribe_request
*/
#define MQTT_UNSUBSCRIBE_REQUEST_MAX_NUM_TOPICS 8
/**
* @brief Serialize a UNSUBSCRIBE packet and put it in \p buf.
* @ingroup packers
*
* @param[out] buf the buffer to put the UNSUBSCRIBE packet in.
* @param[in] bufsz the maximum number of bytes that can be put into \p buf.
* @param[in] packet_id the packet ID to be used.
* @param[in] ... \c NULL terminated list of \c {const char *topic_name}'s to unsubscribe from.
*
* @note The variadic arguments, \p ..., \em must be followed by a \c NULL. For example:
* @code
* ssize_t n = mqtt_pack_unsubscribe_request(buf, bufsz, 4321, "topic_1", "topic_2", NULL);
* @endcode
*
* @see <a href="http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718072">
* MQTT v3.1.1: UNSUBSCRIBE - Unsubscribe from Topics.
* </a>
*
* @returns The number of bytes put into \p buf, 0 if \p buf is too small to fit the UNSUBSCRIBE
* packet, a negative value if there was a protocol violation.
*/
ssize_t mqtt_pack_unsubscribe_request(uint8_t *buf, size_t bufsz,
unsigned int packet_id,
...); /* null terminated */
/**
* @brief Serialize a PINGREQ and put it into \p buf.
* @ingroup packers
*
* @param[out] buf the buffer to put the PINGREQ packet in.
* @param[in] bufsz the maximum number of bytes that can be put into \p buf.
*
* @see <a href="http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718081">
* MQTT v3.1.1: PINGREQ - Ping Request.
* </a>
*
* @returns The number of bytes put into \p buf, 0 if \p buf is too small to fit the PINGREQ
* packet, a negative value if there was a protocol violation.
*/
ssize_t mqtt_pack_ping_request(uint8_t *buf, size_t bufsz);
/**
* @brief Serialize a DISCONNECT and put it into \p buf.
* @ingroup packers
*
* @param[out] buf the buffer to put the DISCONNECT packet in.
* @param[in] bufsz the maximum number of bytes that can be put into \p buf.
*
* @see <a href="http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718090">
* MQTT v3.1.1: DISCONNECT - Disconnect Notification.
* </a>
*
* @returns The number of bytes put into \p buf, 0 if \p buf is too small to fit the DISCONNECT
* packet, a negative value if there was a protocol violation.
*/
ssize_t mqtt_pack_disconnect(uint8_t *buf, size_t bufsz);
/**
* @brief An enumeration of queued message states.
* @ingroup details
*/
enum MQTTQueuedMessageState {
MQTT_QUEUED_UNSENT,
MQTT_QUEUED_AWAITING_ACK,
MQTT_QUEUED_COMPLETE
};
/**
* @brief A message in a mqtt_message_queue.
* @ingroup details
*/
struct mqtt_queued_message {
/** @brief A pointer to the start of the message. */
uint8_t *start;
/** @brief The number of bytes in the message. */
size_t size;
/** @brief The state of the message. */
enum MQTTQueuedMessageState state;
/**
* @brief The time at which the message was sent..
*
* @note A timeout will only occur if the message is in
* the MQTT_QUEUED_AWAITING_ACK \c state.
*/
mqtt_pal_time_t time_sent;
/**
* @brief The control type of the message.
*/
enum MQTTControlPacketType control_type;
/**
* @brief The packet id of the message.
*
* @note This field is only used if the associate \c control_type has a
* \c packet_id field.
*/
uint16_t packet_id;
};
/**
* @brief A message queue.
* @ingroup details
*
* @note This struct is used internally to manage sending messages.
* @note The only members the user should use are \c curr and \c curr_sz.
*/
struct mqtt_message_queue {
/**
* @brief The start of the message queue's memory block.
*
* @warning This member should \em not be manually changed.
*/
void *mem_start;
/** @brief The end of the message queue's memory block. */
void *mem_end;
/**
* @brief A pointer to the position in the buffer you can pack bytes at.
*
* @note Immediately after packing bytes at \c curr you \em must call
* mqtt_mq_register.
*/
uint8_t *curr;
/**
* @brief The number of bytes that can be written to \c curr.
*
* @note curr_sz will decrease by more than the number of bytes you write to
* \c curr. This is because the mqtt_queued_message structs share the
* same memory (and thus, a mqtt_queued_message must be allocated in
* the message queue's memory whenever a new message is registered).
*/
size_t curr_sz;
/**
* @brief The tail of the array of mqtt_queued_messages's.
*
* @note This member should not be used manually.
*/
struct mqtt_queued_message *queue_tail;
};
/**
* @brief Initialize a message queue.
* @ingroup details
*
* @param[out] mq The message queue to initialize.
* @param[in] buf The buffer for this message queue.
* @param[in] bufsz The number of bytes in the buffer.
*
* @relates mqtt_message_queue
*/
void mqtt_mq_init(struct mqtt_message_queue *mq, void *buf, size_t bufsz);
/**
* @brief Clear as many messages from the front of the queue as possible.
* @ingroup details
*
* @note Calls to this function are the \em only way to remove messages from the queue.
*
* @param mq The message queue.
*
* @relates mqtt_message_queue
*/
void mqtt_mq_clean(struct mqtt_message_queue *mq);
/**
* @brief Register a message that was just added to the buffer.
* @ingroup details
*
* @note This function should be called immediately following a call to a packer function
* that returned a positive value. The positive value (number of bytes packed) should
* be passed to this function.
*
* @param mq The message queue.
* @param[in] nbytes The number of bytes that were just packed.
*
* @note This function will step mqtt_message_queue::curr and update mqtt_message_queue::curr_sz.
* @relates mqtt_message_queue
*
* @returns The newly added struct mqtt_queued_message.
*/
struct mqtt_queued_message* mqtt_mq_register(struct mqtt_message_queue *mq, size_t nbytes);
/**
* @brief Find a message in the message queue.
* @ingroup details
*
* @param mq The message queue.
* @param[in] control_type The control type of the message you want to find.
* @param[in] packet_id The packet ID of the message you want to find. Set to \c NULL if you
* don't want to specify a packet ID.
*
* @relates mqtt_message_queue
* @returns The found message. \c NULL if the message was not found.
*/
struct mqtt_queued_message* mqtt_mq_find(struct mqtt_message_queue *mq, enum MQTTControlPacketType control_type, uint16_t *packet_id);
/**
* @brief Returns the mqtt_queued_message at \p index.
* @ingroup details
*
* @param mq_ptr A pointer to the message queue.
* @param index The index of the message.
*
* @returns The mqtt_queued_message at \p index.
*/
#define mqtt_mq_get(mq_ptr, index) (((struct mqtt_queued_message*) ((mq_ptr)->mem_end)) - 1 - index)
/**
* @brief Returns the number of messages in the message queue, \p mq_ptr.
* @ingroup details
*/
#define mqtt_mq_length(mq_ptr) (((struct mqtt_queued_message*) ((mq_ptr)->mem_end)) - (mq_ptr)->queue_tail)
/**
* @brief Used internally to recalculate the \c curr_sz.
* @ingroup details
*/
#define mqtt_mq_currsz(mq_ptr) (mq_ptr->curr >= (uint8_t*) ((mq_ptr)->queue_tail - 1)) ? 0 : ((uint8_t*) ((mq_ptr)->queue_tail - 1)) - (mq_ptr)->curr
/* CLIENT */
/**
* @brief An MQTT client.
* @ingroup details
*
* @note All members can be manipulated via the related functions.
*/
struct mqtt_client {
/** @brief The socket connecting to the MQTT broker. */
mqtt_pal_socket_handle socketfd;
/** @brief The LFSR state used to generate packet ID's. */
uint16_t pid_lfsr;
/** @brief The keep-alive time in seconds. */
uint16_t keep_alive;
/**
* @brief A counter counting pings that have been sent to keep the connection alive.
* @see keep_alive
*/
int number_of_keep_alives;
/**
* @brief The timestamp of the last message sent to the buffer.
*
* This is used to detect the need for keep-alive pings.
*
* @see keep_alive
*/
mqtt_pal_time_t time_of_last_send;
/**
* @brief The error state of the client.
*
* error should be MQTT_OK for the entirety of the connection.
*
* @note The error state will be MQTT_ERROR_CONNECT_NOT_CALLED until
* you call mqtt_connect.
*/
enum MQTTErrors error;
/**
* @brief The timeout period in seconds.
*
* If the broker doesn't return an ACK within response_timeout seconds a timeout
* will occur and the message will be retransmitted.
*
* @note The default value is 30 [seconds] but you can change it at any time.
*/
int response_timeout;
/** @brief A counter counting the number of timeouts that have occurred. */
int number_of_timeouts;
/**
* @brief Approximately much time it has typically taken to receive responses from the
* broker.
*
* @note This is tracked using a exponential-averaging.
*/
double typical_response_time;
/**
* @brief The callback that is called whenever a publish is received from the broker.
*
* Any topics that you have subscribed to will be returned from the broker as
* mqtt_response_publish messages. All the publishes received from the broker will
* be passed to this function.
*
* @note A pointer to publish_response_callback_state is always passed to the callback.
* Use publish_response_callback_state to keep track of any state information you
* need.
*/
void (*publish_response_callback)(void** state, struct mqtt_response_publish *publish);
/**
* @brief A pointer to any publish_response_callback state information you need.
*
* @note A pointer to this pointer will always be publish_response_callback upon
* receiving a publish message from the broker.
*/
void* publish_response_callback_state;
/**
* @brief A user-specified callback, triggered on each \ref mqtt_sync, allowing
* the user to perform state inspections (and custom socket error detection)
* on the client.
*
* This callback is triggered on each call to \ref mqtt_sync. If it returns MQTT_OK
* then \ref mqtt_sync will continue normally (performing reads and writes). If it
* returns an error then \ref mqtt_sync will not call reads and writes.
*
* This callback can be used to perform custom error detection, namely platform
* specific socket error detection, and force the client into an error state.
*
* This member is always initialized to NULL but it can be manually set at any
* time.
*/
enum MQTTErrors (*inspector_callback)(struct mqtt_client*);
/**
* @brief A callback that is called whenever the client is in an error state.
*
* This callback is responsible for: application level error handling, closing
* previous sockets, and reestabilishing the connection to the broker and
* session configurations (i.e. subscriptions).
*/
void (*reconnect_callback)(struct mqtt_client*, void**);
/**
* @brief A pointer to some state. A pointer to this member is passed to
* \ref mqtt_client.reconnect_callback.
*/
void* reconnect_state;
/**
* @brief The buffer where ingress data is temporarily stored.
*/
struct {
/** @brief The start of the receive buffer's memory. */
uint8_t *mem_start;
/** @brief The size of the receive buffer's memory. */
size_t mem_size;
/** @brief A pointer to the next writtable location in the receive buffer. */
uint8_t *curr;
/** @brief The number of bytes that are still writable at curr. */
size_t curr_sz;
} recv_buffer;
/**
* @brief A variable passed to support thread-safety.
*
* A pointer to this variable is passed to \c MQTT_PAL_MUTEX_LOCK, and
* \c MQTT_PAL_MUTEX_UNLOCK.
*/
mqtt_pal_mutex_t mutex;
/** @brief The sending message queue. */
struct mqtt_message_queue mq;
};
/**
* @brief Generate a new next packet ID.
* @ingroup details
*
* Packet ID's are generated using a max-length LFSR.
*
* @param client The MQTT client.
*
* @returns The new packet ID that should be used.
*/
uint16_t __mqtt_next_pid(struct mqtt_client *client);
/**
* @brief Handles egress client traffic.
* @ingroup details
*
* @param client The MQTT client.
*
* @returns MQTT_OK upon success, an \ref MQTTErrors otherwise.
*/
ssize_t __mqtt_send(struct mqtt_client *client);
/**
* @brief Handles ingress client traffic.
* @ingroup details
*
* @param client The MQTT client.
*
* @returns MQTT_OK upon success, an \ref MQTTErrors otherwise.
*/
ssize_t __mqtt_recv(struct mqtt_client *client);
/**
* @brief Function that does the actual sending and receiving of
* traffic from the network.
* @ingroup api
*
* All the other functions in the @ref api simply stage messages for
* being sent to the broker. This function does the actual sending of
* those messages. Additionally this function receives traffic (responses and
* acknowledgements) from the broker and responds to that traffic accordingly.
* Lastly this function also calls the \c publish_response_callback when
* any \c MQTT_CONTROL_PUBLISH messages are received.
*
* @pre mqtt_init must have been called.
*
* @param[in,out] client The MQTT client.
*
* @attention It is the responsibility of the application programmer to
* call this function periodically. All functions in the @ref api are
* thread-safe so it is perfectly reasonable to have a thread dedicated
* to calling this function every 200 ms or so. MQTT-C can be used in single
* threaded application though by simply calling this functino periodically
* inside your main thread. See @ref simple_publisher.c and @ref simple_subscriber.c
* for examples (specifically the \c client_refresher functions).
*
* @returns MQTT_OK upon success, an \ref MQTTErrors otherwise.
*/
enum MQTTErrors mqtt_sync(struct mqtt_client *client);
/**
* @brief Initializes an MQTT client.
* @ingroup api
*
* This function \em must be called before any other API function calls.
*
* @pre None.
*
* @param[out] client The MQTT client.
* @param[in] sockfd The socket file descriptor (or equivalent socket handle, e.g. BIO pointer
* for OpenSSL sockets) connected to the MQTT broker.
* @param[in] sendbuf A buffer that will be used for sending messages to the broker.
* @param[in] sendbufsz The size of \p sendbuf in bytes.
* @param[in] recvbuf A buffer that will be used for receiving messages from the broker.
* @param[in] recvbufsz The size of \p recvbuf in bytes.
* @param[in] publish_response_callback The callback to call whenever application messages
* are received from the broker.
*
* @post mqtt_connect must be called.
*
* @note \p sockfd is a non-blocking TCP connection.
* @note If \p sendbuf fills up completely during runtime a \c MQTT_ERROR_SEND_BUFFER_IS_FULL
* error will be set. Similarly if \p recvbuf is ever to small to receive a message from
* the broker an MQTT_ERROR_RECV_BUFFER_TOO_SMALL error will be set.
* @note A pointer to \ref mqtt_client.publish_response_callback_state is always passed as the
* \c state argument to \p publish_response_callback. Note that the second argument is
* the mqtt_response_publish that was received from the broker.
*
* @attention Only initialize an MQTT client once (i.e. don't call \ref mqtt_init or
* \ref mqtt_init_reconnect more than once per client).
*
* @returns \c MQTT_OK upon success, an \ref MQTTErrors otherwise.
*/
enum MQTTErrors mqtt_init(struct mqtt_client *client,
mqtt_pal_socket_handle sockfd,
uint8_t *sendbuf, size_t sendbufsz,
uint8_t *recvbuf, size_t recvbufsz,
void (*publish_response_callback)(void** state, struct mqtt_response_publish *publish));
/**
* @brief Initializes an MQTT client and enables automatic reconnections.
* @ingroup api
*
* An alternative to \ref mqtt_init that allows the client to automatically reconnect to the
* broker after an error occurs (e.g. socket error or internal buffer overflows).
*
* This is accomplished by calling the \p reconnect_callback whenever the client enters an error
* state. The job of the \p reconnect_callback is to: (1) perform error handling/logging,
* (2) clean up the old connection (i.e. close client->socketfd), (3) \ref mqtt_reinit the
* client, and (4) reconfigure the MQTT session by calling \ref mqtt_connect followed by other
* API calls such as \ref mqtt_subscribe.
*
* The first argument to the \p reconnect_callback is the client (which will be in an error
* state) and the second argument is a pointer to a void pointer where you can store some state
* information. Internally, MQTT-C calls the reconnect callback like so:
*
* \code
* client->reconnect_callback(client, &client->reconnect_state)
* \endcode
*
* Note that the \p reconnect_callback is also called to setup the initial session. After
* calling \ref mqtt_init_reconnect the client will be in the error state
* \c MQTT_ERROR_INITIAL_RECONNECT.
*
* @pre None.
*
* @param[in,out] client The MQTT client that will be initialized.
* @param[in] reconnect_callback The callback that will be called to connect/reconnect the
* client to the broker and perform application level error handling.
* @param[in] reconnect_state A pointer to some state data for your \p reconnect_callback.
* If your \p reconnect_callback does not require any state information set this
* to NULL. A pointer to the memory address where the client stores a copy of this
* pointer is passed as the second argumnet to \p reconnect_callback.
* @param[in] publish_response_callback The callback to call whenever application messages
* are received from the broker.
*
* @post Call \p reconnect_callback yourself, or call \ref mqtt_sync
* (which will trigger the call to \p reconnect_callback).
*
* @attention Only initialize an MQTT client once (i.e. don't call \ref mqtt_init or
* \ref mqtt_init_reconnect more than once per client).
*
*/
void mqtt_init_reconnect(struct mqtt_client *client,
void (*reconnect_callback)(struct mqtt_client *client, void** state),
void *reconnect_state,
void (*publish_response_callback)(void** state, struct mqtt_response_publish *publish));
/**
* @brief Safely assign/reassign a socket and buffers to an new/existing client.
* @ingroup api
*
* This function also clears the \p client error state. Upon exiting this function
* \c client->error will be \c MQTT_ERROR_CONNECT_NOT_CALLED (which will be cleared)
* as soon as \ref mqtt_connect is called.
*
* @pre This function must be called BEFORE \ref mqtt_connect.
*
* @param[in,out] client The MQTT client.
* @param[in] socketfd The new socket connected to the broker.
* @param[in] sendbuf The buffer that will be used to buffer egress traffic to the broker.
* @param[in] sendbufsz The size of \p sendbuf in bytes.
* @param[in] recvbuf The buffer that will be used to buffer ingress traffic from the broker.
* @param[in] recvbufsz The size of \p recvbuf in bytes.
*
* @post Call \ref mqtt_connect.
*
* @attention This function should be used in conjunction with clients that have been
* initialzed with \ref mqtt_init_reconnect.
*/
void mqtt_reinit(struct mqtt_client* client,
mqtt_pal_socket_handle socketfd,
uint8_t *sendbuf, size_t sendbufsz,
uint8_t *recvbuf, size_t recvbufsz);
/**
* @brief Establishes a session with the MQTT broker.
* @ingroup api
*
* @pre mqtt_init must have been called.
*
* @param[in,out] client The MQTT client.
* @param[in] client_id The unique name identifying the client.
* @param[in] will_topic The topic name of client's \p will_message. If no will message is
* desired set to \c NULL.
* @param[in] will_message The application message (data) to be published in the event the
* client ungracefully disconnects. Set to \c NULL if \p will_topic is \c NULL.
* @param[in] will_message_size The size of \p will_message in bytes.
* @param[in] user_name The username to use when establishing the session with the MQTT broker.
* Set to \c NULL if a username is not required.
* @param[in] password The password to use when establishing the session with the MQTT broker.
* Set to \c NULL if a password is not required.
* @param[in] connect_flags Additional \ref MQTTConnectFlags to use when establishing the connection.
* These flags are for forcing the session to start clean,
* \c MQTT_CONNECT_CLEAN_SESSION, the QOS level to publish the \p will_message with
* (provided \c will_message != \c NULL), MQTT_CONNECT_WILL_QOS_[0,1,2], and whether
* or not the broker should retain the \c will_message, MQTT_CONNECT_WILL_RETAIN.
* @param[in] keep_alive The keep-alive time in seconds. A reasonable value for this is 400 [seconds].
*
* @returns \c MQTT_OK upon success, an \ref MQTTErrors otherwise.
*/
enum MQTTErrors mqtt_connect(struct mqtt_client *client,
const char* client_id,
const char* will_topic,
const void* will_message,
size_t will_message_size,
const char* user_name,
const char* password,
uint8_t connect_flags,
uint16_t keep_alive);
/*
todo: will_message should be a void*
*/
/**
* @brief Publish an application message.
* @ingroup api
*
* Publishes an application message to the MQTT broker.
*
* @pre mqtt_connect must have been called.
*
* @param[in,out] client The MQTT client.
* @param[in] topic_name The name of the topic.
* @param[in] application_message The data to be published.
* @param[in] application_message_size The size of \p application_message in bytes.
* @param[in] publish_flags \ref MQTTPublishFlags to be used, namely the QOS level to
* publish at (MQTT_PUBLISH_QOS_[0,1,2]) or whether or not the broker should
* retain the publish (MQTT_PUBLISH_RETAIN).
*
* @returns \c MQTT_OK upon success, an \ref MQTTErrors otherwise.
*/
enum MQTTErrors mqtt_publish(struct mqtt_client *client,
const char* topic_name,
void* application_message,
size_t application_message_size,
uint8_t publish_flags);
/**
* @brief Acknowledge an ingree publish with QOS==1.
* @ingroup details
*
* @param[in,out] client The MQTT client.
* @param[in] packet_id The packet ID of the ingress publish being acknowledged.
*
* @returns \c MQTT_OK upon success, an \ref MQTTErrors otherwise.
*/
ssize_t __mqtt_puback(struct mqtt_client *client, uint16_t packet_id);
/**
* @brief Acknowledge an ingree publish with QOS==2.
* @ingroup details
*
* @param[in,out] client The MQTT client.
* @param[in] packet_id The packet ID of the ingress publish being acknowledged.
*
* @returns \c MQTT_OK upon success, an \ref MQTTErrors otherwise.
*/
ssize_t __mqtt_pubrec(struct mqtt_client *client, uint16_t packet_id);
/**
* @brief Acknowledge an ingree PUBREC packet.
* @ingroup details
*
* @param[in,out] client The MQTT client.
* @param[in] packet_id The packet ID of the ingress PUBREC being acknowledged.
*
* @returns \c MQTT_OK upon success, an \ref MQTTErrors otherwise.
*/
ssize_t __mqtt_pubrel(struct mqtt_client *client, uint16_t packet_id);
/**
* @brief Acknowledge an ingree PUBREL packet.
* @ingroup details
*
* @param[in,out] client The MQTT client.
* @param[in] packet_id The packet ID of the ingress PUBREL being acknowledged.
*
* @returns \c MQTT_OK upon success, an \ref MQTTErrors otherwise.
*/
ssize_t __mqtt_pubcomp(struct mqtt_client *client, uint16_t packet_id);
/**
* @brief Subscribe to a topic.
* @ingroup api
*
* @pre mqtt_connect must have been called.
*
* @param[in,out] client The MQTT client.
* @param[in] topic_name The name of the topic to subscribe to.
* @param[in] max_qos_level The maximum QOS level with which the broker can send application
* messages for this topic.
*
* @returns \c MQTT_OK upon success, an \ref MQTTErrors otherwise.
*/
enum MQTTErrors mqtt_subscribe(struct mqtt_client *client,
const char* topic_name,
int max_qos_level);
/**
* @brief Unsubscribe from a topic.
* @ingroup api
*
* @pre mqtt_connect must have been called.
*
* @param[in,out] client The MQTT client.
* @param[in] topic_name The name of the topic to unsubscribe from.
*
* @returns \c MQTT_OK upon success, an \ref MQTTErrors otherwise.
*/
enum MQTTErrors mqtt_unsubscribe(struct mqtt_client *client,
const char* topic_name);
/**
* @brief Ping the broker.
* @ingroup api
*
* @pre mqtt_connect must have been called.
*
* @param[in,out] client The MQTT client.
*
* @returns \c MQTT_OK upon success, an \ref MQTTErrors otherwise.
*/
enum MQTTErrors mqtt_ping(struct mqtt_client *client);
/**
* @brief Ping the broker without locking/unlocking the mutex.
* @see mqtt_ping
*/
enum MQTTErrors __mqtt_ping(struct mqtt_client *client);
/**
* @brief Terminate the session with the MQTT broker.
* @ingroup api
*
* @pre mqtt_connect must have been called.
*
* @param[in,out] client The MQTT client.
*
* @note To re-establish the session, mqtt_connect must be called.
*
* @returns \c MQTT_OK upon success, an \ref MQTTErrors otherwise.
*/
enum MQTTErrors mqtt_disconnect(struct mqtt_client *client);
#endif
/*
MIT License
Copyright(c) 2018 Liam Bindle
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files(the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions :
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
*/
#include "mqtt.h"
/**
* @file
* @brief Implements @ref mqtt_pal_sendall and @ref mqtt_pal_recvall and
* any platform-specific helpers you'd like.
* @cond Doxygen_Suppress
*/
#ifdef __unix__
#ifdef MQTT_USE_BIO
#include <openssl/bio.h>
#include <openssl/ssl.h>
#include <openssl/err.h>
ssize_t mqtt_pal_sendall(mqtt_pal_socket_handle fd, const void* buf, size_t len, int flags) {
size_t sent = 0;
while(sent < len) {
int tmp = BIO_write(fd, buf + sent, len - sent);
if (tmp > 0) {
sent += (size_t) tmp;
} else if (tmp <= 0 && !BIO_should_retry(fd)) {
return MQTT_ERROR_SOCKET_ERROR;
}
}
return sent;
}
ssize_t mqtt_pal_recvall(mqtt_pal_socket_handle fd, void* buf, size_t bufsz, int flags) {
const void const *start = buf;
int rv;
do {
rv = BIO_read(fd, buf, bufsz);
if (rv > 0) {
/* successfully read bytes from the socket */
buf += rv;
bufsz -= rv;
} else if (!BIO_should_retry(fd)) {
/* an error occurred that wasn't "nothing to read". */
return MQTT_ERROR_SOCKET_ERROR;
}
} while (!BIO_should_read(fd));
return (ssize_t)(buf - start);
}
#else
#include <errno.h>
ssize_t mqtt_pal_sendall(mqtt_pal_socket_handle fd, const void* buf, size_t len, int flags) {
size_t sent = 0;
while(sent < len) {
ssize_t tmp = send(fd, buf + sent, len - sent, flags);
if (tmp < 1) {
return MQTT_ERROR_SOCKET_ERROR;
}
sent += (size_t) tmp;
}
return sent;
}
ssize_t mqtt_pal_recvall(mqtt_pal_socket_handle fd, void* buf, size_t bufsz, int flags) {
const void const *start = buf;
ssize_t rv;
do {
rv = recv(fd, buf, bufsz, flags);
if (rv > 0) {
/* successfully read bytes from the socket */
buf += rv;
bufsz -= rv;
} else if (rv < 0 && errno != EAGAIN && errno != EWOULDBLOCK) {
/* an error occurred that wasn't "nothing to read". */
return MQTT_ERROR_SOCKET_ERROR;
}
} while (rv > 0);
return buf - start;
}
#endif
#endif
/** @endcond */
\ No newline at end of file
#ifndef __MQTT_PAL_H__
#define __MQTT_PAL_H__
/*
MIT License
Copyright(c) 2018 Liam Bindle
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files(the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions :
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
*/
/**
* @file
* @brief Includes/supports the types/calls required by the MQTT-C client.
*
* @note This is the \em only file included in mqtt.h, and mqtt.c. It is therefore
* responsible for including/supporting all the required types and calls.
*
* @defgroup pal Platform abstraction layer
* @brief Documentation of the types and calls required to port MQTT-C to a new platform.
*
* mqtt_pal.h is the \em only header file included in mqtt.c. Therefore, to port MQTT-C to a
* new platform the following types, functions, constants, and macros must be defined in
* mqtt_pal.h:
* - Types:
* - \c size_t, \c ssize_t
* - \c uint8_t, \c uint16_t, \c uint32_t
* - \c va_list
* - \c mqtt_pal_time_t : return type of \c MQTT_PAL_TIME()
* - \c mqtt_pal_mutex_t : type of the argument that is passed to \c MQTT_PAL_MUTEX_LOCK and
* \c MQTT_PAL_MUTEX_RELEASE
* - Functions:
* - \c memcpy, \c strlen
* - \c va_start, \c va_arg, \c va_end
* - Constants:
* - \c INT_MIN
*
* Additionally, three macro's are required:
* - \c MQTT_PAL_HTONS(s) : host-to-network endian conversion for uint16_t.
* - \c MQTT_PAL_NTOHS(s) : network-to-host endian conversion for uint16_t.
* - \c MQTT_PAL_TIME() : returns [type: \c mqtt_pal_time_t] current time in seconds.
* - \c MQTT_PAL_MUTEX_LOCK(mtx_pointer) : macro that locks the mutex pointed to by \c mtx_pointer.
* - \c MQTT_PAL_MUTEX_RELEASE(mtx_pointer) : macro that unlocks the mutex pointed to by
* \c mtx_pointer.
*
* Lastly, \ref mqtt_pal_sendall and \ref mqtt_pal_recvall, must be implemented in mqtt_pal.c
* for sending and receiving data using the platforms socket calls.
*/
/* UNIX-like platform support */
#ifdef __unix__
#include <limits.h>
#include <string.h>
#include <stdarg.h>
#include <time.h>
#include <arpa/inet.h>
#include <pthread.h>
#define MQTT_PAL_HTONS(s) htons(s)
#define MQTT_PAL_NTOHS(s) ntohs(s)
#define MQTT_PAL_TIME() time(NULL)
typedef time_t mqtt_pal_time_t;
typedef pthread_mutex_t mqtt_pal_mutex_t;
#define MQTT_PAL_MUTEX_INIT(mtx_ptr) pthread_mutex_init(mtx_ptr, NULL)
#define MQTT_PAL_MUTEX_LOCK(mtx_ptr) pthread_mutex_lock(mtx_ptr)
#define MQTT_PAL_MUTEX_UNLOCK(mtx_ptr) pthread_mutex_unlock(mtx_ptr)
#ifndef MQTT_USE_CUSTOM_SOCKET_HANDLE
#ifdef MQTT_USE_BIO
#include <openssl/bio.h>
typedef BIO* mqtt_pal_socket_handle;
#else
typedef int mqtt_pal_socket_handle;
#endif
#endif
#endif
/**
* @brief Sends all the bytes in a buffer.
* @ingroup pal
*
* @param[in] fd The file-descriptor (or handle) of the socket.
* @param[in] buf A pointer to the first byte in the buffer to send.
* @param[in] len The number of bytes to send (starting at \p buf).
* @param[in] flags Flags which are passed to the underlying socket.
*
* @returns The number of bytes sent if successful, an \ref MQTTErrors otherwise.
*/
ssize_t mqtt_pal_sendall(mqtt_pal_socket_handle fd, const void* buf, size_t len, int flags);
/**
* @brief Non-blocking receive all the byte available.
* @ingroup pal
*
* @param[in] fd The file-descriptor (or handle) of the socket.
* @param[in] buf A pointer to the receive buffer.
* @param[in] bufsz The max number of bytes that can be put into \p buf.
* @param[in] flags Flags which are passed to the underlying socket.
*
* @returns The number of bytes received if successful, an \ref MQTTErrors otherwise.
*/
ssize_t mqtt_pal_recvall(mqtt_pal_socket_handle fd, void* buf, size_t bufsz, int flags);
#endif
\ No newline at end of file
### MQTT-C-Client
Modified based on MQTT-C
For more information, please click https://github.com/LiamBindle/MQTT-C
#ifndef __BIO_SOCKET_TEMPLATE_H__
#define __BIO_SOCKET_TEMPLATE_H__
#include <openssl/bio.h>
#include <openssl/ssl.h>
#include <openssl/err.h>
/*
A template for opening a non-blocking BIO socket.
*/
BIO* open_nb_socket(const char* addr, const char* port) {
BIO* bio = BIO_new_connect(addr);
BIO_set_nbio(bio, 1);
BIO_set_conn_port(bio, port);
/* timeout after 10 seconds */
int start_time = time(NULL);
while(BIO_do_connect(bio) == 0 && (int)time(NULL) - start_time < 10);
if (BIO_do_connect(bio) <= 0) {
fprintf(stderr, "Failed to open socket: BIO_do_connect returned <= 0\n");
return NULL;
}
return bio;
}
#endif
\ No newline at end of file
#ifndef __OPENSSL_SOCKET_TEMPLATE_H__
#define __OPENSSL_SOCKET_TEMPLATE_H__
#include <openssl/bio.h>
#include <openssl/ssl.h>
#include <openssl/err.h>
/*
A template for opening a non-blocking OpenSSL connection.
*/
void open_nb_socket(BIO** bio, SSL_CTX** ssl_ctx, const char* addr, const char* port, const char* ca_file, const char* ca_path) {
*ssl_ctx = SSL_CTX_new(SSLv23_client_method());
SSL* ssl;
/* load certificate */
if (!SSL_CTX_load_verify_locations(*ssl_ctx, ca_file, ca_path)) {
printf("error: failed to load certificate\n");
exit(1);
}
/* open BIO socket */
*bio = BIO_new_ssl_connect(*ssl_ctx);
BIO_get_ssl(*bio, &ssl);
SSL_set_mode(ssl, SSL_MODE_AUTO_RETRY);
BIO_set_conn_hostname(*bio, addr);
BIO_set_nbio(*bio, 1);
BIO_set_conn_port(*bio, port);
/* wait for connect with 10 second timeout */
int start_time = time(NULL);
while(BIO_do_connect(*bio) <= 0 && (int)time(NULL) - start_time < 10);
if (BIO_do_connect(*bio) <= 0) {
printf("error: %s\n", ERR_reason_error_string(ERR_get_error()));
BIO_free_all(*bio);
SSL_CTX_free(*ssl_ctx);
*bio = NULL;
*ssl_ctx=NULL;
return;
}
/* verify certificate */
if (SSL_get_verify_result(ssl) != X509_V_OK) {
/* Handle the failed verification */
printf("error: x509 certificate verification failed\n");
exit(1);
}
}
#endif
\ No newline at end of file
#ifndef __POSIX_SOCKET_TEMPLATE_H__
#define __POSIX_SOCKET_TEMPLATE_H__
#include <stdio.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netdb.h>
#include <fcntl.h>
/*
A template for opening a non-blocking POSIX socket.
*/
int open_nb_socket(const char* addr, const char* port) {
struct addrinfo hints = {0};
hints.ai_family = AF_UNSPEC; /* IPv4 or IPv6 */
hints.ai_socktype = SOCK_STREAM; /* Must be TCP */
int sockfd = -1;
int rv;
struct addrinfo *p, *servinfo;
/* get address information */
rv = getaddrinfo(addr, port, &hints, &servinfo);
if(rv != 0) {
fprintf(stderr, "Failed to open socket (getaddrinfo): %s\n", gai_strerror(rv));
return -1;
}
/* open the first possible socket */
for(p = servinfo; p != NULL; p = p->ai_next) {
sockfd = socket(p->ai_family, p->ai_socktype, p->ai_protocol);
if (sockfd == -1) continue;
/* connect to server */
rv = connect(sockfd, servinfo->ai_addr, servinfo->ai_addrlen);
if(rv == -1) continue;
break;
}
/* free servinfo */
freeaddrinfo(servinfo);
/* make non-blocking */
if (sockfd != -1) fcntl(sockfd, F_SETFL, fcntl(sockfd, F_GETFL) | O_NONBLOCK);
/* return the new socket fd */
return sockfd;
}
#endif
\ No newline at end of file
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>netcoreapp3.1</TargetFramework>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="MQTTnet" Version="3.0.11" />
<PackageReference Include="Newtonsoft.Json" Version="12.0.3" />
</ItemGroup>
</Project>
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Client.Options;
using Newtonsoft.Json;
using System;
using System.Threading.Tasks;
namespace MQTTClient
{
internal class Program
{
private static void Main(string[] args)
{
Console.WriteLine("Hello World!");
MQTTnet.MqttFactory factory = new MQTTnet.MqttFactory();
var client = factory.CreateMqttClient();
var options = new MqttClientOptionsBuilder()
.WithClientId(Guid.NewGuid().ToString())
.WithTcpServer("localhost")
.WithCredentials("3cb97cd31fbc40b08d12ec47a6fad622","")//token
.Build();
client.UseApplicationMessageReceivedHandler(ax=>
{
Console.WriteLine($"ClientId{ ax.ClientId},msg={ax.ApplicationMessage.ConvertPayloadToString()}");
});
Task.Run(async () =>
{
await client.ConnectAsync(options);
do
{
var message = new MqttApplicationMessageBuilder()
.WithTopic("/devices/me/telemetry")
.WithPayload(JsonConvert.SerializeObject(new
{
RandomString = Guid.NewGuid().ToString(),
NowTime = DateTime.Now
}))
.Build();
Console.WriteLine(message.ConvertPayloadToString());
await client.PublishAsync(message);
await Task.Delay(TimeSpan.FromSeconds(10));
await client.SubscribeAsync("/devices/me/attributes/response/+");
await client.PublishAsync("/devices/me/attributes/request/1", "{\"anySide\":\"Doublevalue,longvalue,Doublevalue,longvalue\"}");
} while (Console.ReadKey().Key != ConsoleKey.Escape);
await client.DisconnectAsync();
}).Wait();
}
}
}
\ No newline at end of file
### Topic
- Upload telemetry to device
/devices/me/telemetry
- Upload telemetry data to gateway's device
/devices/devicename/telemetry
- Upload attributes to device
/devices/me/attributes
- Upload attributes data to gateway's device
/devices/devicename/telemetry
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册