PubSubClient3 v3.1.0
Located at <a href='https://github.com/hmueller01/pubsubclient3'>GitHub</a>
 
Loading...
Searching...
No Matches
PubSubClient.cpp
Go to the documentation of this file.
1
10
11#include "PubSubClient.h"
12
20#define CHECK_STRING_LENGTH(l, s) \
21 if ((!s) || (l + 2 + strnlen(s, this->bufferSize) > this->bufferSize)) { \
22 _client->stop(); \
23 return false; \
24 }
25
31
33 setClient(client);
34}
35
36PubSubClient::PubSubClient(IPAddress addr, uint16_t port, Client& client) : PubSubClient() {
37 setServer(addr, port);
38 setClient(client);
39}
40
41PubSubClient::PubSubClient(IPAddress addr, uint16_t port, Client& client, Stream& stream) : PubSubClient() {
42 setServer(addr, port);
43 setClient(client);
44 setStream(stream);
45}
46
47PubSubClient::PubSubClient(IPAddress addr, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client) : PubSubClient() {
48 setServer(addr, port);
49 setCallback(callback);
50 setClient(client);
51}
52
53PubSubClient::PubSubClient(IPAddress addr, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client, Stream& stream) : PubSubClient() {
54 setServer(addr, port);
55 setCallback(callback);
56 setClient(client);
57 setStream(stream);
58}
59
60PubSubClient::PubSubClient(uint8_t* ip, uint16_t port, Client& client) : PubSubClient() {
61 setServer(ip, port);
62 setClient(client);
63}
64
65PubSubClient::PubSubClient(uint8_t* ip, uint16_t port, Client& client, Stream& stream) : PubSubClient() {
66 setServer(ip, port);
67 setClient(client);
68 setStream(stream);
69}
70
71PubSubClient::PubSubClient(uint8_t* ip, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client) : PubSubClient() {
72 setServer(ip, port);
73 setCallback(callback);
74 setClient(client);
75}
76
77PubSubClient::PubSubClient(uint8_t* ip, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client, Stream& stream) : PubSubClient() {
78 setServer(ip, port);
79 setCallback(callback);
80 setClient(client);
81 setStream(stream);
82}
83
84PubSubClient::PubSubClient(const char* domain, uint16_t port, Client& client) : PubSubClient() {
85 setServer(domain, port);
86 setClient(client);
87}
88
89PubSubClient::PubSubClient(const char* domain, uint16_t port, Client& client, Stream& stream) : PubSubClient() {
90 setServer(domain, port);
91 setClient(client);
92 setStream(stream);
93}
94
95PubSubClient::PubSubClient(const char* domain, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client) : PubSubClient() {
96 setServer(domain, port);
97 setCallback(callback);
98 setClient(client);
99}
100
101PubSubClient::PubSubClient(const char* domain, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client, Stream& stream) : PubSubClient() {
102 setServer(domain, port);
103 setCallback(callback);
104 setClient(client);
105 setStream(stream);
106}
107
109 free(this->domain);
110 free(this->buffer);
111}
112
113bool PubSubClient::connect(const char* id) {
114 return connect(id, nullptr, nullptr, nullptr, MQTT_QOS0, false, nullptr, true);
115}
116
117bool PubSubClient::connect(const char* id, const char* user, const char* pass) {
118 return connect(id, user, pass, nullptr, MQTT_QOS0, false, nullptr, true);
119}
120
121bool PubSubClient::connect(const char* id, const char* willTopic, uint8_t willQos, bool willRetain, const char* willMessage) {
122 return connect(id, nullptr, nullptr, willTopic, willQos, willRetain, willMessage, true);
123}
124
125bool PubSubClient::connect(const char* id, const char* user, const char* pass, const char* willTopic, uint8_t willQos, bool willRetain,
126 const char* willMessage) {
127 return connect(id, user, pass, willTopic, willQos, willRetain, willMessage, true);
128}
129
130bool PubSubClient::connect(const char* id, const char* user, const char* pass, const char* willTopic, uint8_t willQos, bool willRetain,
131 const char* willMessage, bool cleanSession) {
132 if (!connected()) {
133 int result = 0;
134
135 if (_client->connected()) {
136 result = 1;
137 } else if (this->port != 0) {
138 if (this->domain) {
139 result = _client->connect(this->domain, this->port);
140 } else {
141 result = _client->connect(this->ip, this->port);
142 }
143 }
144
145 if (result == 1) {
146 nextMsgId = 1; // init msgId (packet identifier)
147
148#if MQTT_VERSION == MQTT_VERSION_3_1
149 const uint8_t protocol[9] = {0x00, 0x06, 'M', 'Q', 'I', 's', 'd', 'p', MQTT_VERSION};
150#elif MQTT_VERSION == MQTT_VERSION_3_1_1
151 const uint8_t protocol[7] = {0x00, 0x04, 'M', 'Q', 'T', 'T', MQTT_VERSION};
152#endif
153 // Leave room in the buffer for header and variable length field
154 memcpy(this->buffer + MQTT_MAX_HEADER_SIZE, protocol, sizeof(protocol));
155
156 size_t length = MQTT_MAX_HEADER_SIZE + sizeof(protocol);
157 uint8_t flags = 0x00;
158 if (willTopic) {
159 flags = (0x01 << 2) | (willQos << 3) | (willRetain << 5); // set will flag bit 2, will QoS and will retain bit 5
160 }
161 if (cleanSession) {
162 flags = flags | (0x01 << 1); // set clean session bit 1
163 }
164 if (user) {
165 flags = flags | (0x01 << 7); // set user name flag bit 7
166 if (pass) {
167 flags = flags | (0x01 << 6); // set password flag bit 6
168 }
169 }
170 const uint16_t keepAlive = this->keepAliveMillis / 1000;
171 this->buffer[length++] = flags;
172 this->buffer[length++] = keepAlive >> 8;
173 this->buffer[length++] = keepAlive & 0xFF;
174
175 CHECK_STRING_LENGTH(length, id)
176 length = writeString(id, this->buffer, length, this->bufferSize);
177 if (willTopic) {
178 CHECK_STRING_LENGTH(length, willTopic)
179 length = writeString(willTopic, this->buffer, length, this->bufferSize);
180 CHECK_STRING_LENGTH(length, willMessage)
181 length = writeString(willMessage, this->buffer, length, this->bufferSize);
182 }
183
184 if (user) {
185 CHECK_STRING_LENGTH(length, user)
186 length = writeString(user, this->buffer, length, this->bufferSize);
187 if (pass) {
188 CHECK_STRING_LENGTH(length, pass)
189 length = writeString(pass, this->buffer, length, this->bufferSize);
190 }
191 }
192
193 write(MQTTCONNECT, this->buffer, length - MQTT_MAX_HEADER_SIZE);
194
195 lastInActivity = lastOutActivity = millis();
196 pingOutstanding = false;
197
198 while (!_client->available()) {
199 yield();
200 unsigned long t = millis();
201 if (t - lastInActivity >= this->socketTimeoutMillis) {
202 DEBUG_PSC_PRINTF("connect aborting due to timeout\n");
204 _client->stop();
205 return false;
206 }
207 }
208 uint8_t hdrLen;
209 size_t len = readPacket(&hdrLen);
210
211 if (len == 4) {
212 if (buffer[3] == 0) {
213 lastInActivity = millis();
214 _state = MQTT_CONNECTED;
215 return true;
216 } else {
217 _state = buffer[3];
218 }
219 }
220 DEBUG_PSC_PRINTF("connect aborting due to protocol error\n");
221 _client->stop();
222 } else {
223 _state = MQTT_CONNECT_FAILED;
224 }
225 return false;
226 }
227 return true;
228}
229
231 if (!_client) return false;
232
233 if (_client->connected()) {
234 return (_state == MQTT_CONNECTED);
235 } else if (_state == MQTT_CONNECTED) {
236 DEBUG_PSC_PRINTF("lost connection (client may have more details)\n");
237 _state = MQTT_CONNECTION_LOST;
238 _client->stop();
239 pingOutstanding = false;
240 }
241 return false;
242}
243
245 DEBUG_PSC_PRINTF("disconnect called\n");
246 this->buffer[0] = MQTTDISCONNECT;
247 this->buffer[1] = 0;
248 _client->write(this->buffer, 2);
249 _state = MQTT_DISCONNECTED;
250 _client->flush();
251 _client->stop();
252 lastInActivity = lastOutActivity = millis();
253 pingOutstanding = false;
254}
255
262bool PubSubClient::readByte(uint8_t* result) {
263 unsigned long previousMillis = millis();
264 while (!_client->available()) {
265 yield();
266 unsigned long currentMillis = millis();
267 if (currentMillis - previousMillis >= this->socketTimeoutMillis) {
268 return false;
269 }
270 }
271 int rc = _client->read();
272 if (rc < 0) {
273 return false;
274 }
275 *result = (uint8_t)rc;
276 return true;
277}
278
285bool PubSubClient::readByte(uint8_t* result, size_t* pos) {
286 uint8_t* write_address = &(result[*pos]);
287 if (readByte(write_address)) {
288 (*pos)++;
289 return true;
290 }
291 return false;
292}
293
300size_t PubSubClient::readPacket(uint8_t* hdrLen) {
301 size_t len = 0;
302 if (!readByte(this->buffer, &len)) return 0;
303 bool isPublish = (this->buffer[0] & 0xF0) == MQTTPUBLISH;
304 uint32_t multiplier = 1;
305 size_t length = 0;
306 uint8_t digit = 0;
307 uint16_t skip = 0;
308 uint8_t start = 0;
309
310 do {
311 if (len == MQTT_MAX_HEADER_SIZE) {
312 // Invalid remaining length encoding - kill the connection
313 DEBUG_PSC_PRINTF("readPacket detected packet of invalid length\n");
314 _state = MQTT_DISCONNECTED;
315 _client->stop();
316 return 0;
317 }
318 if (!readByte(&digit)) return 0;
319 this->buffer[len++] = digit;
320 length += (digit & 0x7F) * multiplier; // length is coded in the lower 7 bits
321 multiplier <<= 7; // multiplier *= 128
322 } while ((digit & 0x80) != 0); // do while 8th continuation bit is set
323 *hdrLen = (uint8_t)(len - 1);
324
325 DEBUG_PSC_PRINTF("readPacket received packet of length %zu (isPublish = %u)\n", length, isPublish);
326
327 if (isPublish) {
328 // Read in topic length to calculate bytes to skip over for Stream writing
329 if (!readByte(this->buffer, &len)) return 0;
330 if (!readByte(this->buffer, &len)) return 0;
331 skip = (this->buffer[*hdrLen + 1] << 8) + this->buffer[*hdrLen + 2];
332 start = 2;
333 if (MQTT_HDR_GET_QOS(this->buffer[0]) > MQTT_QOS0) {
334 // skip msgId (packet identifier) for QoS 1 and 2 messages
335 skip += 2;
336 }
337 }
338 size_t idx = len;
339
340 for (size_t i = start; i < length; i++) {
341 if (!readByte(&digit)) return 0;
342 if (this->stream) {
343 if (isPublish && idx - *hdrLen - 2 > skip) {
344 this->stream->write(digit);
345 }
346 }
347
348 if (len < this->bufferSize) {
349 this->buffer[len++] = digit;
350 }
351 idx++;
352 }
353
354 if (!this->stream && idx > this->bufferSize) {
355 DEBUG_PSC_PRINTF("readPacket ignoring packet of size %zu exceeding buffer of size %zu\n", length, this->bufferSize);
356 len = 0; // This will cause the packet to be ignored.
357 }
358 return len;
359}
360
368bool PubSubClient::handlePacket(uint8_t hdrLen, size_t length) {
369 uint8_t type = this->buffer[0] & 0xF0;
370 DEBUG_PSC_PRINTF("received message of type %u\n", type);
371 switch (type) {
372 case MQTTPUBLISH:
373 if (callback) {
374 // MQTT Publish packet: See section 3.3 MQTT v3.1.1 protocol specification:
375 // - Header: 1 byte
376 // - Remaining header length: hdrLen bytes, multibyte field (1 .. MQTT_MAX_HEADER_SIZE - 1)
377 // - Topic length: 2 bytes (starts at buffer[hdrLen + 1])
378 // - Topic: topicLen bytes (starts at buffer[hdrLen + 3])
379 // - Packet Identifier (msgId): 0 bytes for QoS 0, 2 bytes for QoS 1 and 2 (starts at buffer[hdrLen + 3 + topicLen])
380 // - Payload (for QoS = 0): length - (hdrLen + 3 + topicLen) bytes (starts at buffer[hdrLen + 3 + topicLen])
381 // - Payload (for QoS > 0): length - (hdrLen + 5 + topicLen) bytes (starts at buffer[hdrLen + 5 + topicLen])
382 // To get a null reminated 'C' topic string we move the topic 1 byte to the front (overwriting the LSB of the topic lenght)
383 uint16_t topicLen = (this->buffer[hdrLen + 1] << 8) + this->buffer[hdrLen + 2]; // topic length in bytes
384 char* topic = (char*)(this->buffer + hdrLen + 3 - 1); // set the topic in the LSB of the topic lenght, as we move it there
385 uint16_t payloadOffset = hdrLen + 3 + topicLen; // payload starts after header and topic (if there is no packet identifier)
386 size_t payloadLen = length - payloadOffset; // this might change by 2 if we have a QoS 1 or 2 message
387 uint8_t* payload = this->buffer + payloadOffset;
388
389 if (length < payloadOffset) { // do not move outside the max bufferSize
390 ERROR_PSC_PRINTF_P("handlePacket(): Suspicious topicLen (%u) points outside of received buffer length (%zu)\n", topicLen, length);
391 return false;
392 }
393 memmove(topic, topic + 1, topicLen); // move topic inside buffer 1 byte to front
394 topic[topicLen] = '\0'; // end the topic as a 'C' string with \x00
395
396 if (MQTT_HDR_GET_QOS(this->buffer[0]) == MQTT_QOS0) {
397 // No msgId for QOS == 0
398 callback(topic, payload, payloadLen);
399 } else {
400 // For QOS 1 and 2 we have a msgId (packet identifier) after the topic at the current payloadOffset
401 if (payloadLen < 2) { // payload must be >= 2, as we have the msgId before
402 ERROR_PSC_PRINTF_P("handlePacket(): Missing msgId in QoS 1/2 message\n");
403 return false;
404 }
405 uint16_t msgId = (this->buffer[payloadOffset] << 8) + this->buffer[payloadOffset + 1];
406 callback(topic, payload + 2, payloadLen - 2); // remove the msgId from the callback payload
407
408 this->buffer[0] = MQTTPUBACK;
409 this->buffer[1] = 2;
410 this->buffer[2] = (msgId >> 8);
411 this->buffer[3] = (msgId & 0xFF);
412 if (_client->write(this->buffer, 4) == 4) {
413 lastOutActivity = millis();
414 }
415 }
416 }
417 break;
418 case MQTTPUBACK:
419 // MQTT Publish Acknowledgment (QoS 1 publish received): See section 3.4 MQTT v3.1.1 protocol specification
420 if (length < 4) {
421 ERROR_PSC_PRINTF_P("handlePacket(): Received PUBACK packet with length %zu, expected at least 4 bytes\n", length);
422 return false;
423 }
424 // No futher action here, as resending is not supported.
425 break;
426 case MQTTPUBREC:
427 // MQTT Publish Received (QoS 2 publish received, part 1): See section 3.5 MQTT v3.1.1 protocol specification
428 if (length < 4) {
429 ERROR_PSC_PRINTF_P("handlePacket(): Received PUBREC packet with length %zu, expected at least 4 bytes\n", length);
430 return false;
431 }
432 // MQTT Publish Release (QoS 2 publish received, part 2): See section 3.6 MQTT v3.1.1 protocol specification
433 buffer[0] = MQTTPUBREL | 2; // PUBREL with bit 1 set
434 // bytes 1-3 of PUBREL are the same as of PUBREC
435 if (_client->write(buffer, 4) == 4) {
436 lastOutActivity = millis();
437 }
438 break;
439 case MQTTPUBCOMP:
440 // MQTT Publish Complete (QoS 2 publish received, part 3): See section 3.7 MQTT v3.1.1 protocol specification
441 if (length < 4) {
442 ERROR_PSC_PRINTF_P("handlePacket(): Received PUBCOMP packet with length %zu, expected at least 4 bytes\n", length);
443 return false;
444 }
445 // No futher action here, as resending is not supported.
446 break;
447 case MQTTPINGREQ:
448 // MQTT Ping Request: See section 3.12 MQTT v3.1.1 protocol specification
449 this->buffer[0] = MQTTPINGRESP;
450 this->buffer[1] = 0;
451 if (_client->write(this->buffer, 2) == 2) {
452 lastOutActivity = millis();
453 }
454 break;
455 case MQTTPINGRESP:
456 pingOutstanding = false;
457 break;
458 default:
459 break;
460 }
461 return true;
462}
463
465 if (!connected()) {
466 return false;
467 }
468 bool ret = true;
469 const unsigned long t = millis();
470 if (keepAliveMillis && ((t - lastInActivity > this->keepAliveMillis) || (t - lastOutActivity > this->keepAliveMillis))) {
471 if (pingOutstanding) {
472 DEBUG_PSC_PRINTF("loop aborting due to timeout\n");
474 _client->stop();
475 pingOutstanding = false;
476 return false;
477 } else {
478 this->buffer[0] = MQTTPINGREQ;
479 this->buffer[1] = 0;
480 if (_client->write(this->buffer, 2) == 2) {
481 lastInActivity = lastOutActivity = t;
482 pingOutstanding = true;
483 }
484 }
485 }
486 if (_client->available()) {
487 uint8_t hdrLen;
488 size_t len = readPacket(&hdrLen);
489 if (len > 0) {
490 lastInActivity = t;
491 ret = handlePacket(hdrLen, len);
492 if (!ret) {
493 _state = MQTT_DISCONNECTED;
494 _client->stop();
495 }
496 } else if (!connected()) {
497 // readPacket has closed the connection
498 return false;
499 }
500 }
501 return ret;
502}
503
504bool PubSubClient::publish(const char* topic, const char* payload) {
505 return publish(topic, payload, MQTT_QOS0, false);
506}
507
508bool PubSubClient::publish(const char* topic, const char* payload, bool retained) {
509 return publish(topic, payload, MQTT_QOS0, retained);
510}
511
512bool PubSubClient::publish(const char* topic, const char* payload, uint8_t qos, bool retained) {
513 return publish(topic, (const uint8_t*)payload, payload ? strnlen(payload, MQTT_MAX_POSSIBLE_PACKET_SIZE) : 0, qos, retained);
514}
515
516bool PubSubClient::publish(const char* topic, const uint8_t* payload, size_t plength) {
517 return publish(topic, payload, plength, MQTT_QOS0, false);
518}
519
520bool PubSubClient::publish(const char* topic, const uint8_t* payload, size_t plength, bool retained) {
521 return publish(topic, payload, plength, MQTT_QOS0, retained);
522}
523
524bool PubSubClient::publish(const char* topic, const uint8_t* payload, size_t plength, uint8_t qos, bool retained) {
525 if (beginPublish(topic, plength, qos, retained)) {
526 size_t rc = write(payload, plength);
527 lastOutActivity = millis();
528 return endPublish() && (rc == plength);
529 }
530 return false;
531}
532
533bool PubSubClient::publish_P(const char* topic, const char* payload, bool retained) {
534 return publish_P(topic, payload, MQTT_QOS0, retained);
535}
536
537bool PubSubClient::publish_P(const char* topic, const char* payload, uint8_t qos, bool retained) {
538 return publish_P(topic, (const uint8_t*)payload, payload ? strnlen_P(payload, MQTT_MAX_POSSIBLE_PACKET_SIZE) : 0, qos, retained);
539}
540
541bool PubSubClient::publish_P(const char* topic, const uint8_t* payload, size_t plength, bool retained) {
542 return publish_P(topic, payload, plength, MQTT_QOS0, retained);
543}
544
545bool PubSubClient::publish_P(const char* topic, const uint8_t* payload, size_t plength, uint8_t qos, bool retained) {
546 if (beginPublish(topic, plength, qos, retained)) {
547 size_t rc = 0;
548 for (size_t i = 0; i < plength; i++) {
549 rc += _client->write((uint8_t)pgm_read_byte_near(payload + i));
550 }
551 lastOutActivity = millis();
552 return endPublish() && (rc == plength);
553 }
554 return false;
555}
556
557bool PubSubClient::beginPublish(const char* topic, size_t plength, bool retained) {
558 return beginPublish(topic, plength, MQTT_QOS0, retained);
559}
560
561bool PubSubClient::beginPublish(const char* topic, size_t plength, uint8_t qos, bool retained) {
562 if (!topic) return false;
563 if (strlen(topic) == 0) return false; // empty topic is not allowed
564 if (qos > MQTT_QOS2) { // only valid QoS supported
565 this->_qos = MQTT_QOS0; // reset QoS to 0, that endPublish() will not send a nextMsgId
566 ERROR_PSC_PRINTF_P("beginPublish() called with invalid QoS %u\n", qos);
567 return false;
568 }
569 this->_qos = qos; // save the QoS for later endPublish() operation
570 // check if the header and the topic (including 2 length bytes) fit into the buffer
571 if (connected() && MQTT_MAX_HEADER_SIZE + strlen(topic) + 2 <= this->bufferSize) {
572 // first write the topic at the end of the maximal variable header (MQTT_MAX_HEADER_SIZE) to the buffer
573 size_t topicLen = writeString(topic, this->buffer, MQTT_MAX_HEADER_SIZE, this->bufferSize) - MQTT_MAX_HEADER_SIZE;
574 // we now know the length of the topic string (lenght + 2 bytes signalling the length) and can build the variable header information
575 const uint8_t header = MQTTPUBLISH | MQTT_QOS_GET_HDR(qos) | (retained ? MQTTRETAINED : 0);
576 const size_t nextMsgLen = (qos) ? 2 : 0; // add 2 bytes for the nextMsgId if QoS > 0
577 uint8_t hdrLen = buildHeader(header, this->buffer, topicLen + plength + nextMsgLen);
578 if (hdrLen == 0) return false; // exit here in case of header generation failure
579 // as the header length is variable, it starts at MQTT_MAX_HEADER_SIZE - hdrLen (see buildHeader() documentation)
580 size_t rc = _client->write(this->buffer + (MQTT_MAX_HEADER_SIZE - hdrLen), hdrLen + topicLen);
581 lastOutActivity = millis();
582 return (rc == (hdrLen + topicLen));
583 }
584 return false;
585}
586
588 if (connected()) {
589 if (this->_qos > MQTT_QOS0) {
590 // QoS == 1 or 2, send the msgId
591 uint8_t buf[2];
592 writeNextMsgId(buf, 0, 2);
593 size_t rc = _client->write(buf, 2);
594 lastOutActivity = millis();
595 return (rc == 2);
596 }
597 // QoS == 0, no msgId to send
598 return true;
599 }
600 return false;
601}
602
613uint8_t PubSubClient::buildHeader(uint8_t header, uint8_t* buf, size_t length) {
614 uint8_t hdrBuf[MQTT_MAX_HEADER_SIZE - 1];
615 uint8_t hdrLen = 0;
616 uint8_t digit;
617 size_t len = length;
618 do {
619 digit = len & 0x7F; // digit = len % 128
620 len >>= 7; // len = len / 128
621 if (len > 0) {
622 digit |= 0x80;
623 }
624 hdrBuf[hdrLen++] = digit;
625 } while (len > 0 && hdrLen < MQTT_MAX_HEADER_SIZE - 1);
626
627 if (len > 0) {
628 ERROR_PSC_PRINTF_P("buildHeader() length too big %zu, left %zu\n", length, len);
629 return 0;
630 }
631
632 buf[MQTT_MAX_HEADER_SIZE - 1 - hdrLen] = header;
633 memcpy(buf + MQTT_MAX_HEADER_SIZE - hdrLen, hdrBuf, hdrLen);
634 return hdrLen + 1; // Full header size is variable length bit plus the 1-byte fixed header
635}
636
637size_t PubSubClient::write(uint8_t data) {
638 lastOutActivity = millis();
639 return _client->write(data);
640}
641
642size_t PubSubClient::write(const uint8_t* buffer, size_t size) {
643 lastOutActivity = millis();
644 return _client->write(buffer, size);
645}
646
655bool PubSubClient::write(uint8_t header, uint8_t* buf, size_t length) {
656 bool result = true;
657 size_t rc;
658 uint8_t hdrLen = buildHeader(header, buf, length);
659 if (hdrLen == 0) return false; // exit here in case of header generation failure
660
661#ifdef MQTT_MAX_TRANSFER_SIZE
662 uint8_t* writeBuf = buf + (MQTT_MAX_HEADER_SIZE - hdrLen);
663 size_t bytesRemaining = length + hdrLen; // Match the length type
664 size_t bytesToWrite;
665 while ((bytesRemaining > 0) && result) {
666 yield();
667 bytesToWrite = (bytesRemaining > MQTT_MAX_TRANSFER_SIZE) ? MQTT_MAX_TRANSFER_SIZE : bytesRemaining;
668 rc = _client->write(writeBuf, bytesToWrite);
669 result = (rc == bytesToWrite);
670 bytesRemaining -= rc;
671 writeBuf += rc;
672 if (result) {
673 lastOutActivity = millis();
674 }
675 }
676#else
677 rc = _client->write(buf + (MQTT_MAX_HEADER_SIZE - hdrLen), length + hdrLen);
678 result = (rc == length + hdrLen);
679 if (result) {
680 lastOutActivity = millis();
681 }
682#endif
683 return result;
684}
685
698size_t PubSubClient::writeString(const char* string, uint8_t* buf, size_t pos, size_t size) {
699 if (!string) return pos;
700
701 size_t sLen = strlen(string);
702 if (pos + 2 + sLen <= size && sLen <= 0xFFFF) {
703 buf[pos++] = (uint8_t)(sLen >> 8);
704 buf[pos++] = (uint8_t)(sLen & 0xFF);
705 memcpy(buf + pos, string, sLen);
706 pos += sLen;
707 } else {
708 ERROR_PSC_PRINTF_P("writeString(): string (%zu) does not fit into buf (%zu)\n", pos + 2 + sLen, size);
709 }
710 return pos;
711}
712
722size_t PubSubClient::writeNextMsgId(uint8_t* buf, size_t pos, size_t size) {
723 if (pos + 2 <= size) {
724 nextMsgId = (++nextMsgId == 0) ? 1 : nextMsgId; // increment msgId (must not be 0, so start at 1)
725 buf[pos++] = (uint8_t)(nextMsgId >> 8);
726 buf[pos++] = (uint8_t)(nextMsgId & 0xFF);
727 } else {
728 ERROR_PSC_PRINTF_P("writeNextMsgId(): buffer (%zu) does not fit into buf (%zu)\n", pos + 2, size);
729 }
730 return pos;
731}
732
733bool PubSubClient::subscribe(const char* topic) {
734 return subscribe(topic, MQTT_QOS0);
735}
736
737bool PubSubClient::subscribe(const char* topic, uint8_t qos) {
738 if (!topic) return false;
739 if (qos > MQTT_QOS1) return false; // only QoS 0 and 1 supported
740
741 size_t topicLen = strnlen(topic, this->bufferSize);
742 if (this->bufferSize < MQTT_MAX_HEADER_SIZE + 2 + 2 + topicLen + 1) {
743 // Too long: header + nextMsgId (2) + topic length bytes (2) + topicLen + QoS (1)
744 return false;
745 }
746 if (connected()) {
747 // Leave room in the buffer for header and variable length field
748 uint16_t length = MQTT_MAX_HEADER_SIZE;
749 length = writeNextMsgId(buffer, length, this->bufferSize); // buffer size is checked before
750 length = writeString(topic, this->buffer, length, this->bufferSize);
751 this->buffer[length++] = qos;
752 return write(MQTTSUBSCRIBE | MQTT_QOS_GET_HDR(MQTT_QOS1), this->buffer, length - MQTT_MAX_HEADER_SIZE);
753 }
754 return false;
755}
756
757bool PubSubClient::unsubscribe(const char* topic) {
758 if (!topic) return false;
759
760 size_t topicLen = strnlen(topic, this->bufferSize);
761 if (this->bufferSize < MQTT_MAX_HEADER_SIZE + 2 + 2 + topicLen) {
762 // Too long: header + nextMsgId (2) + topic length bytes (2) + topicLen
763 return false;
764 }
765 if (connected()) {
766 uint16_t length = MQTT_MAX_HEADER_SIZE;
767 length = writeNextMsgId(buffer, length, this->bufferSize); // buffer size is checked before
768 length = writeString(topic, this->buffer, length, this->bufferSize);
769 return write(MQTTUNSUBSCRIBE | MQTT_QOS_GET_HDR(MQTT_QOS1), this->buffer, length - MQTT_MAX_HEADER_SIZE);
770 }
771 return false;
772}
773
774PubSubClient& PubSubClient::setServer(uint8_t* ip, uint16_t port) {
775 IPAddress addr(ip[0], ip[1], ip[2], ip[3]);
776 return setServer(addr, port);
777}
778
779PubSubClient& PubSubClient::setServer(IPAddress ip, uint16_t port) {
780 this->ip = ip;
781 this->port = port;
782 free(this->domain);
783 this->domain = nullptr;
784 return *this;
785}
786
787PubSubClient& PubSubClient::setServer(const char* domain, uint16_t port) {
788 char* newDomain = nullptr;
789 if (domain) {
790 newDomain = (char*)realloc(this->domain, strlen(domain) + 1);
791 }
792 if (newDomain) {
793 strcpy(newDomain, domain);
794 this->domain = newDomain;
795 this->port = port;
796 } else {
797 free(this->domain);
798 this->domain = nullptr;
799 this->port = 0;
800 }
801 return *this;
802}
803
804PubSubClient& PubSubClient::setCallback(MQTT_CALLBACK_SIGNATURE) {
805 this->callback = callback;
806 return *this;
807}
808
810 this->_client = &client;
811 return *this;
812}
813
815 this->stream = &stream;
816 return *this;
817}
818
820 if (size == 0) {
821 // Cannot set it back to 0
822 return false;
823 }
824 if (this->bufferSize == 0) {
825 this->buffer = (uint8_t*)malloc(size);
826 } else {
827 uint8_t* newBuffer = (uint8_t*)realloc(this->buffer, size);
828 if (newBuffer) {
829 this->buffer = newBuffer;
830 } else {
831 return false;
832 }
833 }
834 this->bufferSize = size;
835 return (this->buffer != nullptr);
836}
837
839 return this->bufferSize;
840}
841
843 this->keepAliveMillis = keepAlive * 1000UL;
844 return *this;
845}
846
848 this->socketTimeoutMillis = timeout * 1000UL;
849 return *this;
850}
851
853 return this->_state;
854}
A simple client for MQTT.
#define MQTT_SOCKET_TIMEOUT
Sets the timeout, in seconds, when reading from the network. This also applies as the timeout for cal...
#define MQTT_MAX_TRANSFER_SIZE
Sets the maximum number of bytes passed to the network client in each write call. Some hardware has a...
#define MQTT_VERSION
Sets the version of the MQTT protocol to use (3.1 or 3.1.1). [MQTT_VERSION_3_1, MQTT_VERSION_3_1_1].
#define MQTT_MAX_POSSIBLE_PACKET_SIZE
Maximum packet size defined by MQTT protocol.
#define MQTT_MAX_PACKET_SIZE
Sets the largest packet size, in bytes, the client will handle. Any packet received that exceeds this...
#define MQTT_KEEPALIVE
Sets the keepalive interval, in seconds, the client will use. This is used to maintain the connection...
bool loop()
This should be called regularly to allow the client to process incoming messages and maintain its con...
bool subscribe(const char *topic)
Subscribes to messages published to the specified topic using QoS 0.
PubSubClient & setCallback(MQTT_CALLBACK_SIGNATURE)
Sets the message callback function.
PubSubClient & setServer(IPAddress ip, uint16_t port)
Sets the server details.
bool beginPublish(const char *topic, size_t plength, bool retained)
Start to publish a message using QoS 0. This API: beginPublish(...) one or more calls to write(....
bool unsubscribe(const char *topic)
Unsubscribes from the specified topic.
bool publish_P(const char *topic, const char *payload, bool retained)
Publishes a message stored in PROGMEM to the specified topic using QoS 0.
bool publish(const char *topic, const char *payload)
Publishes a non retained message to the specified topic using QoS 0.
~PubSubClient()
Destructor for the PubSubClient class.
PubSubClient & setSocketTimeout(uint16_t timeout)
Sets the socket timeout used by the client. This determines how long the client will wait for incomin...
PubSubClient()
Creates an uninitialised client instance.
PubSubClient & setKeepAlive(uint16_t keepAlive)
Sets the keep alive interval used by the client. This value should only be changed when the client is...
bool publish(const char *topic, const char *payload, bool retained)
Publishes a message to the specified topic using QoS 0.
bool connected()
Checks whether the client is connected to the server.
int state()
Returns the current state of the client. If a connection attempt fails, this can be used to get more ...
bool endPublish()
Finish sending a message that was started with a call to beginPublish.
PubSubClient & setClient(Client &client)
Sets the network client instance to use.
void disconnect()
Disconnects the client.
size_t getBufferSize()
Gets the current size of the internal buffer.
bool setBufferSize(size_t size)
Sets the size, in bytes, of the internal send and receive buffer. This must be large enough to contai...
bool connect(const char *id)
Connects the client using a clean session without username and password.
PubSubClient & setStream(Stream &stream)
Sets the stream to write received messages to.
#define MQTT_QOS0
Quality of Service 0: At most once.
#define MQTT_QOS2
#define MQTT_QOS1
Quality of Service 1: At least once.
#define MQTT_CONNECTION_TIMEOUT
The network connection timed out or server didn't respond within the keepalive time.
#define MQTT_CONNECTED
The client is connected.
#define MQTT_CONNECT_FAILED
The network connection failed.
#define MQTT_CONNECTION_LOST
The network connection was lost/broken.
#define MQTT_DISCONNECTED
The client is disconnected cleanly.