PubSubClient3 v3.3.0
Located at <a href='https://github.com/hmueller01/pubsubclient3'>GitHub</a>
 
Loading...
Searching...
No Matches
PubSubClient.cpp
Go to the documentation of this file.
1
10
11#include "PubSubClient.h"
12
/**
 * Guard used while assembling a packet in _buffer.
 *
 * Verifies that string `s` is non-null and that its 2-byte length prefix plus its
 * characters still fit into the buffer when written at offset `l`. On failure the
 * network client is stopped and the enclosing function returns false.
 *
 * Both arguments are parenthesized so that arbitrary expressions can be passed.
 * The macro expands to a complete `if` statement, so call sites work with or
 * without a trailing semicolon. Note that `s` is evaluated more than once.
 */
#define CHECK_STRING_LENGTH(l, s)                                          \
    if ((!(s)) || ((l) + 2 + strnlen((s), _bufferSize) > _bufferSize)) {   \
        _client->stop();                                                   \
        return false;                                                      \
    }
25
31
33 setClient(client);
34}
35
36PubSubClient::PubSubClient(IPAddress addr, uint16_t port, Client& client) : PubSubClient() {
37 setServer(addr, port);
38 setClient(client);
39}
40
41PubSubClient::PubSubClient(IPAddress addr, uint16_t port, Client& client, Stream& stream) : PubSubClient() {
42 setServer(addr, port);
43 setClient(client);
44 setStream(stream);
45}
46
47PubSubClient::PubSubClient(IPAddress addr, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client) : PubSubClient() {
48 setServer(addr, port);
49 setCallback(callback);
50 setClient(client);
51}
52
53PubSubClient::PubSubClient(IPAddress addr, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client, Stream& stream) : PubSubClient() {
54 setServer(addr, port);
55 setCallback(callback);
56 setClient(client);
57 setStream(stream);
58}
59
60PubSubClient::PubSubClient(uint8_t* ip, uint16_t port, Client& client) : PubSubClient() {
61 setServer(ip, port);
62 setClient(client);
63}
64
65PubSubClient::PubSubClient(uint8_t* ip, uint16_t port, Client& client, Stream& stream) : PubSubClient() {
66 setServer(ip, port);
67 setClient(client);
68 setStream(stream);
69}
70
71PubSubClient::PubSubClient(uint8_t* ip, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client) : PubSubClient() {
72 setServer(ip, port);
73 setCallback(callback);
74 setClient(client);
75}
76
77PubSubClient::PubSubClient(uint8_t* ip, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client, Stream& stream) : PubSubClient() {
78 setServer(ip, port);
79 setCallback(callback);
80 setClient(client);
81 setStream(stream);
82}
83
84PubSubClient::PubSubClient(const char* domain, uint16_t port, Client& client) : PubSubClient() {
85 setServer(domain, port);
86 setClient(client);
87}
88
89PubSubClient::PubSubClient(const char* domain, uint16_t port, Client& client, Stream& stream) : PubSubClient() {
90 setServer(domain, port);
91 setClient(client);
92 setStream(stream);
93}
94
95PubSubClient::PubSubClient(const char* domain, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client) : PubSubClient() {
96 setServer(domain, port);
97 setCallback(callback);
98 setClient(client);
99}
100
101PubSubClient::PubSubClient(const char* domain, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client, Stream& stream) : PubSubClient() {
102 setServer(domain, port);
103 setCallback(callback);
104 setClient(client);
105 setStream(stream);
106}
107
109 free(_domain);
110 free(_buffer);
111}
112
113bool PubSubClient::connect(const char* id, const char* user, const char* pass, const char* willTopic, uint8_t willQos, bool willRetain,
114 const char* willMessage, bool cleanSession) {
115 if (!_client) return false; // do not crash if client not set
116 if (!connected()) {
117 int result = 0;
118
119 if (_client->connected()) {
120 result = 1;
121 } else if (_port != 0) {
122 if (_domain) {
123 result = _client->connect(_domain, _port);
124 } else {
125 result = _client->connect(_ip, _port);
126 }
127 }
128
129 if (result == 1) {
130 _nextMsgId = 1; // init msgId (packet identifier)
131
132#if MQTT_VERSION == MQTT_VERSION_3_1
133 const uint8_t protocol[9] = {0x00, 0x06, 'M', 'Q', 'I', 's', 'd', 'p', MQTT_VERSION};
134#elif MQTT_VERSION == MQTT_VERSION_3_1_1
135 const uint8_t protocol[7] = {0x00, 0x04, 'M', 'Q', 'T', 'T', MQTT_VERSION};
136#endif
137 // Leave room in the _buffer for header and variable length field
138 memcpy(_buffer + MQTT_MAX_HEADER_SIZE, protocol, sizeof(protocol));
139
140 size_t length = MQTT_MAX_HEADER_SIZE + sizeof(protocol);
141 uint8_t flags = 0x00;
142 if (willTopic) {
143 flags = (0x01 << 2) | (willQos << 3) | (willRetain << 5); // set will flag bit 2, will QoS and will retain bit 5
144 }
145 if (cleanSession) {
146 flags = flags | (0x01 << 1); // set clean session bit 1
147 }
148 if (user) {
149 flags = flags | (0x01 << 7); // set user name flag bit 7
150 if (pass) {
151 flags = flags | (0x01 << 6); // set password flag bit 6
152 }
153 }
154 const uint16_t keepAlive = _keepAliveMillis / 1000;
155 _buffer[length++] = flags;
156 _buffer[length++] = keepAlive >> 8;
157 _buffer[length++] = keepAlive & 0xFF;
158
159 CHECK_STRING_LENGTH(length, id)
160 length = writeString(id, length);
161 if (willTopic) {
162 CHECK_STRING_LENGTH(length, willTopic)
163 length = writeString(willTopic, length);
164 CHECK_STRING_LENGTH(length, willMessage)
165 length = writeString(willMessage, length);
166 }
167
168 if (user) {
169 CHECK_STRING_LENGTH(length, user)
170 length = writeString(user, length);
171 if (pass) {
172 CHECK_STRING_LENGTH(length, pass)
173 length = writeString(pass, length);
174 }
175 }
176
177 if (!writeControlPacket(MQTTCONNECT, length - MQTT_MAX_HEADER_SIZE)) {
178 _state = MQTT_CONNECT_FAILED;
179 _client->stop();
180 return false;
181 }
182 _lastInActivity = _lastOutActivity = millis();
183 _pingOutstanding = false;
184
185 while (!_client->available()) {
186 yield();
187 unsigned long t = millis();
188 if (t - _lastInActivity >= _socketTimeoutMillis) {
189 DEBUG_PSC_PRINTF("connect aborting due to timeout\n");
191 _client->stop();
192 return false;
193 }
194 }
195 uint8_t hdrLen;
196 size_t len = readPacket(&hdrLen);
197
198 if (len == 4) {
199 if (_buffer[3] == 0) {
200 _lastInActivity = millis();
201 _state = MQTT_CONNECTED;
202 return true;
203 } else {
204 _state = _buffer[3];
205 }
206 }
207 DEBUG_PSC_PRINTF("connect aborting due to protocol error\n");
208 _client->stop();
209 } else {
210 _state = MQTT_CONNECT_FAILED;
211 }
212 return false;
213 }
214 return true;
215}
216
218 if (!_client) return false;
219
220 if (_client->connected()) {
221 return (_state == MQTT_CONNECTED);
222 } else if (_state == MQTT_CONNECTED) {
223 DEBUG_PSC_PRINTF("lost connection (client may have more details)\n");
224 _state = MQTT_CONNECTION_LOST;
225 _client->stop();
226 _pingOutstanding = false;
227 }
228 return false;
229}
230
232 DEBUG_PSC_PRINTF("disconnect called\n");
233 _state = MQTT_DISCONNECTED;
234 if (_client) {
235 _buffer[0] = MQTTDISCONNECT;
236 _buffer[1] = 0;
237 _client->write(_buffer, 2);
238 _client->flush();
239 _client->stop();
240 _lastInActivity = _lastOutActivity = millis();
241 }
242 _pingOutstanding = false;
243}
244
251bool PubSubClient::readByte(uint8_t* result) {
252 if (!_client) return false; // do not crash if client not set
253
254 unsigned long previousMillis = millis();
255 while (!_client->available()) {
256 yield();
257 unsigned long currentMillis = millis();
258 if (currentMillis - previousMillis >= _socketTimeoutMillis) {
259 return false;
260 }
261 }
262 int rc = _client->read();
263 if (rc < 0) {
264 return false;
265 }
266 *result = (uint8_t)rc;
267 return true;
268}
269
276bool PubSubClient::readByte(uint8_t* result, size_t* pos) {
277 uint8_t* write_address = &(result[*pos]);
278 if (readByte(write_address)) {
279 (*pos)++;
280 return true;
281 }
282 return false;
283}
284
291size_t PubSubClient::readPacket(uint8_t* hdrLen) {
292 size_t len = 0;
293 if (!readByte(_buffer, &len)) return 0;
294 bool isPublish = (_buffer[0] & 0xF0) == MQTTPUBLISH;
295 uint32_t multiplier = 1;
296 size_t length = 0;
297 uint8_t digit = 0;
298 uint16_t skip = 0;
299 uint8_t start = 0;
300
301 do {
302 if (len == MQTT_MAX_HEADER_SIZE) {
303 // Invalid remaining length encoding - kill the connection
304 DEBUG_PSC_PRINTF("readPacket detected packet of invalid length\n");
305 _state = MQTT_DISCONNECTED;
306 _client->stop();
307 return 0;
308 }
309 if (!readByte(&digit)) return 0;
310 _buffer[len++] = digit;
311 length += (digit & 0x7F) * multiplier; // length is coded in the lower 7 bits
312 multiplier <<= 7; // multiplier *= 128
313 } while ((digit & 0x80) != 0); // do while 8th continuation bit is set
314 *hdrLen = (uint8_t)(len - 1);
315
316 DEBUG_PSC_PRINTF("readPacket received packet of length %zu (isPublish = %u)\n", length, isPublish);
317
318 if (isPublish) {
319 // Read in topic length to calculate bytes to skip over for Stream writing
320 if (!readByte(_buffer, &len)) return 0;
321 if (!readByte(_buffer, &len)) return 0;
322 skip = (_buffer[*hdrLen + 1] << 8) + _buffer[*hdrLen + 2];
323 start = 2;
324 if (MQTT_HDR_GET_QOS(_buffer[0]) > MQTT_QOS0) {
325 // skip msgId (packet identifier) for QoS 1 and 2 messages
326 skip += 2;
327 }
328 }
329 size_t idx = len;
330
331 for (size_t i = start; i < length; i++) {
332 if (!readByte(&digit)) return 0;
333 if (_stream) {
334 if (isPublish && (idx - *hdrLen - 2 > skip)) {
335 _stream->write(digit);
336 }
337 }
338
339 if (len < _bufferSize) {
340 _buffer[len++] = digit;
341 }
342 idx++;
343 }
344
345 if (!_stream && (idx > _bufferSize)) {
346 DEBUG_PSC_PRINTF("readPacket ignoring packet of size %zu exceeding buffer of size %zu\n", length, _bufferSize);
347 len = 0; // This will cause the packet to be ignored.
348 }
349 return len;
350}
351
359bool PubSubClient::handlePacket(uint8_t hdrLen, size_t length) {
360 uint8_t type = _buffer[0] & 0xF0;
361 DEBUG_PSC_PRINTF("received message of type %u\n", type);
362 switch (type) {
363 case MQTTPUBLISH:
364 if (callback) {
365 // MQTT Publish packet: See section 3.3 MQTT v3.1.1 protocol specification:
366 // - Header: 1 byte
367 // - Remaining header length: hdrLen bytes, multibyte field (1 .. MQTT_MAX_HEADER_SIZE - 1)
368 // - Topic length: 2 bytes (starts at _buffer[hdrLen + 1])
369 // - Topic: topicLen bytes (starts at _buffer[hdrLen + 3])
370 // - Packet Identifier (msgId): 0 bytes for QoS 0, 2 bytes for QoS 1 and 2 (starts at _buffer[hdrLen + 3 + topicLen])
371 // - Payload (for QoS = 0): length - (hdrLen + 3 + topicLen) bytes (starts at _buffer[hdrLen + 3 + topicLen])
372 // - Payload (for QoS > 0): length - (hdrLen + 5 + topicLen) bytes (starts at _buffer[hdrLen + 5 + topicLen])
373 // To get a null reminated 'C' topic string we move the topic 1 byte to the front (overwriting the LSB of the topic lenght)
374 uint16_t topicLen = (_buffer[hdrLen + 1] << 8) + _buffer[hdrLen + 2]; // topic length in bytes
375 char* topic = (char*)(_buffer + hdrLen + 3 - 1); // set the topic in the LSB of the topic lenght, as we move it there
376 uint16_t payloadOffset = hdrLen + 3 + topicLen; // payload starts after header and topic (if there is no packet identifier)
377 size_t payloadLen = length - payloadOffset; // this might change by 2 if we have a QoS 1 or 2 message
378 uint8_t* payload = _buffer + payloadOffset;
379
380 if (length < payloadOffset) { // do not move outside the max bufferSize
381 ERROR_PSC_PRINTF_P("handlePacket(): Suspicious topicLen (%u) points outside of received buffer length (%zu)\n", topicLen, length);
382 return false;
383 }
384 memmove(topic, topic + 1, topicLen); // move topic inside buffer 1 byte to front
385 topic[topicLen] = '\0'; // end the topic as a 'C' string with \x00
386
387 if (MQTT_HDR_GET_QOS(_buffer[0]) == MQTT_QOS0) {
388 // No msgId for QOS == 0
389 callback(topic, payload, payloadLen);
390 } else {
391 // For QOS 1 and 2 we have a msgId (packet identifier) after the topic at the current payloadOffset
392 if (payloadLen < 2) { // payload must be >= 2, as we have the msgId before
393 ERROR_PSC_PRINTF_P("handlePacket(): Missing msgId in QoS 1/2 message\n");
394 return false;
395 }
396 uint16_t msgId = (_buffer[payloadOffset] << 8) + _buffer[payloadOffset + 1];
397 callback(topic, payload + 2, payloadLen - 2); // remove the msgId from the callback payload
398
399 _buffer[0] = MQTTPUBACK;
400 _buffer[1] = 2;
401 _buffer[2] = (msgId >> 8);
402 _buffer[3] = (msgId & 0xFF);
403 if (_client->write(_buffer, 4) == 4) {
404 _lastOutActivity = millis();
405 }
406 }
407 }
408 break;
409 case MQTTPUBACK:
410 // MQTT Publish Acknowledgment (QoS 1 publish received): See section 3.4 MQTT v3.1.1 protocol specification
411 if (length < 4) {
412 ERROR_PSC_PRINTF_P("handlePacket(): Received PUBACK packet with length %zu, expected at least 4 bytes\n", length);
413 return false;
414 }
415 // No futher action here, as resending is not supported.
416 break;
417 case MQTTPUBREC:
418 // MQTT Publish Received (QoS 2 publish received, part 1): See section 3.5 MQTT v3.1.1 protocol specification
419 if (length < 4) {
420 ERROR_PSC_PRINTF_P("handlePacket(): Received PUBREC packet with length %zu, expected at least 4 bytes\n", length);
421 return false;
422 }
423 // MQTT Publish Release (QoS 2 publish received, part 2): See section 3.6 MQTT v3.1.1 protocol specification
424 _buffer[0] = MQTTPUBREL | 2; // PUBREL with bit 1 set
425 // bytes 1-3 of PUBREL are the same as of PUBREC
426 if (_client->write(_buffer, 4) == 4) {
427 _lastOutActivity = millis();
428 }
429 break;
430 case MQTTPUBCOMP:
431 // MQTT Publish Complete (QoS 2 publish received, part 3): See section 3.7 MQTT v3.1.1 protocol specification
432 if (length < 4) {
433 ERROR_PSC_PRINTF_P("handlePacket(): Received PUBCOMP packet with length %zu, expected at least 4 bytes\n", length);
434 return false;
435 }
436 // No futher action here, as resending is not supported.
437 break;
438 case MQTTPINGREQ:
439 // MQTT Ping Request: See section 3.12 MQTT v3.1.1 protocol specification
440 _buffer[0] = MQTTPINGRESP;
441 _buffer[1] = 0;
442 if (_client->write(_buffer, 2) == 2) {
443 _lastOutActivity = millis();
444 }
445 break;
446 case MQTTPINGRESP:
447 _pingOutstanding = false;
448 break;
449 default:
450 break;
451 }
452 return true;
453}
454
456 if (!connected()) {
457 return false;
458 }
459 bool ret = true;
460 const unsigned long t = millis();
461 if (_keepAliveMillis && ((t - _lastInActivity > _keepAliveMillis) || (t - _lastOutActivity > _keepAliveMillis))) {
462 if (_pingOutstanding) {
463 DEBUG_PSC_PRINTF("loop aborting due to timeout\n");
465 _client->stop();
466 _pingOutstanding = false;
467 return false;
468 } else if (_bufferWritePos > 0) {
469 // There is still data in the _buffer to be sent, so send it now instead of a ping
470 if (flushBuffer() == 0) {
472 _client->stop();
473 _pingOutstanding = false;
474 return false;
475 }
476 } else {
477 _buffer[0] = MQTTPINGREQ;
478 _buffer[1] = 0;
479 if (_client->write(_buffer, 2) == 2) {
480 _lastInActivity = _lastOutActivity = t;
481 _pingOutstanding = true;
482 }
483 }
484 }
485 if (_client->available()) {
486 uint8_t hdrLen;
487 size_t len = readPacket(&hdrLen);
488 if (len > 0) {
489 _lastInActivity = t;
490 ret = handlePacket(hdrLen, len);
491 if (!ret) {
492 _state = MQTT_DISCONNECTED;
493 _client->stop();
494 }
495 } else if (!connected()) {
496 // readPacket has closed the connection
497 return false;
498 }
499 }
500 return ret;
501}
502
503bool PubSubClient::publish(const char* topic, const uint8_t* payload, size_t plength, uint8_t qos, bool retained) {
504 if (beginPublish(topic, plength, qos, retained)) {
505 size_t rc = write(payload, plength);
506 return endPublish() && (rc == plength);
507 }
508 return false;
509}
510
511bool PubSubClient::publish(const __FlashStringHelper* topic, const uint8_t* payload, size_t plength, uint8_t qos, bool retained) {
512 if (beginPublish(topic, plength, qos, retained)) {
513 size_t rc = write(payload, plength);
514 return endPublish() && (rc == plength);
515 }
516 return false;
517}
518
519bool PubSubClient::publish_P(const char* topic, const uint8_t* payload, size_t plength, uint8_t qos, bool retained) {
520 if (beginPublish(topic, plength, qos, retained)) {
521 size_t rc = write_P(payload, plength);
522 return endPublish() && (rc == plength);
523 }
524 return false;
525}
526
527bool PubSubClient::publish_P(const __FlashStringHelper* topic, const uint8_t* payload, size_t plength, uint8_t qos, bool retained) {
528 if (beginPublish(topic, plength, qos, retained)) {
529 size_t rc = write_P(payload, plength);
530 return endPublish() && (rc == plength);
531 }
532 return false;
533}
534
546bool PubSubClient::beginPublishImpl(bool progmem, const char* topic, size_t plength, uint8_t qos, bool retained) {
547 if (!topic) return false;
548
549 // get topic length depending on storage (RAM vs PROGMEM)
550 size_t topicLen = progmem ? strlen_P(topic) : strlen(topic);
551 if (topicLen == 0) return false; // empty topic is not allowed
552
553 if (qos > MQTT_QOS2) { // only valid QoS supported
554 ERROR_PSC_PRINTF_P("beginPublish() called with invalid QoS %u\n", qos);
555 return false;
556 }
557
558 const size_t nextMsgLen = (qos > MQTT_QOS0) ? 2 : 0; // add 2 bytes for nextMsgId if QoS > 0
559 // check if the header, the topic (including 2 length bytes) and nextMsgId fit into the _buffer
560 if (connected() && (MQTT_MAX_HEADER_SIZE + topicLen + 2 + nextMsgLen <= _bufferSize)) {
561 // first write the topic at the end of the maximal variable header (MQTT_MAX_HEADER_SIZE) to the _buffer
562 topicLen = writeStringImpl(progmem, topic, MQTT_MAX_HEADER_SIZE) - MQTT_MAX_HEADER_SIZE;
563 if (qos > MQTT_QOS0) {
564 // if QoS 1 or 2, we need to send the nextMsgId (packet identifier) after topic
565 writeNextMsgId(MQTT_MAX_HEADER_SIZE + topicLen);
566 }
567 // we now know the length of the topic string (length + 2 bytes signalling the length) and can build the variable header information
568 const uint8_t header = MQTTPUBLISH | MQTT_QOS_GET_HDR(qos) | (retained ? MQTTRETAINED : 0);
569 uint8_t hdrLen = buildHeader(header, topicLen + nextMsgLen + plength);
570 if (hdrLen == 0) return false; // exit here in case of header generation failure
571 // as the header length is variable, it starts at MQTT_MAX_HEADER_SIZE - hdrLen (see buildHeader() documentation)
572 size_t rc = _client->write(_buffer + (MQTT_MAX_HEADER_SIZE - hdrLen), hdrLen + topicLen + nextMsgLen);
573 _lastOutActivity = millis();
574 return (rc == (hdrLen + topicLen + nextMsgLen));
575 }
576 return false;
577}
578
580 if (connected()) {
581 if (_bufferWritePos > 0) {
582 // still data in the _buffer to be sent
583 if (flushBuffer() == 0) return false;
584 }
585 return true;
586 }
587 return false;
588}
589
599uint8_t PubSubClient::buildHeader(uint8_t header, size_t length) {
600 uint8_t hdrBuf[MQTT_MAX_HEADER_SIZE - 1];
601 uint8_t hdrLen = 0;
602 uint8_t digit;
603 size_t len = length;
604 do {
605 digit = len & 0x7F; // digit = len % 128
606 len >>= 7; // len = len / 128
607 if (len > 0) {
608 digit |= 0x80;
609 }
610 hdrBuf[hdrLen++] = digit;
611 } while ((len > 0) && (hdrLen < MQTT_MAX_HEADER_SIZE - 1));
612
613 if (len > 0) {
614 ERROR_PSC_PRINTF_P("buildHeader: header=0x%02X, length too big %zu, left %zu\n", header, length, len);
615 return 0;
616 }
617
618 _buffer[MQTT_MAX_HEADER_SIZE - 1 - hdrLen] = header;
619 memcpy(_buffer + MQTT_MAX_HEADER_SIZE - hdrLen, hdrBuf, hdrLen);
620 return hdrLen + 1; // Full header size is variable length bit plus the 1-byte fixed header
621}
622
623size_t PubSubClient::write(uint8_t data) {
624 return appendBuffer(data);
625}
626
627size_t PubSubClient::write(const uint8_t* buf, size_t size) {
628 for (size_t i = 0; i < size; i++) {
629 if (appendBuffer(buf[i]) == 0) return i;
630 }
631 return size;
632}
633
634size_t PubSubClient::write_P(const uint8_t* buf, size_t size) {
635 for (size_t i = 0; i < size; i++) {
636 if (appendBuffer((uint8_t)pgm_read_byte_near(buf + i)) == 0) return i;
637 }
638 return size;
639}
640
649bool PubSubClient::writeControlPacket(uint8_t header, size_t length) {
650 uint8_t hdrLen = buildHeader(header, length);
651 if (hdrLen == 0) return false; // exit here in case of header generation failure
652
653 return writeBuffer(MQTT_MAX_HEADER_SIZE - hdrLen, hdrLen + length);
654}
655
663size_t PubSubClient::writeBuffer(size_t pos, size_t size) {
664 size_t rc = 0;
665 if (_client && (size > 0) && (pos + size <= _bufferSize)) {
666#ifdef MQTT_MAX_TRANSFER_SIZE
667 uint8_t* writeBuf = _buffer + pos;
668 size_t bytesRemaining = size;
669 bool result = true;
670 while ((bytesRemaining > 0) && result) {
671 size_t bytesToWrite = (bytesRemaining > MQTT_MAX_TRANSFER_SIZE) ? MQTT_MAX_TRANSFER_SIZE : bytesRemaining;
672 size_t bytesWritten = _client->write(writeBuf, bytesToWrite);
673 result = (bytesWritten == bytesToWrite);
674 bytesRemaining -= bytesWritten;
675 writeBuf += bytesWritten;
676 if (result) {
677 _lastOutActivity = millis();
678 }
679 yield();
680 }
681 rc = result ? size : 0; // if result is false indicate a write error
682#else
683 rc = _client->write(_buffer + pos, size);
684 if (rc == size) {
685 _lastOutActivity = millis();
686 } else {
687 rc = 0; // indicate a write error
688 }
689#endif
690 }
691 return rc;
692}
693
706size_t PubSubClient::writeStringImpl(bool progmem, const char* string, size_t pos) {
707 if (!string) return pos;
708
709 size_t sLen = progmem ? strlen_P(string) : strlen(string);
710 if ((pos + 2 + sLen <= _bufferSize) && (sLen <= 0xFFFF)) {
711 _buffer[pos++] = (uint8_t)(sLen >> 8);
712 _buffer[pos++] = (uint8_t)(sLen & 0xFF);
713 if (progmem) {
714 memcpy_P(_buffer + pos, string, sLen);
715 } else {
716 memcpy(_buffer + pos, string, sLen);
717 }
718 pos += sLen;
719 } else {
720 ERROR_PSC_PRINTF_P("writeStringImpl(): string (%zu) does not fit into buf (%zu)\n", pos + 2 + sLen, _bufferSize);
721 }
722 return pos;
723}
724
735inline size_t PubSubClient::writeString(const char* string, size_t pos) {
736 return writeStringImpl(false, string, pos);
737}
738
746size_t PubSubClient::writeNextMsgId(size_t pos) {
747 if ((pos + 2) <= _bufferSize) {
748 _nextMsgId = (++_nextMsgId == 0) ? 1 : _nextMsgId; // increment msgId (must not be 0, so start at 1)
749 _buffer[pos++] = (uint8_t)(_nextMsgId >> 8);
750 _buffer[pos++] = (uint8_t)(_nextMsgId & 0xFF);
751 } else {
752 ERROR_PSC_PRINTF_P("writeNextMsgId(): buffer overrun (%zu) \n", pos + 2);
753 }
754 return pos;
755}
756
763size_t PubSubClient::appendBuffer(uint8_t data) {
764 _buffer[_bufferWritePos++] = data;
765 if (_bufferWritePos >= _bufferSize) {
766 if (flushBuffer() == 0) return 0;
767 }
768 return 1;
769}
770
777size_t PubSubClient::flushBuffer() {
778 size_t rc = 0;
779 if (connected()) {
780 rc = writeBuffer(0, _bufferWritePos);
781 }
782 _bufferWritePos = 0;
783 return rc;
784}
785
794bool PubSubClient::subscribeImpl(bool progmem, const char* topic, uint8_t qos) {
795 if (!topic) return false;
796 if (qos > MQTT_QOS1) return false; // only QoS 0 and 1 supported
797
798 // get topic length depending on storage (RAM vs PROGMEM)
799 size_t topicLen = progmem ? strnlen_P(topic, _bufferSize) : strnlen(topic, _bufferSize);
800 if (_bufferSize < MQTT_MAX_HEADER_SIZE + 2 + 2 + topicLen + 1) {
801 // Too long: header + nextMsgId (2) + topic length bytes (2) + topicLen + QoS (1)
802 return false;
803 }
804 if (connected()) {
805 // Leave room in the _buffer for header and variable length field
806 uint16_t length = MQTT_MAX_HEADER_SIZE;
807 length = writeNextMsgId(length); // _buffer size is checked before
808 length = writeStringImpl(progmem, topic, length);
809 _buffer[length++] = qos;
810 return writeControlPacket(MQTTSUBSCRIBE | MQTT_QOS_GET_HDR(MQTT_QOS1), length - MQTT_MAX_HEADER_SIZE);
811 }
812 return false;
813}
814
822bool PubSubClient::unsubscribeImpl(bool progmem, const char* topic) {
823 if (!topic) return false;
824
825 // get topic length depending on storage (RAM vs PROGMEM)
826 size_t topicLen = progmem ? strnlen_P(topic, _bufferSize) : strnlen(topic, _bufferSize);
827 if (_bufferSize < MQTT_MAX_HEADER_SIZE + 2 + 2 + topicLen) {
828 // Too long: header + nextMsgId (2) + topic length bytes (2) + topicLen
829 return false;
830 }
831 if (connected()) {
832 uint16_t length = MQTT_MAX_HEADER_SIZE;
833 length = writeNextMsgId(length); // _buffer size is checked before
834 length = writeStringImpl(progmem, topic, length);
835 return writeControlPacket(MQTTUNSUBSCRIBE | MQTT_QOS_GET_HDR(MQTT_QOS1), length - MQTT_MAX_HEADER_SIZE);
836 }
837 return false;
838}
839
840PubSubClient& PubSubClient::setServer(uint8_t* ip, uint16_t port) {
841 IPAddress addr(ip[0], ip[1], ip[2], ip[3]);
842 return setServer(addr, port);
843}
844
845PubSubClient& PubSubClient::setServer(IPAddress ip, uint16_t port) {
846 _ip = ip;
847 _port = port;
848 free(_domain);
849 _domain = nullptr;
850 return *this;
851}
852
853PubSubClient& PubSubClient::setServer(const char* domain, uint16_t port) {
854 char* newDomain = nullptr;
855 if (domain) {
856 newDomain = (char*)realloc(_domain, strlen(domain) + 1);
857 }
858 if (newDomain) {
859 strcpy(newDomain, domain);
860 _domain = newDomain;
861 _port = port;
862 } else {
863 free(_domain);
864 _domain = nullptr;
865 _port = 0;
866 }
867 return *this;
868}
869
870PubSubClient& PubSubClient::setCallback(MQTT_CALLBACK_SIGNATURE) {
871 this->callback = callback;
872 return *this;
873}
874
876 _client = &client;
877 return *this;
878}
879
881 _stream = &stream;
882 return *this;
883}
884
886 if (size == 0) {
887 // Cannot set it back to 0
888 return false;
889 }
890 if (_bufferSize == 0) {
891 _buffer = (uint8_t*)malloc(size);
892 } else {
893 uint8_t* newBuffer = (uint8_t*)realloc(_buffer, size);
894 if (newBuffer) {
895 _buffer = newBuffer;
896 } else {
897 return false;
898 }
899 }
900 _bufferSize = size;
901 return (_buffer != nullptr);
902}
903
905 return _bufferSize;
906}
907
909 _keepAliveMillis = keepAlive * 1000UL;
910 return *this;
911}
912
914 _socketTimeoutMillis = timeout * 1000UL;
915 return *this;
916}
917
919 return _state;
920}
A simple client for MQTT.
#define MQTT_SOCKET_TIMEOUT
Sets the timeout, in seconds, when reading from the network. This also applies as the timeout for cal...
#define MQTT_MAX_TRANSFER_SIZE
Sets the maximum number of bytes passed to the network client in each write call. Some hardware has a...
#define MQTT_VERSION
Sets the version of the MQTT protocol to use (3.1 or 3.1.1). [MQTT_VERSION_3_1, MQTT_VERSION_3_1_1].
#define MQTT_MAX_PACKET_SIZE
Sets the largest packet size, in bytes, the client will handle. Any packet received that exceeds this...
#define MQTT_KEEPALIVE
Sets the keepalive interval, in seconds, the client will use. This is used to maintain the connection...
bool loop()
This should be called regularly to allow the client to process incoming messages and maintain its con...
PubSubClient & setCallback(MQTT_CALLBACK_SIGNATURE)
Sets the message callback function.
PubSubClient & setServer(IPAddress ip, uint16_t port)
Sets the server details.
bool publish_P(const char *topic, PGM_P payload, bool retained)
Publishes a message stored in PROGMEM to the specified topic using QoS 0.
bool beginPublish(const char *topic, size_t plength, bool retained)
Start to publish a message using QoS 0. This API: beginPublish(...) one or more calls to write(....
virtual size_t write(uint8_t data)
Writes a single byte as a component of a publish started with a call to beginPublish....
bool publish(const char *topic, const char *payload)
Publishes a non retained message to the specified topic using QoS 0.
~PubSubClient()
Destructor for the PubSubClient class.
PubSubClient & setSocketTimeout(uint16_t timeout)
Sets the socket timeout used by the client. This determines how long the client will wait for incomin...
PubSubClient()
Creates an uninitialised client instance.
PubSubClient & setKeepAlive(uint16_t keepAlive)
Sets the keep alive interval used by the client. This value should only be changed when the client is...
bool connected()
Checks whether the client is connected to the server.
int state()
Returns the current state of the client. If a connection attempt fails, this can be used to get more ...
bool endPublish()
Finish sending a message that was started with a call to beginPublish.
size_t write_P(PGM_P string)
Writes a string in PROGMEM as a component of a publish started with a call to beginPublish....
PubSubClient & setClient(Client &client)
Sets the network client instance to use.
void disconnect()
Disconnects the client.
size_t getBufferSize()
Gets the current size of the internal buffer.
bool setBufferSize(size_t size)
Sets the size, in bytes, of the internal send and receive buffer. This must be large enough to contai...
bool connect(const char *id)
Connects the client using a clean session without username and password.
PubSubClient & setStream(Stream &stream)
Sets the stream to write received messages to.
#define MQTT_QOS0
Quality of Service 0: At most once.
#define MQTT_QOS2
#define MQTT_QOS1
Quality of Service 1: At least once.
#define MQTT_CONNECTION_TIMEOUT
The network connection timed out or server didn't respond within the keepalive time.
#define MQTT_CONNECTED
The client is connected.
#define MQTT_CONNECT_FAILED
The network connection failed.
#define MQTT_CONNECTION_LOST
The network connection was lost/broken.
#define MQTT_DISCONNECTED
The client is disconnected cleanly.