20#define CHECK_STRING_LENGTH(l, s) \
21 if ((!s) || (l + 2 + strnlen(s, _bufferSize) > _bufferSize)) { \
113bool PubSubClient::connect(
const char*
id,
const char* user,
const char* pass,
const char* willTopic, uint8_t willQos,
bool willRetain,
114 const char* willMessage,
bool cleanSession) {
115 if (!_client)
return false;
119 if (_client->connected()) {
121 }
else if (_port != 0) {
123 result = _client->connect(_domain, _port);
125 result = _client->connect(_ip, _port);
132#if MQTT_VERSION == MQTT_VERSION_3_1
133 const uint8_t protocol[9] = {0x00, 0x06,
'M',
'Q',
'I',
's',
'd',
'p',
MQTT_VERSION};
134#elif MQTT_VERSION == MQTT_VERSION_3_1_1
135 const uint8_t protocol[7] = {0x00, 0x04,
'M',
'Q',
'T',
'T',
MQTT_VERSION};
138 memcpy(_buffer + MQTT_MAX_HEADER_SIZE, protocol,
sizeof(protocol));
140 size_t length = MQTT_MAX_HEADER_SIZE +
sizeof(protocol);
141 uint8_t flags = 0x00;
143 flags = (0x01 << 2) | (willQos << 3) | (willRetain << 5);
146 flags = flags | (0x01 << 1);
149 flags = flags | (0x01 << 7);
151 flags = flags | (0x01 << 6);
154 const uint16_t keepAlive = _keepAliveMillis / 1000;
155 _buffer[length++] = flags;
156 _buffer[length++] = keepAlive >> 8;
157 _buffer[length++] = keepAlive & 0xFF;
159 CHECK_STRING_LENGTH(length,
id)
160 length = writeString(
id, length);
162 CHECK_STRING_LENGTH(length, willTopic)
163 length = writeString(willTopic, length);
164 CHECK_STRING_LENGTH(length, willMessage)
165 length = writeString(willMessage, length);
169 CHECK_STRING_LENGTH(length, user)
170 length = writeString(user, length);
172 CHECK_STRING_LENGTH(length, pass)
173 length = writeString(pass, length);
177 if (!writeControlPacket(MQTTCONNECT, length - MQTT_MAX_HEADER_SIZE)) {
182 _lastInActivity = _lastOutActivity = millis();
183 _pingOutstanding =
false;
185 while (!_client->available()) {
187 unsigned long t = millis();
188 if (t - _lastInActivity >= _socketTimeoutMillis) {
189 DEBUG_PSC_PRINTF(
"connect aborting due to timeout\n");
196 size_t len = readPacket(&hdrLen);
199 if (_buffer[3] == 0) {
200 _lastInActivity = millis();
207 DEBUG_PSC_PRINTF(
"connect aborting due to protocol error\n");
218 if (!_client)
return false;
220 if (_client->connected()) {
223 DEBUG_PSC_PRINTF(
"lost connection (client may have more details)\n");
226 _pingOutstanding =
false;
232 DEBUG_PSC_PRINTF(
"disconnect called\n");
235 _buffer[0] = MQTTDISCONNECT;
237 _client->write(_buffer, 2);
240 _lastInActivity = _lastOutActivity = millis();
242 _pingOutstanding =
false;
251bool PubSubClient::readByte(uint8_t* result) {
252 if (!_client)
return false;
254 unsigned long previousMillis = millis();
255 while (!_client->available()) {
257 unsigned long currentMillis = millis();
258 if (currentMillis - previousMillis >= _socketTimeoutMillis) {
262 int rc = _client->read();
266 *result = (uint8_t)rc;
276bool PubSubClient::readByte(uint8_t* result,
size_t* pos) {
277 uint8_t* write_address = &(result[*pos]);
278 if (readByte(write_address)) {
291size_t PubSubClient::readPacket(uint8_t* hdrLen) {
293 if (!readByte(_buffer, &len))
return 0;
294 bool isPublish = (_buffer[0] & 0xF0) == MQTTPUBLISH;
295 uint32_t multiplier = 1;
302 if (len == MQTT_MAX_HEADER_SIZE) {
304 DEBUG_PSC_PRINTF(
"readPacket detected packet of invalid length\n");
309 if (!readByte(&digit))
return 0;
310 _buffer[len++] = digit;
311 length += (digit & 0x7F) * multiplier;
313 }
while ((digit & 0x80) != 0);
314 *hdrLen = (uint8_t)(len - 1);
316 DEBUG_PSC_PRINTF(
"readPacket received packet of length %zu (isPublish = %u)\n", length, isPublish);
320 if (!readByte(_buffer, &len))
return 0;
321 if (!readByte(_buffer, &len))
return 0;
322 skip = (_buffer[*hdrLen + 1] << 8) + _buffer[*hdrLen + 2];
324 if (MQTT_HDR_GET_QOS(_buffer[0]) >
MQTT_QOS0) {
331 for (
size_t i = start; i < length; i++) {
332 if (!readByte(&digit))
return 0;
334 if (isPublish && (idx - *hdrLen - 2 > skip)) {
335 _stream->write(digit);
339 if (len < _bufferSize) {
340 _buffer[len++] = digit;
345 if (!_stream && (idx > _bufferSize)) {
346 DEBUG_PSC_PRINTF(
"readPacket ignoring packet of size %zu exceeding buffer of size %zu\n", length, _bufferSize);
359bool PubSubClient::handlePacket(uint8_t hdrLen,
size_t length) {
360 uint8_t type = _buffer[0] & 0xF0;
361 DEBUG_PSC_PRINTF(
"received message of type %u\n", type);
374 uint16_t topicLen = (_buffer[hdrLen + 1] << 8) + _buffer[hdrLen + 2];
375 char* topic = (
char*)(_buffer + hdrLen + 3 - 1);
376 uint16_t payloadOffset = hdrLen + 3 + topicLen;
377 size_t payloadLen = length - payloadOffset;
378 uint8_t* payload = _buffer + payloadOffset;
380 if (length < payloadOffset) {
381 ERROR_PSC_PRINTF_P(
"handlePacket(): Suspicious topicLen (%u) points outside of received buffer length (%zu)\n", topicLen, length);
384 memmove(topic, topic + 1, topicLen);
385 topic[topicLen] =
'\0';
387 if (MQTT_HDR_GET_QOS(_buffer[0]) ==
MQTT_QOS0) {
389 callback(topic, payload, payloadLen);
392 if (payloadLen < 2) {
393 ERROR_PSC_PRINTF_P(
"handlePacket(): Missing msgId in QoS 1/2 message\n");
396 uint16_t msgId = (_buffer[payloadOffset] << 8) + _buffer[payloadOffset + 1];
397 callback(topic, payload + 2, payloadLen - 2);
399 _buffer[0] = MQTTPUBACK;
401 _buffer[2] = (msgId >> 8);
402 _buffer[3] = (msgId & 0xFF);
403 if (_client->write(_buffer, 4) == 4) {
404 _lastOutActivity = millis();
412 ERROR_PSC_PRINTF_P(
"handlePacket(): Received PUBACK packet with length %zu, expected at least 4 bytes\n", length);
420 ERROR_PSC_PRINTF_P(
"handlePacket(): Received PUBREC packet with length %zu, expected at least 4 bytes\n", length);
424 _buffer[0] = MQTTPUBREL | 2;
426 if (_client->write(_buffer, 4) == 4) {
427 _lastOutActivity = millis();
433 ERROR_PSC_PRINTF_P(
"handlePacket(): Received PUBCOMP packet with length %zu, expected at least 4 bytes\n", length);
440 _buffer[0] = MQTTPINGRESP;
442 if (_client->write(_buffer, 2) == 2) {
443 _lastOutActivity = millis();
447 _pingOutstanding =
false;
460 const unsigned long t = millis();
461 if (_keepAliveMillis && ((t - _lastInActivity > _keepAliveMillis) || (t - _lastOutActivity > _keepAliveMillis))) {
462 if (_pingOutstanding) {
463 DEBUG_PSC_PRINTF(
"loop aborting due to timeout\n");
466 _pingOutstanding =
false;
468 }
else if (_bufferWritePos > 0) {
470 if (flushBuffer() == 0) {
473 _pingOutstanding =
false;
477 _buffer[0] = MQTTPINGREQ;
479 if (_client->write(_buffer, 2) == 2) {
480 _lastInActivity = _lastOutActivity = t;
481 _pingOutstanding =
true;
485 if (_client->available()) {
487 size_t len = readPacket(&hdrLen);
490 ret = handlePacket(hdrLen, len);
505 size_t rc =
write(payload, plength);
511bool PubSubClient::publish(
const __FlashStringHelper* topic,
const uint8_t* payload,
size_t plength, uint8_t qos,
bool retained) {
513 size_t rc =
write(payload, plength);
521 size_t rc =
write_P(payload, plength);
527bool PubSubClient::publish_P(
const __FlashStringHelper* topic,
const uint8_t* payload,
size_t plength, uint8_t qos,
bool retained) {
529 size_t rc =
write_P(payload, plength);
546bool PubSubClient::beginPublishImpl(
bool progmem,
const char* topic,
size_t plength, uint8_t qos,
bool retained) {
547 if (!topic)
return false;
550 size_t topicLen = progmem ? strlen_P(topic) : strlen(topic);
551 if (topicLen == 0)
return false;
554 ERROR_PSC_PRINTF_P(
"beginPublish() called with invalid QoS %u\n", qos);
558 const size_t nextMsgLen = (qos >
MQTT_QOS0) ? 2 : 0;
560 if (
connected() && (MQTT_MAX_HEADER_SIZE + topicLen + 2 + nextMsgLen <= _bufferSize)) {
562 topicLen = writeStringImpl(progmem, topic, MQTT_MAX_HEADER_SIZE) - MQTT_MAX_HEADER_SIZE;
565 writeNextMsgId(MQTT_MAX_HEADER_SIZE + topicLen);
568 const uint8_t header = MQTTPUBLISH | MQTT_QOS_GET_HDR(qos) | (retained ? MQTTRETAINED : 0);
569 uint8_t hdrLen = buildHeader(header, topicLen + nextMsgLen + plength);
570 if (hdrLen == 0)
return false;
572 size_t rc = _client->write(_buffer + (MQTT_MAX_HEADER_SIZE - hdrLen), hdrLen + topicLen + nextMsgLen);
573 _lastOutActivity = millis();
574 return (rc == (hdrLen + topicLen + nextMsgLen));
581 if (_bufferWritePos > 0) {
583 if (flushBuffer() == 0)
return false;
599uint8_t PubSubClient::buildHeader(uint8_t header,
size_t length) {
600 uint8_t hdrBuf[MQTT_MAX_HEADER_SIZE - 1];
610 hdrBuf[hdrLen++] = digit;
611 }
while ((len > 0) && (hdrLen < MQTT_MAX_HEADER_SIZE - 1));
614 ERROR_PSC_PRINTF_P(
"buildHeader: header=0x%02X, length too big %zu, left %zu\n", header, length, len);
618 _buffer[MQTT_MAX_HEADER_SIZE - 1 - hdrLen] = header;
619 memcpy(_buffer + MQTT_MAX_HEADER_SIZE - hdrLen, hdrBuf, hdrLen);
624 return appendBuffer(data);
628 for (
size_t i = 0; i < size; i++) {
629 if (appendBuffer(buf[i]) == 0)
return i;
635 for (
size_t i = 0; i < size; i++) {
636 if (appendBuffer((uint8_t)pgm_read_byte_near(buf + i)) == 0)
return i;
649bool PubSubClient::writeControlPacket(uint8_t header,
size_t length) {
650 uint8_t hdrLen = buildHeader(header, length);
651 if (hdrLen == 0)
return false;
653 return writeBuffer(MQTT_MAX_HEADER_SIZE - hdrLen, hdrLen + length);
663size_t PubSubClient::writeBuffer(
size_t pos,
size_t size) {
665 if (_client && (size > 0) && (pos + size <= _bufferSize)) {
666#ifdef MQTT_MAX_TRANSFER_SIZE
667 uint8_t* writeBuf = _buffer + pos;
668 size_t bytesRemaining = size;
670 while ((bytesRemaining > 0) && result) {
672 size_t bytesWritten = _client->write(writeBuf, bytesToWrite);
673 result = (bytesWritten == bytesToWrite);
674 bytesRemaining -= bytesWritten;
675 writeBuf += bytesWritten;
677 _lastOutActivity = millis();
681 rc = result ? size : 0;
683 rc = _client->write(_buffer + pos, size);
685 _lastOutActivity = millis();
706size_t PubSubClient::writeStringImpl(
bool progmem,
const char*
string,
size_t pos) {
707 if (!
string)
return pos;
709 size_t sLen = progmem ? strlen_P(
string) : strlen(string);
710 if ((pos + 2 + sLen <= _bufferSize) && (sLen <= 0xFFFF)) {
711 _buffer[pos++] = (uint8_t)(sLen >> 8);
712 _buffer[pos++] = (uint8_t)(sLen & 0xFF);
714 memcpy_P(_buffer + pos,
string, sLen);
716 memcpy(_buffer + pos,
string, sLen);
720 ERROR_PSC_PRINTF_P(
"writeStringImpl(): string (%zu) does not fit into buf (%zu)\n", pos + 2 + sLen, _bufferSize);
735inline size_t PubSubClient::writeString(
const char*
string,
size_t pos) {
736 return writeStringImpl(
false,
string, pos);
746size_t PubSubClient::writeNextMsgId(
size_t pos) {
747 if ((pos + 2) <= _bufferSize) {
748 _nextMsgId = (++_nextMsgId == 0) ? 1 : _nextMsgId;
749 _buffer[pos++] = (uint8_t)(_nextMsgId >> 8);
750 _buffer[pos++] = (uint8_t)(_nextMsgId & 0xFF);
752 ERROR_PSC_PRINTF_P(
"writeNextMsgId(): buffer overrun (%zu) \n", pos + 2);
763size_t PubSubClient::appendBuffer(uint8_t data) {
764 _buffer[_bufferWritePos++] = data;
765 if (_bufferWritePos >= _bufferSize) {
766 if (flushBuffer() == 0)
return 0;
777size_t PubSubClient::flushBuffer() {
780 rc = writeBuffer(0, _bufferWritePos);
794bool PubSubClient::subscribeImpl(
bool progmem,
const char* topic, uint8_t qos) {
795 if (!topic)
return false;
799 size_t topicLen = progmem ? strnlen_P(topic, _bufferSize) : strnlen(topic, _bufferSize);
800 if (_bufferSize < MQTT_MAX_HEADER_SIZE + 2 + 2 + topicLen + 1) {
806 uint16_t length = MQTT_MAX_HEADER_SIZE;
807 length = writeNextMsgId(length);
808 length = writeStringImpl(progmem, topic, length);
809 _buffer[length++] = qos;
810 return writeControlPacket(MQTTSUBSCRIBE | MQTT_QOS_GET_HDR(
MQTT_QOS1), length - MQTT_MAX_HEADER_SIZE);
822bool PubSubClient::unsubscribeImpl(
bool progmem,
const char* topic) {
823 if (!topic)
return false;
826 size_t topicLen = progmem ? strnlen_P(topic, _bufferSize) : strnlen(topic, _bufferSize);
827 if (_bufferSize < MQTT_MAX_HEADER_SIZE + 2 + 2 + topicLen) {
832 uint16_t length = MQTT_MAX_HEADER_SIZE;
833 length = writeNextMsgId(length);
834 length = writeStringImpl(progmem, topic, length);
835 return writeControlPacket(MQTTUNSUBSCRIBE | MQTT_QOS_GET_HDR(
MQTT_QOS1), length - MQTT_MAX_HEADER_SIZE);
841 IPAddress addr(ip[0], ip[1], ip[2], ip[3]);
854 char* newDomain =
nullptr;
856 newDomain = (
char*)realloc(_domain, strlen(domain) + 1);
859 strcpy(newDomain, domain);
871 this->callback = callback;
890 if (_bufferSize == 0) {
891 _buffer = (uint8_t*)malloc(size);
893 uint8_t* newBuffer = (uint8_t*)realloc(_buffer, size);
901 return (_buffer !=
nullptr);
909 _keepAliveMillis = keepAlive * 1000UL;
914 _socketTimeoutMillis = timeout * 1000UL;
A simple client for MQTT.
#define MQTT_SOCKET_TIMEOUT
Sets the timeout, in seconds, when reading from the network. This also applies as the timeout for cal...
#define MQTT_MAX_TRANSFER_SIZE
Sets the maximum number of bytes passed to the network client in each write call. Some hardware has a...
#define MQTT_VERSION
Sets the version of the MQTT protocol to use (3.1 or 3.1.1). [MQTT_VERSION_3_1, MQTT_VERSION_3_1_1].
#define MQTT_MAX_PACKET_SIZE
Sets the largest packet size, in bytes, the client will handle. Any packet received that exceeds this...
#define MQTT_KEEPALIVE
Sets the keepalive interval, in seconds, the client will use. This is used to maintain the connection...
bool loop()
This should be called regularly to allow the client to process incoming messages and maintain its con...
PubSubClient & setCallback(MQTT_CALLBACK_SIGNATURE)
Sets the message callback function.
PubSubClient & setServer(IPAddress ip, uint16_t port)
Sets the server details.
bool publish_P(const char *topic, PGM_P payload, bool retained)
Publishes a message stored in PROGMEM to the specified topic using QoS 0.
bool beginPublish(const char *topic, size_t plength, bool retained)
Start to publish a message using QoS 0. This API: beginPublish(...) one or more calls to write(....
virtual size_t write(uint8_t data)
Writes a single byte as a component of a publish started with a call to beginPublish....
bool publish(const char *topic, const char *payload)
Publishes a non retained message to the specified topic using QoS 0.
~PubSubClient()
Destructor for the PubSubClient class.
PubSubClient & setSocketTimeout(uint16_t timeout)
Sets the socket timeout used by the client. This determines how long the client will wait for incomin...
PubSubClient()
Creates an uninitialised client instance.
PubSubClient & setKeepAlive(uint16_t keepAlive)
Sets the keep alive interval used by the client. This value should only be changed when the client is...
bool connected()
Checks whether the client is connected to the server.
int state()
Returns the current state of the client. If a connection attempt fails, this can be used to get more ...
bool endPublish()
Finish sending a message that was started with a call to beginPublish.
size_t write_P(PGM_P string)
Writes a string in PROGMEM as a component of a publish started with a call to beginPublish....
PubSubClient & setClient(Client &client)
Sets the network client instance to use.
void disconnect()
Disconnects the client.
size_t getBufferSize()
Gets the current size of the internal buffer.
bool setBufferSize(size_t size)
Sets the size, in bytes, of the internal send and receive buffer. This must be large enough to contai...
bool connect(const char *id)
Connects the client using a clean session without username and password.
PubSubClient & setStream(Stream &stream)
Sets the stream to write received messages to.
#define MQTT_QOS0
Quality of Service 0: At most once.
#define MQTT_QOS1
Quality of Service 1: At least once.
#define MQTT_CONNECTION_TIMEOUT
The network connection timed out or the server didn't respond within the keepalive time.
#define MQTT_CONNECTED
The client is connected.
#define MQTT_CONNECT_FAILED
The network connection failed.
#define MQTT_CONNECTION_LOST
The network connection was lost/broken.
#define MQTT_DISCONNECTED
The client is disconnected cleanly.