PubSubClient3 v3.2.1
Located at <a href='https://github.com/hmueller01/pubsubclient3'>GitHub</a>
 
Loading...
Searching...
No Matches
PubSubClient.cpp
Go to the documentation of this file.
1
10
11#include "PubSubClient.h"
12
20#define CHECK_STRING_LENGTH(l, s) \
21 if ((!s) || (l + 2 + strnlen(s, _bufferSize) > _bufferSize)) { \
22 _client->stop(); \
23 return false; \
24 }
25
31
33 setClient(client);
34}
35
36PubSubClient::PubSubClient(IPAddress addr, uint16_t port, Client& client) : PubSubClient() {
37 setServer(addr, port);
38 setClient(client);
39}
40
41PubSubClient::PubSubClient(IPAddress addr, uint16_t port, Client& client, Stream& stream) : PubSubClient() {
42 setServer(addr, port);
43 setClient(client);
44 setStream(stream);
45}
46
47PubSubClient::PubSubClient(IPAddress addr, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client) : PubSubClient() {
48 setServer(addr, port);
49 setCallback(callback);
50 setClient(client);
51}
52
53PubSubClient::PubSubClient(IPAddress addr, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client, Stream& stream) : PubSubClient() {
54 setServer(addr, port);
55 setCallback(callback);
56 setClient(client);
57 setStream(stream);
58}
59
60PubSubClient::PubSubClient(uint8_t* ip, uint16_t port, Client& client) : PubSubClient() {
61 setServer(ip, port);
62 setClient(client);
63}
64
65PubSubClient::PubSubClient(uint8_t* ip, uint16_t port, Client& client, Stream& stream) : PubSubClient() {
66 setServer(ip, port);
67 setClient(client);
68 setStream(stream);
69}
70
71PubSubClient::PubSubClient(uint8_t* ip, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client) : PubSubClient() {
72 setServer(ip, port);
73 setCallback(callback);
74 setClient(client);
75}
76
77PubSubClient::PubSubClient(uint8_t* ip, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client, Stream& stream) : PubSubClient() {
78 setServer(ip, port);
79 setCallback(callback);
80 setClient(client);
81 setStream(stream);
82}
83
84PubSubClient::PubSubClient(const char* domain, uint16_t port, Client& client) : PubSubClient() {
85 setServer(domain, port);
86 setClient(client);
87}
88
89PubSubClient::PubSubClient(const char* domain, uint16_t port, Client& client, Stream& stream) : PubSubClient() {
90 setServer(domain, port);
91 setClient(client);
92 setStream(stream);
93}
94
95PubSubClient::PubSubClient(const char* domain, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client) : PubSubClient() {
96 setServer(domain, port);
97 setCallback(callback);
98 setClient(client);
99}
100
101PubSubClient::PubSubClient(const char* domain, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client, Stream& stream) : PubSubClient() {
102 setServer(domain, port);
103 setCallback(callback);
104 setClient(client);
105 setStream(stream);
106}
107
109 free(_domain);
110 free(_buffer);
111}
112
113bool PubSubClient::connect(const char* id) {
114 return connect(id, nullptr, nullptr, nullptr, MQTT_QOS0, false, nullptr, true);
115}
116
117bool PubSubClient::connect(const char* id, const char* user, const char* pass) {
118 return connect(id, user, pass, nullptr, MQTT_QOS0, false, nullptr, true);
119}
120
121bool PubSubClient::connect(const char* id, const char* willTopic, uint8_t willQos, bool willRetain, const char* willMessage) {
122 return connect(id, nullptr, nullptr, willTopic, willQos, willRetain, willMessage, true);
123}
124
125bool PubSubClient::connect(const char* id, const char* user, const char* pass, const char* willTopic, uint8_t willQos, bool willRetain,
126 const char* willMessage) {
127 return connect(id, user, pass, willTopic, willQos, willRetain, willMessage, true);
128}
129
130bool PubSubClient::connect(const char* id, const char* user, const char* pass, const char* willTopic, uint8_t willQos, bool willRetain,
131 const char* willMessage, bool cleanSession) {
132 if (!_client) return false; // do not crash if client not set
133 if (!connected()) {
134 int result = 0;
135
136 if (_client->connected()) {
137 result = 1;
138 } else if (_port != 0) {
139 if (_domain) {
140 result = _client->connect(_domain, _port);
141 } else {
142 result = _client->connect(_ip, _port);
143 }
144 }
145
146 if (result == 1) {
147 _nextMsgId = 1; // init msgId (packet identifier)
148
149#if MQTT_VERSION == MQTT_VERSION_3_1
150 const uint8_t protocol[9] = {0x00, 0x06, 'M', 'Q', 'I', 's', 'd', 'p', MQTT_VERSION};
151#elif MQTT_VERSION == MQTT_VERSION_3_1_1
152 const uint8_t protocol[7] = {0x00, 0x04, 'M', 'Q', 'T', 'T', MQTT_VERSION};
153#endif
154 // Leave room in the _buffer for header and variable length field
155 memcpy(_buffer + MQTT_MAX_HEADER_SIZE, protocol, sizeof(protocol));
156
157 size_t length = MQTT_MAX_HEADER_SIZE + sizeof(protocol);
158 uint8_t flags = 0x00;
159 if (willTopic) {
160 flags = (0x01 << 2) | (willQos << 3) | (willRetain << 5); // set will flag bit 2, will QoS and will retain bit 5
161 }
162 if (cleanSession) {
163 flags = flags | (0x01 << 1); // set clean session bit 1
164 }
165 if (user) {
166 flags = flags | (0x01 << 7); // set user name flag bit 7
167 if (pass) {
168 flags = flags | (0x01 << 6); // set password flag bit 6
169 }
170 }
171 const uint16_t keepAlive = _keepAliveMillis / 1000;
172 _buffer[length++] = flags;
173 _buffer[length++] = keepAlive >> 8;
174 _buffer[length++] = keepAlive & 0xFF;
175
176 CHECK_STRING_LENGTH(length, id)
177 length = writeString(id, length);
178 if (willTopic) {
179 CHECK_STRING_LENGTH(length, willTopic)
180 length = writeString(willTopic, length);
181 CHECK_STRING_LENGTH(length, willMessage)
182 length = writeString(willMessage, length);
183 }
184
185 if (user) {
186 CHECK_STRING_LENGTH(length, user)
187 length = writeString(user, length);
188 if (pass) {
189 CHECK_STRING_LENGTH(length, pass)
190 length = writeString(pass, length);
191 }
192 }
193
194 if (!writeControlPacket(MQTTCONNECT, length - MQTT_MAX_HEADER_SIZE)) {
195 _state = MQTT_CONNECT_FAILED;
196 _client->stop();
197 return false;
198 }
199 _lastInActivity = _lastOutActivity = millis();
200 _pingOutstanding = false;
201
202 while (!_client->available()) {
203 yield();
204 unsigned long t = millis();
205 if (t - _lastInActivity >= _socketTimeoutMillis) {
206 DEBUG_PSC_PRINTF("connect aborting due to timeout\n");
208 _client->stop();
209 return false;
210 }
211 }
212 uint8_t hdrLen;
213 size_t len = readPacket(&hdrLen);
214
215 if (len == 4) {
216 if (_buffer[3] == 0) {
217 _lastInActivity = millis();
218 _state = MQTT_CONNECTED;
219 return true;
220 } else {
221 _state = _buffer[3];
222 }
223 }
224 DEBUG_PSC_PRINTF("connect aborting due to protocol error\n");
225 _client->stop();
226 } else {
227 _state = MQTT_CONNECT_FAILED;
228 }
229 return false;
230 }
231 return true;
232}
233
235 if (!_client) return false;
236
237 if (_client->connected()) {
238 return (_state == MQTT_CONNECTED);
239 } else if (_state == MQTT_CONNECTED) {
240 DEBUG_PSC_PRINTF("lost connection (client may have more details)\n");
241 _state = MQTT_CONNECTION_LOST;
242 _client->stop();
243 _pingOutstanding = false;
244 }
245 return false;
246}
247
249 DEBUG_PSC_PRINTF("disconnect called\n");
250 _state = MQTT_DISCONNECTED;
251 if (_client) {
252 _buffer[0] = MQTTDISCONNECT;
253 _buffer[1] = 0;
254 _client->write(_buffer, 2);
255 _client->flush();
256 _client->stop();
257 _lastInActivity = _lastOutActivity = millis();
258 }
259 _pingOutstanding = false;
260}
261
268bool PubSubClient::readByte(uint8_t* result) {
269 if (!_client) return false; // do not crash if client not set
270
271 unsigned long previousMillis = millis();
272 while (!_client->available()) {
273 yield();
274 unsigned long currentMillis = millis();
275 if (currentMillis - previousMillis >= _socketTimeoutMillis) {
276 return false;
277 }
278 }
279 int rc = _client->read();
280 if (rc < 0) {
281 return false;
282 }
283 *result = (uint8_t)rc;
284 return true;
285}
286
293bool PubSubClient::readByte(uint8_t* result, size_t* pos) {
294 uint8_t* write_address = &(result[*pos]);
295 if (readByte(write_address)) {
296 (*pos)++;
297 return true;
298 }
299 return false;
300}
301
308size_t PubSubClient::readPacket(uint8_t* hdrLen) {
309 size_t len = 0;
310 if (!readByte(_buffer, &len)) return 0;
311 bool isPublish = (_buffer[0] & 0xF0) == MQTTPUBLISH;
312 uint32_t multiplier = 1;
313 size_t length = 0;
314 uint8_t digit = 0;
315 uint16_t skip = 0;
316 uint8_t start = 0;
317
318 do {
319 if (len == MQTT_MAX_HEADER_SIZE) {
320 // Invalid remaining length encoding - kill the connection
321 DEBUG_PSC_PRINTF("readPacket detected packet of invalid length\n");
322 _state = MQTT_DISCONNECTED;
323 _client->stop();
324 return 0;
325 }
326 if (!readByte(&digit)) return 0;
327 _buffer[len++] = digit;
328 length += (digit & 0x7F) * multiplier; // length is coded in the lower 7 bits
329 multiplier <<= 7; // multiplier *= 128
330 } while ((digit & 0x80) != 0); // do while 8th continuation bit is set
331 *hdrLen = (uint8_t)(len - 1);
332
333 DEBUG_PSC_PRINTF("readPacket received packet of length %zu (isPublish = %u)\n", length, isPublish);
334
335 if (isPublish) {
336 // Read in topic length to calculate bytes to skip over for Stream writing
337 if (!readByte(_buffer, &len)) return 0;
338 if (!readByte(_buffer, &len)) return 0;
339 skip = (_buffer[*hdrLen + 1] << 8) + _buffer[*hdrLen + 2];
340 start = 2;
341 if (MQTT_HDR_GET_QOS(_buffer[0]) > MQTT_QOS0) {
342 // skip msgId (packet identifier) for QoS 1 and 2 messages
343 skip += 2;
344 }
345 }
346 size_t idx = len;
347
348 for (size_t i = start; i < length; i++) {
349 if (!readByte(&digit)) return 0;
350 if (_stream) {
351 if (isPublish && (idx - *hdrLen - 2 > skip)) {
352 _stream->write(digit);
353 }
354 }
355
356 if (len < _bufferSize) {
357 _buffer[len++] = digit;
358 }
359 idx++;
360 }
361
362 if (!_stream && (idx > _bufferSize)) {
363 DEBUG_PSC_PRINTF("readPacket ignoring packet of size %zu exceeding buffer of size %zu\n", length, _bufferSize);
364 len = 0; // This will cause the packet to be ignored.
365 }
366 return len;
367}
368
376bool PubSubClient::handlePacket(uint8_t hdrLen, size_t length) {
377 uint8_t type = _buffer[0] & 0xF0;
378 DEBUG_PSC_PRINTF("received message of type %u\n", type);
379 switch (type) {
380 case MQTTPUBLISH:
381 if (callback) {
382 // MQTT Publish packet: See section 3.3 MQTT v3.1.1 protocol specification:
383 // - Header: 1 byte
384 // - Remaining header length: hdrLen bytes, multibyte field (1 .. MQTT_MAX_HEADER_SIZE - 1)
385 // - Topic length: 2 bytes (starts at _buffer[hdrLen + 1])
386 // - Topic: topicLen bytes (starts at _buffer[hdrLen + 3])
387 // - Packet Identifier (msgId): 0 bytes for QoS 0, 2 bytes for QoS 1 and 2 (starts at _buffer[hdrLen + 3 + topicLen])
388 // - Payload (for QoS = 0): length - (hdrLen + 3 + topicLen) bytes (starts at _buffer[hdrLen + 3 + topicLen])
389 // - Payload (for QoS > 0): length - (hdrLen + 5 + topicLen) bytes (starts at _buffer[hdrLen + 5 + topicLen])
390 // To get a null reminated 'C' topic string we move the topic 1 byte to the front (overwriting the LSB of the topic lenght)
391 uint16_t topicLen = (_buffer[hdrLen + 1] << 8) + _buffer[hdrLen + 2]; // topic length in bytes
392 char* topic = (char*)(_buffer + hdrLen + 3 - 1); // set the topic in the LSB of the topic lenght, as we move it there
393 uint16_t payloadOffset = hdrLen + 3 + topicLen; // payload starts after header and topic (if there is no packet identifier)
394 size_t payloadLen = length - payloadOffset; // this might change by 2 if we have a QoS 1 or 2 message
395 uint8_t* payload = _buffer + payloadOffset;
396
397 if (length < payloadOffset) { // do not move outside the max bufferSize
398 ERROR_PSC_PRINTF_P("handlePacket(): Suspicious topicLen (%u) points outside of received buffer length (%zu)\n", topicLen, length);
399 return false;
400 }
401 memmove(topic, topic + 1, topicLen); // move topic inside buffer 1 byte to front
402 topic[topicLen] = '\0'; // end the topic as a 'C' string with \x00
403
404 if (MQTT_HDR_GET_QOS(_buffer[0]) == MQTT_QOS0) {
405 // No msgId for QOS == 0
406 callback(topic, payload, payloadLen);
407 } else {
408 // For QOS 1 and 2 we have a msgId (packet identifier) after the topic at the current payloadOffset
409 if (payloadLen < 2) { // payload must be >= 2, as we have the msgId before
410 ERROR_PSC_PRINTF_P("handlePacket(): Missing msgId in QoS 1/2 message\n");
411 return false;
412 }
413 uint16_t msgId = (_buffer[payloadOffset] << 8) + _buffer[payloadOffset + 1];
414 callback(topic, payload + 2, payloadLen - 2); // remove the msgId from the callback payload
415
416 _buffer[0] = MQTTPUBACK;
417 _buffer[1] = 2;
418 _buffer[2] = (msgId >> 8);
419 _buffer[3] = (msgId & 0xFF);
420 if (_client->write(_buffer, 4) == 4) {
421 _lastOutActivity = millis();
422 }
423 }
424 }
425 break;
426 case MQTTPUBACK:
427 // MQTT Publish Acknowledgment (QoS 1 publish received): See section 3.4 MQTT v3.1.1 protocol specification
428 if (length < 4) {
429 ERROR_PSC_PRINTF_P("handlePacket(): Received PUBACK packet with length %zu, expected at least 4 bytes\n", length);
430 return false;
431 }
432 // No futher action here, as resending is not supported.
433 break;
434 case MQTTPUBREC:
435 // MQTT Publish Received (QoS 2 publish received, part 1): See section 3.5 MQTT v3.1.1 protocol specification
436 if (length < 4) {
437 ERROR_PSC_PRINTF_P("handlePacket(): Received PUBREC packet with length %zu, expected at least 4 bytes\n", length);
438 return false;
439 }
440 // MQTT Publish Release (QoS 2 publish received, part 2): See section 3.6 MQTT v3.1.1 protocol specification
441 _buffer[0] = MQTTPUBREL | 2; // PUBREL with bit 1 set
442 // bytes 1-3 of PUBREL are the same as of PUBREC
443 if (_client->write(_buffer, 4) == 4) {
444 _lastOutActivity = millis();
445 }
446 break;
447 case MQTTPUBCOMP:
448 // MQTT Publish Complete (QoS 2 publish received, part 3): See section 3.7 MQTT v3.1.1 protocol specification
449 if (length < 4) {
450 ERROR_PSC_PRINTF_P("handlePacket(): Received PUBCOMP packet with length %zu, expected at least 4 bytes\n", length);
451 return false;
452 }
453 // No futher action here, as resending is not supported.
454 break;
455 case MQTTPINGREQ:
456 // MQTT Ping Request: See section 3.12 MQTT v3.1.1 protocol specification
457 _buffer[0] = MQTTPINGRESP;
458 _buffer[1] = 0;
459 if (_client->write(_buffer, 2) == 2) {
460 _lastOutActivity = millis();
461 }
462 break;
463 case MQTTPINGRESP:
464 _pingOutstanding = false;
465 break;
466 default:
467 break;
468 }
469 return true;
470}
471
473 if (!connected()) {
474 return false;
475 }
476 bool ret = true;
477 const unsigned long t = millis();
478 if (_keepAliveMillis && ((t - _lastInActivity > _keepAliveMillis) || (t - _lastOutActivity > _keepAliveMillis))) {
479 if (_pingOutstanding) {
480 DEBUG_PSC_PRINTF("loop aborting due to timeout\n");
482 _client->stop();
483 _pingOutstanding = false;
484 return false;
485 } else if (_bufferWritePos > 0) {
486 // There is still data in the _buffer to be sent, so send it now instead of a ping
487 if (flushBuffer() == 0) {
489 _client->stop();
490 _pingOutstanding = false;
491 return false;
492 }
493 } else {
494 _buffer[0] = MQTTPINGREQ;
495 _buffer[1] = 0;
496 if (_client->write(_buffer, 2) == 2) {
497 _lastInActivity = _lastOutActivity = t;
498 _pingOutstanding = true;
499 }
500 }
501 }
502 if (_client->available()) {
503 uint8_t hdrLen;
504 size_t len = readPacket(&hdrLen);
505 if (len > 0) {
506 _lastInActivity = t;
507 ret = handlePacket(hdrLen, len);
508 if (!ret) {
509 _state = MQTT_DISCONNECTED;
510 _client->stop();
511 }
512 } else if (!connected()) {
513 // readPacket has closed the connection
514 return false;
515 }
516 }
517 return ret;
518}
519
520bool PubSubClient::publish(const char* topic, const char* payload) {
521 return publish(topic, payload, MQTT_QOS0, false);
522}
523
524bool PubSubClient::publish(const char* topic, const char* payload, bool retained) {
525 return publish(topic, payload, MQTT_QOS0, retained);
526}
527
528bool PubSubClient::publish(const char* topic, const char* payload, uint8_t qos, bool retained) {
529 return publish(topic, (const uint8_t*)payload, payload ? strnlen(payload, MQTT_MAX_POSSIBLE_PACKET_SIZE) : 0, qos, retained);
530}
531
532bool PubSubClient::publish(const char* topic, const uint8_t* payload, size_t plength) {
533 return publish(topic, payload, plength, MQTT_QOS0, false);
534}
535
536bool PubSubClient::publish(const char* topic, const uint8_t* payload, size_t plength, bool retained) {
537 return publish(topic, payload, plength, MQTT_QOS0, retained);
538}
539
540bool PubSubClient::publish(const char* topic, const uint8_t* payload, size_t plength, uint8_t qos, bool retained) {
541 if (beginPublish(topic, plength, qos, retained)) {
542 size_t rc = write(payload, plength);
543 return endPublish() && (rc == plength);
544 }
545 return false;
546}
547
548bool PubSubClient::publish_P(const char* topic, const char* payload, bool retained) {
549 return publish_P(topic, payload, MQTT_QOS0, retained);
550}
551
552bool PubSubClient::publish_P(const char* topic, const char* payload, uint8_t qos, bool retained) {
553 return publish_P(topic, (const uint8_t*)payload, payload ? strnlen_P(payload, MQTT_MAX_POSSIBLE_PACKET_SIZE) : 0, qos, retained);
554}
555
556bool PubSubClient::publish_P(const char* topic, const uint8_t* payload, size_t plength, bool retained) {
557 return publish_P(topic, payload, plength, MQTT_QOS0, retained);
558}
559
560bool PubSubClient::publish_P(const char* topic, const uint8_t* payload, size_t plength, uint8_t qos, bool retained) {
561 if (beginPublish(topic, plength, qos, retained)) {
562 size_t rc = write_P(payload, plength);
563 return endPublish() && (rc == plength);
564 }
565 return false;
566}
567
568bool PubSubClient::beginPublish(const char* topic, size_t plength, bool retained) {
569 return beginPublish(topic, plength, MQTT_QOS0, retained);
570}
571
572bool PubSubClient::beginPublish(const char* topic, size_t plength, uint8_t qos, bool retained) {
573 if (!topic) return false;
574 if (strlen(topic) == 0) return false; // empty topic is not allowed
575 if (qos > MQTT_QOS2) { // only valid QoS supported
576 ERROR_PSC_PRINTF_P("beginPublish() called with invalid QoS %u\n", qos);
577 return false;
578 }
579 const size_t nextMsgLen = (qos > MQTT_QOS0) ? 2 : 0; // add 2 bytes for nextMsgId if QoS > 0
580 // check if the header, the topic (including 2 length bytes) and nextMsgId fit into the _buffer
581 if (connected() && (MQTT_MAX_HEADER_SIZE + strlen(topic) + 2 + nextMsgLen <= _bufferSize)) {
582 // first write the topic at the end of the maximal variable header (MQTT_MAX_HEADER_SIZE) to the _buffer
583 size_t topicLen = writeString(topic, MQTT_MAX_HEADER_SIZE) - MQTT_MAX_HEADER_SIZE;
584 if (qos > MQTT_QOS0) {
585 // if QoS 1 or 2, we need to send the nextMsgId (packet identifier) after topic
586 writeNextMsgId(MQTT_MAX_HEADER_SIZE + topicLen);
587 }
588 // we now know the length of the topic string (lenght + 2 bytes signalling the length) and can build the variable header information
589 const uint8_t header = MQTTPUBLISH | MQTT_QOS_GET_HDR(qos) | (retained ? MQTTRETAINED : 0);
590 uint8_t hdrLen = buildHeader(header, topicLen + nextMsgLen + plength);
591 if (hdrLen == 0) return false; // exit here in case of header generation failure
592 // as the header length is variable, it starts at MQTT_MAX_HEADER_SIZE - hdrLen (see buildHeader() documentation)
593 size_t rc = _client->write(_buffer + (MQTT_MAX_HEADER_SIZE - hdrLen), hdrLen + topicLen + nextMsgLen);
594 _lastOutActivity = millis();
595 return (rc == (hdrLen + topicLen + nextMsgLen));
596 }
597 return false;
598}
599
601 if (connected()) {
602 if (_bufferWritePos > 0) {
603 // still data in the _buffer to be sent
604 if (flushBuffer() == 0) return false;
605 }
606 return true;
607 }
608 return false;
609}
610
620uint8_t PubSubClient::buildHeader(uint8_t header, size_t length) {
621 uint8_t hdrBuf[MQTT_MAX_HEADER_SIZE - 1];
622 uint8_t hdrLen = 0;
623 uint8_t digit;
624 size_t len = length;
625 do {
626 digit = len & 0x7F; // digit = len % 128
627 len >>= 7; // len = len / 128
628 if (len > 0) {
629 digit |= 0x80;
630 }
631 hdrBuf[hdrLen++] = digit;
632 } while ((len > 0) && (hdrLen < MQTT_MAX_HEADER_SIZE - 1));
633
634 if (len > 0) {
635 ERROR_PSC_PRINTF_P("buildHeader() length too big %zu, left %zu\n", length, len);
636 return 0;
637 }
638
639 _buffer[MQTT_MAX_HEADER_SIZE - 1 - hdrLen] = header;
640 memcpy(_buffer + MQTT_MAX_HEADER_SIZE - hdrLen, hdrBuf, hdrLen);
641 return hdrLen + 1; // Full header size is variable length bit plus the 1-byte fixed header
642}
643
644size_t PubSubClient::write(uint8_t data) {
645 return appendBuffer(data);
646}
647
648size_t PubSubClient::write(const uint8_t* buf, size_t size) {
649 for (size_t i = 0; i < size; i++) {
650 if (appendBuffer(buf[i]) == 0) return i;
651 }
652 return size;
653}
654
655size_t PubSubClient::write_P(const uint8_t* buf, size_t size) {
656 for (size_t i = 0; i < size; i++) {
657 if (appendBuffer((uint8_t)pgm_read_byte_near(buf + i)) == 0) return i;
658 }
659 return size;
660}
661
670bool PubSubClient::writeControlPacket(uint8_t header, size_t length) {
671 uint8_t hdrLen = buildHeader(header, length);
672 if (hdrLen == 0) return false; // exit here in case of header generation failure
673
674 return writeBuffer(MQTT_MAX_HEADER_SIZE - hdrLen, hdrLen + length);
675}
676
684size_t PubSubClient::writeBuffer(size_t pos, size_t size) {
685 size_t rc = 0;
686 if (_client && (size > 0) && (pos + size <= _bufferSize)) {
687#ifdef MQTT_MAX_TRANSFER_SIZE
688 uint8_t* writeBuf = _buffer + pos;
689 size_t bytesRemaining = size;
690 bool result = true;
691 while ((bytesRemaining > 0) && result) {
692 size_t bytesToWrite = (bytesRemaining > MQTT_MAX_TRANSFER_SIZE) ? MQTT_MAX_TRANSFER_SIZE : bytesRemaining;
693 size_t bytesWritten = _client->write(writeBuf, bytesToWrite);
694 result = (bytesWritten == bytesToWrite);
695 bytesRemaining -= bytesWritten;
696 writeBuf += bytesWritten;
697 if (result) {
698 _lastOutActivity = millis();
699 }
700 yield();
701 }
702 rc = result ? size : 0; // if result is false indicate a write error
703#else
704 rc = _client->write(_buffer + pos, size);
705 if (rc == size) {
706 _lastOutActivity = millis();
707 } else {
708 rc = 0; // indicate a write error
709 }
710#endif
711 }
712 return rc;
713}
714
725size_t PubSubClient::writeString(const char* string, size_t pos) {
726 if (!string) return pos;
727
728 size_t sLen = strlen(string);
729 if ((pos + 2 + sLen <= _bufferSize) && (sLen <= 0xFFFF)) {
730 _buffer[pos++] = (uint8_t)(sLen >> 8);
731 _buffer[pos++] = (uint8_t)(sLen & 0xFF);
732 memcpy(_buffer + pos, string, sLen);
733 pos += sLen;
734 } else {
735 ERROR_PSC_PRINTF_P("writeString(): string (%zu) does not fit into buf (%zu)\n", pos + 2 + sLen, _bufferSize);
736 }
737 return pos;
738}
739
747size_t PubSubClient::writeNextMsgId(size_t pos) {
748 if ((pos + 2) <= _bufferSize) {
749 _nextMsgId = (++_nextMsgId == 0) ? 1 : _nextMsgId; // increment msgId (must not be 0, so start at 1)
750 _buffer[pos++] = (uint8_t)(_nextMsgId >> 8);
751 _buffer[pos++] = (uint8_t)(_nextMsgId & 0xFF);
752 } else {
753 ERROR_PSC_PRINTF_P("writeNextMsgId(): buffer overrun (%zu) \n", pos + 2);
754 }
755 return pos;
756}
757
764size_t PubSubClient::appendBuffer(uint8_t data) {
765 _buffer[_bufferWritePos++] = data;
766 if (_bufferWritePos >= _bufferSize) {
767 if (flushBuffer() == 0) return 0;
768 }
769 return 1;
770}
771
778size_t PubSubClient::flushBuffer() {
779 size_t rc = 0;
780 if (connected()) {
781 rc = writeBuffer(0, _bufferWritePos);
782 }
783 _bufferWritePos = 0;
784 return rc;
785}
786
787bool PubSubClient::subscribe(const char* topic) {
788 return subscribe(topic, MQTT_QOS0);
789}
790
791bool PubSubClient::subscribe(const char* topic, uint8_t qos) {
792 if (!topic) return false;
793 if (qos > MQTT_QOS1) return false; // only QoS 0 and 1 supported
794
795 size_t topicLen = strnlen(topic, _bufferSize);
796 if (_bufferSize < MQTT_MAX_HEADER_SIZE + 2 + 2 + topicLen + 1) {
797 // Too long: header + nextMsgId (2) + topic length bytes (2) + topicLen + QoS (1)
798 return false;
799 }
800 if (connected()) {
801 // Leave room in the _buffer for header and variable length field
802 uint16_t length = MQTT_MAX_HEADER_SIZE;
803 length = writeNextMsgId(length); // _buffer size is checked before
804 length = writeString(topic, length);
805 _buffer[length++] = qos;
806 return writeControlPacket(MQTTSUBSCRIBE | MQTT_QOS_GET_HDR(MQTT_QOS1), length - MQTT_MAX_HEADER_SIZE);
807 }
808 return false;
809}
810
811bool PubSubClient::unsubscribe(const char* topic) {
812 if (!topic) return false;
813
814 size_t topicLen = strnlen(topic, _bufferSize);
815 if (_bufferSize < MQTT_MAX_HEADER_SIZE + 2 + 2 + topicLen) {
816 // Too long: header + nextMsgId (2) + topic length bytes (2) + topicLen
817 return false;
818 }
819 if (connected()) {
820 uint16_t length = MQTT_MAX_HEADER_SIZE;
821 length = writeNextMsgId(length); // _buffer size is checked before
822 length = writeString(topic, length);
823 return writeControlPacket(MQTTUNSUBSCRIBE | MQTT_QOS_GET_HDR(MQTT_QOS1), length - MQTT_MAX_HEADER_SIZE);
824 }
825 return false;
826}
827
828PubSubClient& PubSubClient::setServer(uint8_t* ip, uint16_t port) {
829 IPAddress addr(ip[0], ip[1], ip[2], ip[3]);
830 return setServer(addr, port);
831}
832
833PubSubClient& PubSubClient::setServer(IPAddress ip, uint16_t port) {
834 _ip = ip;
835 _port = port;
836 free(_domain);
837 _domain = nullptr;
838 return *this;
839}
840
841PubSubClient& PubSubClient::setServer(const char* domain, uint16_t port) {
842 char* newDomain = nullptr;
843 if (domain) {
844 newDomain = (char*)realloc(_domain, strlen(domain) + 1);
845 }
846 if (newDomain) {
847 strcpy(newDomain, domain);
848 _domain = newDomain;
849 _port = port;
850 } else {
851 free(_domain);
852 _domain = nullptr;
853 _port = 0;
854 }
855 return *this;
856}
857
858PubSubClient& PubSubClient::setCallback(MQTT_CALLBACK_SIGNATURE) {
859 this->callback = callback;
860 return *this;
861}
862
864 _client = &client;
865 return *this;
866}
867
869 _stream = &stream;
870 return *this;
871}
872
874 if (size == 0) {
875 // Cannot set it back to 0
876 return false;
877 }
878 if (_bufferSize == 0) {
879 _buffer = (uint8_t*)malloc(size);
880 } else {
881 uint8_t* newBuffer = (uint8_t*)realloc(_buffer, size);
882 if (newBuffer) {
883 _buffer = newBuffer;
884 } else {
885 return false;
886 }
887 }
888 _bufferSize = size;
889 return (_buffer != nullptr);
890}
891
893 return _bufferSize;
894}
895
897 _keepAliveMillis = keepAlive * 1000UL;
898 return *this;
899}
900
902 _socketTimeoutMillis = timeout * 1000UL;
903 return *this;
904}
905
907 return _state;
908}
A simple client for MQTT.
#define MQTT_SOCKET_TIMEOUT
Sets the timeout, in seconds, when reading from the network. This also applies as the timeout for cal...
#define MQTT_MAX_TRANSFER_SIZE
Sets the maximum number of bytes passed to the network client in each write call. Some hardware has a...
#define MQTT_VERSION
Sets the version of the MQTT protocol to use (3.1 or 3.1.1). [MQTT_VERSION_3_1, MQTT_VERSION_3_1_1].
#define MQTT_MAX_POSSIBLE_PACKET_SIZE
Maximum packet size defined by MQTT protocol.
#define MQTT_MAX_PACKET_SIZE
Sets the largest packet size, in bytes, the client will handle. Any packet received that exceeds this...
#define MQTT_KEEPALIVE
Sets the keepalive interval, in seconds, the client will use. This is used to maintain the connection...
bool loop()
This should be called regularly to allow the client to process incoming messages and maintain its con...
bool subscribe(const char *topic)
Subscribes to messages published to the specified topic using QoS 0.
PubSubClient & setCallback(MQTT_CALLBACK_SIGNATURE)
Sets the message callback function.
PubSubClient & setServer(IPAddress ip, uint16_t port)
Sets the server details.
bool beginPublish(const char *topic, size_t plength, bool retained)
Start to publish a message using QoS 0. This API: beginPublish(...) one or more calls to write(....
bool unsubscribe(const char *topic)
Unsubscribes from the specified topic.
bool publish_P(const char *topic, const char *payload, bool retained)
Publishes a message stored in PROGMEM to the specified topic using QoS 0.
virtual size_t write(uint8_t data)
Writes a single byte as a component of a publish started with a call to beginPublish....
bool publish(const char *topic, const char *payload)
Publishes a non retained message to the specified topic using QoS 0.
~PubSubClient()
Destructor for the PubSubClient class.
PubSubClient & setSocketTimeout(uint16_t timeout)
Sets the socket timeout used by the client. This determines how long the client will wait for incomin...
PubSubClient()
Creates an uninitialised client instance.
PubSubClient & setKeepAlive(uint16_t keepAlive)
Sets the keep alive interval used by the client. This value should only be changed when the client is...
bool connected()
Checks whether the client is connected to the server.
int state()
Returns the current state of the client. If a connection attempt fails, this can be used to get more ...
bool endPublish()
Finish sending a message that was started with a call to beginPublish.
PubSubClient & setClient(Client &client)
Sets the network client instance to use.
void disconnect()
Disconnects the client.
size_t getBufferSize()
Gets the current size of the internal buffer.
size_t write_P(const uint8_t *buf, size_t size)
Writes an array of progmem bytes as a component of a publish started with a call to beginPublish....
bool setBufferSize(size_t size)
Sets the size, in bytes, of the internal send and receive buffer. This must be large enough to contai...
bool connect(const char *id)
Connects the client using a clean session without username and password.
PubSubClient & setStream(Stream &stream)
Sets the stream to write received messages to.
#define MQTT_QOS0
Quality of Service 0: At most once.
#define MQTT_QOS2
#define MQTT_QOS1
Quality of Service 1: At least once.
#define MQTT_CONNECTION_TIMEOUT
The network connection timed out or server didn't respond within the keepalive time.
#define MQTT_CONNECTED
The client is connected.
#define MQTT_CONNECT_FAILED
The network connection failed.
#define MQTT_CONNECTION_LOST
The network connection was lost/broken.
#define MQTT_DISCONNECTED
The client is disconnected cleanly.