handle_publish.c 10.7 KB
Newer Older
1
/*
2
Copyright (c) 2009-2020 Roger Light <roger@atchoo.org>
3 4

All rights reserved. This program and the accompanying materials
R
Roger A. Light 已提交
5
are made available under the terms of the Eclipse Public License 2.0
6 7 8
and Eclipse Distribution License v1.0 which accompany this distribution.

The Eclipse Public License is available at
R
Roger A. Light 已提交
9
   https://www.eclipse.org/legal/epl-2.0/
10 11 12
and the Eclipse Distribution License is available at
  http://www.eclipse.org/org/documents/edl-v10.php.

13
SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause
R
Roger A. Light 已提交
14

15 16 17 18
Contributors:
   Roger Light - initial implementation and documentation.
*/

19 20
#include "config.h"

21 22 23 24
#include <assert.h>
#include <stdio.h>
#include <string.h>

25
#include "mosquitto_broker_internal.h"
26
#include "alias_mosq.h"
27
#include "mqtt_protocol.h"
28 29
#include "memory_mosq.h"
#include "packet_mosq.h"
30
#include "property_mosq.h"
31 32 33 34 35 36
#include "read_handle.h"
#include "send_mosq.h"
#include "sys_tree.h"
#include "util_mosq.h"


37
int handle__publish(struct mosquitto *context)
38
{
39
	uint8_t dup;
40
	int rc = 0;
41
	int rc2;
42 43
	uint8_t header = context->in_packet.command;
	int res = 0;
44
	struct mosquitto_msg_store *msg, *stored = NULL;
R
Roger A. Light 已提交
45
	struct mosquitto_client_msg *cmsg_stored = NULL;
R
Roger A. Light 已提交
46 47
	size_t len;
	uint16_t slen;
48
	char *topic_mount;
49 50
	mosquitto_property *properties = NULL;
	mosquitto_property *p, *p_prev;
51
	mosquitto_property *msg_properties_last;
R
Roger A. Light 已提交
52
	uint32_t message_expiry_interval = 0;
53
	int topic_alias = -1;
54
	uint8_t reason_code = 0;
55
	uint16_t mid = 0;
56

57
	if(context->state != mosq_cs_active){
58 59 60
		return MOSQ_ERR_PROTOCOL;
	}

61 62 63 64
	msg = mosquitto__calloc(1, sizeof(struct mosquitto_msg_store));
	if(msg == NULL){
		return MOSQ_ERR_NOMEM;
	}
65 66

	dup = (header & 0x08)>>3;
67
	msg->qos = (header & 0x06)>>1;
68 69 70 71 72 73
	if(dup == 1 && msg->qos == 0){
		log__printf(NULL, MOSQ_LOG_INFO,
				"Invalid PUBLISH (QoS=0 and DUP=1) from %s, disconnecting.", context->id);
		db__msg_store_free(msg);
		return MOSQ_ERR_MALFORMED_PACKET;
	}
74
	if(msg->qos == 3){
75 76
		log__printf(NULL, MOSQ_LOG_INFO,
				"Invalid QoS in PUBLISH from %s, disconnecting.", context->id);
77
		db__msg_store_free(msg);
78
		return MOSQ_ERR_MALFORMED_PACKET;
79
	}
80
	if(msg->qos > context->max_qos){
81 82
		log__printf(NULL, MOSQ_LOG_INFO,
				"Too high QoS in PUBLISH from %s, disconnecting.", context->id);
83
		db__msg_store_free(msg);
84
		return MOSQ_ERR_QOS_NOT_SUPPORTED;
85
	}
86
	msg->retain = (header & 0x01);
87

88
	if(msg->retain && db.config->retain_available == false){
89
		db__msg_store_free(msg);
90
		return MOSQ_ERR_RETAIN_NOT_SUPPORTED;
R
Roger Light 已提交
91 92
	}

93 94
	if(packet__read_string(&context->in_packet, &msg->topic, &slen)){
		db__msg_store_free(msg);
95
		return MOSQ_ERR_MALFORMED_PACKET;
96
	}
97
	if(!slen && context->protocol != mosq_p_mqtt5){
98
		/* Invalid publish topic, disconnect client. */
99
		db__msg_store_free(msg);
100
		return MOSQ_ERR_MALFORMED_PACKET;
101 102
	}

103
	if(msg->qos > 0){
104
		if(packet__read_uint16(&context->in_packet, &mid)){
105
			db__msg_store_free(msg);
106
			return MOSQ_ERR_MALFORMED_PACKET;
107
		}
108
		if(mid == 0){
109
			db__msg_store_free(msg);
R
Roger A. Light 已提交
110 111
			return MOSQ_ERR_PROTOCOL;
		}
112 113 114
		/* It is important to have a separate copy of mid, because msg may be
		 * freed before we want to send a PUBACK/PUBREC. */
		msg->source_mid = mid;
115 116
	}

117
	/* Handle properties */
118
	if(context->protocol == mosq_p_mqtt5){
119
		rc = property__read_all(CMD_PUBLISH, &context->in_packet, &properties);
120 121
		if(rc){
			db__msg_store_free(msg);
122
			return rc;
123
		}
124 125 126

		p = properties;
		p_prev = NULL;
127
		msg->properties = NULL;
128 129 130
		msg_properties_last = NULL;
		while(p){
			switch(p->identifier){
131
				case MQTT_PROP_CONTENT_TYPE:
132
				case MQTT_PROP_CORRELATION_DATA:
133 134
				case MQTT_PROP_PAYLOAD_FORMAT_INDICATOR:
				case MQTT_PROP_RESPONSE_TOPIC:
135
				case MQTT_PROP_USER_PROPERTY:
136
					if(msg->properties){
137 138 139
						msg_properties_last->next = p;
						msg_properties_last = p;
					}else{
140
						msg->properties = p;
141 142 143 144 145 146 147 148 149
						msg_properties_last = p;
					}
					if(p_prev){
						p_prev->next = p->next;
						p = p_prev->next;
					}else{
						properties = p->next;
						p = properties;
					}
150
					msg_properties_last->next = NULL;
151 152
					break;

153
				case MQTT_PROP_TOPIC_ALIAS:
154
					topic_alias = p->value.i16;
155
					p_prev = p;
156 157 158
					p = p->next;
					break;

159
				case MQTT_PROP_MESSAGE_EXPIRY_INTERVAL:
R
Roger A. Light 已提交
160
					message_expiry_interval = p->value.i32;
161
					p_prev = p;
162 163 164
					p = p->next;
					break;

165
				case MQTT_PROP_SUBSCRIPTION_IDENTIFIER:
166
					p_prev = p;
167 168 169 170 171 172 173 174
					p = p->next;
					break;

				default:
					p = p->next;
					break;
			}
		}
175
	}
176
	mosquitto_property_free_all(&properties);
177

R
Roger A. Light 已提交
178
	if(topic_alias == 0 || (context->listener && topic_alias > context->listener->max_topic_alias)){
179
		db__msg_store_free(msg);
180
		return MOSQ_ERR_TOPIC_ALIAS_INVALID;
181
	}else if(topic_alias > 0){
182
		if(msg->topic){
R
Roger A. Light 已提交
183
			rc = alias__add(context, msg->topic, (uint16_t)topic_alias);
R
Roger A. Light 已提交
184
			if(rc){
185
				db__msg_store_free(msg);
R
Roger A. Light 已提交
186 187
				return rc;
			}
188
		}else{
R
Roger A. Light 已提交
189
			rc = alias__find(context, &msg->topic, (uint16_t)topic_alias);
190
			if(rc){
191
				db__msg_store_free(msg);
192
				return MOSQ_ERR_PROTOCOL;
193 194
			}
		}
195 196 197
	}

#ifdef WITH_BRIDGE
198 199 200 201 202
	rc = bridge__remap_topic_in(context, &msg->topic);
	if(rc){
		db__msg_store_free(msg);
		return rc;
	}
203 204

#endif
205
	if(mosquitto_pub_topic_check(msg->topic) != MOSQ_ERR_SUCCESS){
206
		/* Invalid publish topic, just swallow it. */
207
		db__msg_store_free(msg);
208
		return MOSQ_ERR_MALFORMED_PACKET;
209 210
	}

211 212
	msg->payloadlen = context->in_packet.remaining_length - context->in_packet.pos;
	G_PUB_BYTES_RECEIVED_INC(msg->payloadlen);
213
	if(context->listener && context->listener->mount_point){
214
		len = strlen(context->listener->mount_point) + strlen(msg->topic) + 1;
215 216
		topic_mount = mosquitto__malloc(len+1);
		if(!topic_mount){
217
			db__msg_store_free(msg);
218 219
			return MOSQ_ERR_NOMEM;
		}
220
		snprintf(topic_mount, len, "%s%s", context->listener->mount_point, msg->topic);
221 222
		topic_mount[len] = '\0';

223 224
		mosquitto__free(msg->topic);
		msg->topic = topic_mount;
225 226
	}

227
	if(msg->payloadlen){
228
		if(db.config->message_size_limit && msg->payloadlen > db.config->message_size_limit){
229
			log__printf(NULL, MOSQ_LOG_DEBUG, "Dropped too large PUBLISH from %s (d%d, q%d, r%d, m%d, '%s', ... (%ld bytes))", context->id, dup, msg->qos, msg->retain, msg->source_mid, msg->topic, (long)msg->payloadlen);
230
			reason_code = MQTT_RC_PACKET_TOO_LARGE;
231 232
			goto process_bad_message;
		}
233 234
		msg->payload = mosquitto__malloc(msg->payloadlen+1);
		if(msg->payload == NULL){
235
			db__msg_store_free(msg);
236 237
			return MOSQ_ERR_NOMEM;
		}
238 239
		/* Ensure payload is always zero terminated, this is the reason for the extra byte above */
		((uint8_t *)msg->payload)[msg->payloadlen] = 0;
R
Roger A. Light 已提交
240

241
		if(packet__read_bytes(&context->in_packet, msg->payload, msg->payloadlen)){
242
			db__msg_store_free(msg);
243
			return MOSQ_ERR_MALFORMED_PACKET;
244 245 246 247
		}
	}

	/* Check for topic access */
248
	rc = mosquitto_acl_check(context, msg->topic, msg->payloadlen, msg->payload, msg->qos, msg->retain, MOSQ_ACL_WRITE);
249
	if(rc == MOSQ_ERR_ACL_DENIED){
250 251 252 253 254
		log__printf(NULL, MOSQ_LOG_DEBUG,
				"Denied PUBLISH from %s (d%d, q%d, r%d, m%d, '%s', ... (%ld bytes))",
				context->id, dup, msg->qos, msg->retain, msg->source_mid, msg->topic,
				(long)msg->payloadlen);
		reason_code = MQTT_RC_NOT_AUTHORIZED;
255 256
		goto process_bad_message;
	}else if(rc != MOSQ_ERR_SUCCESS){
257
		db__msg_store_free(msg);
258 259 260
		return rc;
	}

261
	log__printf(NULL, MOSQ_LOG_DEBUG, "Received PUBLISH from %s (d%d, q%d, r%d, m%d, '%s', ... (%ld bytes))", context->id, dup, msg->qos, msg->retain, msg->source_mid, msg->topic, (long)msg->payloadlen);
262 263

	if(!strncmp(msg->topic, "$CONTROL/", 9)){
264
#ifdef WITH_CONTROL
265
		rc = control__process(context, msg);
266 267
		db__msg_store_free(msg);
		return rc;
268
#else
269 270
		reason_code = MQTT_RC_IMPLEMENTATION_SPECIFIC;
		goto process_bad_message;
271
#endif
272 273
	}

R
Roger A. Light 已提交
274
	{
275
		rc = plugin__handle_message(context, msg);
276 277 278 279 280 281 282 283 284
		if(rc == MOSQ_ERR_ACL_DENIED){
			log__printf(NULL, MOSQ_LOG_DEBUG,
					"Denied PUBLISH from %s (d%d, q%d, r%d, m%d, '%s', ... (%ld bytes))",
					context->id, dup, msg->qos, msg->retain, msg->source_mid, msg->topic,
					(long)msg->payloadlen);

			reason_code = MQTT_RC_NOT_AUTHORIZED;
			goto process_bad_message;
		}else if(rc != MOSQ_ERR_SUCCESS){
R
Roger A. Light 已提交
285 286 287 288 289
			db__msg_store_free(msg);
			return rc;
		}
	}

290
	if(msg->qos > 0){
R
Roger A. Light 已提交
291
		db__message_store_find(context, msg->source_mid, &cmsg_stored);
292
	}
293

R
Roger A. Light 已提交
294 295 296 297 298
	if(cmsg_stored && cmsg_stored->store && msg->source_mid != 0 &&
			(cmsg_stored->store->qos != msg->qos
			 || cmsg_stored->store->payloadlen != msg->payloadlen
			 || strcmp(cmsg_stored->store->topic, msg->topic)
			 || memcmp(cmsg_stored->store->payload, msg->payload, msg->payloadlen) )){
R
Roger A. Light 已提交
299

300 301
		log__printf(NULL, MOSQ_LOG_WARNING, "Reused message ID %u from %s detected. Clearing from storage.", msg->source_mid, context->id);
		db__message_remove_incoming(context, msg->source_mid);
R
Roger A. Light 已提交
302
		cmsg_stored = NULL;
303 304
	}

R
Roger A. Light 已提交
305
	if(!cmsg_stored){
306
		if(msg->qos == 0
307
				|| db__ready_for_flight(context, mosq_md_in, msg->qos)
R
Roger A. Light 已提交
308
				){
309 310

			dup = 0;
311
			rc = db__message_store(context, msg, message_expiry_interval, 0, mosq_mo_client);
312
			if(rc) return rc;
313 314 315 316
		}else{
			/* Client isn't allowed any more incoming messages, so fail early */
			reason_code = MQTT_RC_QUOTA_EXCEEDED;
			goto process_bad_message;
317
		}
318 319
		stored = msg;
		msg = NULL;
R
Roger A. Light 已提交
320
		dup = 0;
321
	}else{
322 323
		db__msg_store_free(msg);
		msg = NULL;
R
Roger A. Light 已提交
324 325 326
		stored = cmsg_stored->store;
		cmsg_stored->dup++;
		dup = cmsg_stored->dup;
327 328
	}

329
	switch(stored->qos){
330
		case 0:
331
			rc2 = sub__messages_queue(context->id, stored->topic, stored->qos, stored->retain, &stored);
332
			if(rc2 > 0) rc = 1;
333 334
			break;
		case 1:
335
			util__decrement_receive_quota(context);
336
			rc2 = sub__messages_queue(context->id, stored->topic, stored->qos, stored->retain, &stored);
337
			/* stored may now be free, so don't refer to it */
338
			if(rc2 == MOSQ_ERR_SUCCESS || context->protocol != mosq_p_mqtt5){
339
				if(send__puback(context, mid, 0, NULL)) rc = 1;
340
			}else if(rc2 == MOSQ_ERR_NO_SUBSCRIBERS){
341
				if(send__puback(context, mid, MQTT_RC_NO_MATCHING_SUBSCRIBERS, NULL)) rc = 1;
342 343 344
			}else{
				rc = rc2;
			}
345 346
			break;
		case 2:
347
			if(dup == 0){
348
				res = db__message_insert(context, stored->source_mid, mosq_md_in, stored->qos, stored->retain, stored, NULL, false);
349 350 351
			}else{
				res = 0;
			}
R
Roger A. Light 已提交
352

353 354
			/* db__message_insert() returns 2 to indicate dropped message
			 * due to queue. This isn't an error so don't disconnect them. */
355
			/* FIXME - this is no longer necessary due to failing early above */
356
			if(!res){
R
Roger A. Light 已提交
357 358 359 360 361 362
				if(dup == 0 || dup == 1){
					rc2 = send__pubrec(context, stored->source_mid, 0, NULL);
					if(rc2) rc = rc2;
				}else{
					return MOSQ_ERR_PROTOCOL;
				}
363 364 365 366 367 368
			}else if(res == 1){
				rc = 1;
			}
			break;
	}

369
	db__message_write_queued_in(context);
370 371
	return rc;
process_bad_message:
372 373 374 375 376 377 378 379 380 381
	rc = 1;
	if(msg){
		switch(msg->qos){
			case 0:
				rc = MOSQ_ERR_SUCCESS;
				break;
			case 1:
				rc = send__puback(context, msg->source_mid, reason_code, NULL);
				break;
			case 2:
382
				rc = send__pubrec(context, msg->source_mid, reason_code, NULL);
383 384 385
				break;
		}
		db__msg_store_free(msg);
386
	}
R
Roger A. Light 已提交
387 388 389
	if(context->out_packet_count >= db.config->max_queued_messages){
		rc = MQTT_RC_QUOTA_EXCEEDED;
	}
390
	return rc;
391 392
}