// Guard used in connect() in front of each writeString() call. The condition
// holds when `s` is null, or when offset `l` plus the 2-byte length prefix plus
// the string length (strnlen caps the scan at bufferSize) exceeds bufferSize.
// NOTE(review): the statement executed when the condition holds is on
// continuation lines that are not part of this listing.
20#define CHECK_STRING_LENGTH(l, s) \
21 if ((!s) || (l + 2 + strnlen(s, this->bufferSize) > this->bufferSize)) { \
114 return connect(
id,
nullptr,
nullptr,
nullptr,
MQTT_QOS0,
false,
nullptr,
true);
// Convenience overload: connect with a last-will message but no credentials.
// Forwards to the full connect() overload, passing nullptr for user and pass
// and true for the final (cleanSession) argument.
121bool PubSubClient::connect(
const char*
id,
const char* willTopic, uint8_t willQos,
bool willRetain,
const char* willMessage) {
122 return connect(
id,
nullptr,
nullptr, willTopic, willQos, willRetain, willMessage,
true);
// Convenience overload: connect with credentials and a last-will message.
// Forwards every argument unchanged to the full connect() overload and passes
// true for the final (cleanSession) argument.
125bool PubSubClient::connect(
const char*
id,
const char* user,
const char* pass,
const char* willTopic, uint8_t willQos,
bool willRetain,
126 const char* willMessage) {
127 return connect(
id, user, pass, willTopic, willQos, willRetain, willMessage,
true);
// Full connect() overload: opens the network connection if needed, assembles a
// CONNECT packet in this->buffer, sends it and waits for the broker's reply.
// NOTE(review): this listing omits several source lines (guards, else branches,
// returns); the comments below describe only the statements that are visible.
130bool PubSubClient::connect(
const char*
id,
const char* user,
const char* pass,
const char* willTopic, uint8_t willQos,
bool willRetain,
131 const char* willMessage,
bool cleanSession) {
// An already connected network client is detected first; otherwise a connection
// is opened, but only when a port has been configured.
135 if (_client->connected()) {
137 }
else if (this->port != 0) {
// Two connect variants are visible: by domain name and by IP address.
// NOTE(review): the condition selecting between them is elided.
139 result = _client->connect(this->domain, this->port);
141 result = _client->connect(this->ip, this->port);
// Protocol name/level block: 2-byte big-endian name length, the protocol name,
// then the protocol version byte.
148#if MQTT_VERSION == MQTT_VERSION_3_1
149 const uint8_t protocol[9] = {0x00, 0x06,
'M',
'Q',
'I',
's',
'd',
'p',
MQTT_VERSION};
150#elif MQTT_VERSION == MQTT_VERSION_3_1_1
151 const uint8_t protocol[7] = {0x00, 0x04,
'M',
'Q',
'T',
'T',
MQTT_VERSION};
// The first MQTT_MAX_HEADER_SIZE bytes of the buffer are left free; the packet
// body starts right after them.
154 memcpy(this->buffer + MQTT_MAX_HEADER_SIZE, protocol,
sizeof(protocol));
156 size_t length = MQTT_MAX_HEADER_SIZE +
sizeof(protocol);
// Connect-flags byte. Visible bit positions: 2 plus willQos at bits 3-4 and
// willRetain at bit 5, then bit 1, bit 7 and bit 6.
// NOTE(review): presumably will / clean session / user name / password as in
// the MQTT CONNECT flags layout; the guarding conditions are elided - confirm.
157 uint8_t flags = 0x00;
159 flags = (0x01 << 2) | (willQos << 3) | (willRetain << 5);
162 flags = flags | (0x01 << 1);
165 flags = flags | (0x01 << 7);
167 flags = flags | (0x01 << 6);
// Keep-alive is stored in milliseconds and sent in seconds, high byte first.
170 const uint16_t keepAlive = this->keepAliveMillis / 1000;
171 this->buffer[length++] = flags;
172 this->buffer[length++] = keepAlive >> 8;
173 this->buffer[length++] = keepAlive & 0xFF;
// Payload strings, each length-checked before being appended: client id, will
// topic, will message, user, password.
// NOTE(review): any conditions around the optional fields are elided.
175 CHECK_STRING_LENGTH(length,
id)
176 length = writeString(
id, this->buffer, length, this->bufferSize);
178 CHECK_STRING_LENGTH(length, willTopic)
179 length = writeString(willTopic, this->buffer, length, this->bufferSize);
180 CHECK_STRING_LENGTH(length, willMessage)
181 length = writeString(willMessage, this->buffer, length, this->bufferSize);
185 CHECK_STRING_LENGTH(length, user)
186 length = writeString(user, this->buffer, length, this->bufferSize);
188 CHECK_STRING_LENGTH(length, pass)
189 length = writeString(pass, this->buffer, length, this->bufferSize);
// The length handed to write() excludes the reserved header area.
193 write(MQTTCONNECT, this->buffer, length - MQTT_MAX_HEADER_SIZE);
// Reset the activity timestamps and the outstanding-ping flag for the new session.
195 lastInActivity = lastOutActivity = millis();
196 pingOutstanding =
false;
// Wait for the broker's reply; give up once socketTimeoutMillis has elapsed
// since lastInActivity.
198 while (!_client->available()) {
200 unsigned long t = millis();
201 if (t - lastInActivity >= this->socketTimeoutMillis) {
202 DEBUG_PSC_PRINTF(
"connect aborting due to timeout\n");
// Read the reply; a zero in buffer[3] is the branch that refreshes
// lastInActivity.
// NOTE(review): presumably buffer[3] is the CONNACK return code (0 = accepted)
// - confirm against the elided lines.
209 size_t len = readPacket(&hdrLen);
212 if (buffer[3] == 0) {
213 lastInActivity = millis();
220 DEBUG_PSC_PRINTF(
"connect aborting due to protocol error\n");
231 if (!_client)
return false;
233 if (_client->connected()) {
236 DEBUG_PSC_PRINTF(
"lost connection (client may have more details)\n");
239 pingOutstanding =
false;
245 DEBUG_PSC_PRINTF(
"disconnect called\n");
246 this->buffer[0] = MQTTDISCONNECT;
248 _client->write(this->buffer, 2);
252 lastInActivity = lastOutActivity = millis();
253 pingOutstanding =
false;
// Reads a single byte from the network client into *result.
// Polls _client->available() and compares the elapsed time since entry against
// socketTimeoutMillis.
// NOTE(review): the timeout action and the handling of a failed read() are on
// elided lines; callers in this file treat a false return as failure.
262bool PubSubClient::readByte(uint8_t* result) {
263 unsigned long previousMillis = millis();
264 while (!_client->available()) {
266 unsigned long currentMillis = millis();
267 if (currentMillis - previousMillis >= this->socketTimeoutMillis) {
// read() returns an int; the value is narrowed to a byte before being stored.
271 int rc = _client->read();
275 *result = (uint8_t)rc;
// Reads a single byte into result[*pos] by delegating to readByte(uint8_t*).
// NOTE(review): the body of the success branch (presumably advancing *pos) is
// elided from this listing - confirm.
285bool PubSubClient::readByte(uint8_t* result,
size_t* pos) {
286 uint8_t* write_address = &(result[*pos]);
287 if (readByte(write_address)) {
// Reads one MQTT packet from the network client into this->buffer.
// Returns 0 whenever a byte cannot be read. On the visible path *hdrLen receives
// the number of header bytes read after the first (packet type) byte.
// NOTE(review): several lines (declarations, loop heads, final return) are
// elided; comments describe only the visible statements.
300size_t PubSubClient::readPacket(uint8_t* hdrLen) {
// First byte: packet type and flags.
302 if (!readByte(this->buffer, &len))
return 0;
303 bool isPublish = (this->buffer[0] & 0xF0) == MQTTPUBLISH;
304 uint32_t multiplier = 1;
// Variable-length "remaining length" field: 7 value bits per byte, with bit 7
// set while more bytes follow. Reaching MQTT_MAX_HEADER_SIZE bytes is reported
// as an invalid length.
311 if (len == MQTT_MAX_HEADER_SIZE) {
313 DEBUG_PSC_PRINTF(
"readPacket detected packet of invalid length\n");
318 if (!readByte(&digit))
return 0;
319 this->buffer[len++] = digit;
320 length += (digit & 0x7F) * multiplier;
322 }
while ((digit & 0x80) != 0);
323 *hdrLen = (uint8_t)(len - 1);
325 DEBUG_PSC_PRINTF(
"readPacket received packet of length %zu (isPublish = %u)\n", length, isPublish);
// Two further bytes are read and combined big-endian into `skip`.
// NOTE(review): presumably the PUBLISH topic length, under an elided isPublish
// guard - confirm.
329 if (!readByte(this->buffer, &len))
return 0;
330 if (!readByte(this->buffer, &len))
return 0;
331 skip = (this->buffer[*hdrLen + 1] << 8) + this->buffer[*hdrLen + 2];
333 if (MQTT_HDR_GET_QOS(this->buffer[0]) >
MQTT_QOS0) {
// Remaining bytes: PUBLISH bytes located past `skip` are forwarded to the
// stream, and bytes are stored in the buffer only while there is room.
// NOTE(review): the stream null check guarding stream->write() is presumably on
// an elided line.
340 for (
size_t i = start; i < length; i++) {
341 if (!readByte(&digit))
return 0;
343 if (isPublish && idx - *hdrLen - 2 > skip) {
344 this->stream->write(digit);
348 if (len < this->bufferSize) {
349 this->buffer[len++] = digit;
// Without a stream, a packet larger than the buffer is reported as ignored.
354 if (!this->stream && idx > this->bufferSize) {
355 DEBUG_PSC_PRINTF(
"readPacket ignoring packet of size %zu exceeding buffer of size %zu\n", length, this->bufferSize);
// Dispatches a packet previously read into this->buffer, based on the upper
// four bits of its first byte.
// hdrLen is the value produced by readPacket(); length is the packet length
// readPacket() returned.
// NOTE(review): the switch/case structure, returns and closing braces are
// elided; comments describe only the visible statements.
368bool PubSubClient::handlePacket(uint8_t hdrLen,
size_t length) {
369 uint8_t type = this->buffer[0] & 0xF0;
370 DEBUG_PSC_PRINTF(
"received message of type %u\n", type);
// PUBLISH: big-endian topic length at hdrLen+1 / hdrLen+2. `topic` points one
// byte before the topic text (at the low length byte).
383 uint16_t topicLen = (this->buffer[hdrLen + 1] << 8) + this->buffer[hdrLen + 2];
384 char* topic = (
char*)(this->buffer + hdrLen + 3 - 1);
// NOTE(review): payloadOffset is uint16_t, so hdrLen + 3 + topicLen can wrap
// for a topicLen close to 0xFFFF. The bounds check below would then pass and
// memmove() would copy topicLen bytes. Consider a wider type for the offset.
385 uint16_t payloadOffset = hdrLen + 3 + topicLen;
386 size_t payloadLen = length - payloadOffset;
387 uint8_t* payload = this->buffer + payloadOffset;
389 if (length < payloadOffset) {
390 ERROR_PSC_PRINTF_P(
"handlePacket(): Suspicious topicLen (%u) points outside of received buffer length (%zu)\n", topicLen, length);
// Shift the topic one byte towards the start of the buffer so a terminating NUL
// fits after it without overwriting the first payload byte.
393 memmove(topic, topic + 1, topicLen);
394 topic[topicLen] =
'\0';
// QoS 0: the whole remainder is payload.
396 if (MQTT_HDR_GET_QOS(this->buffer[0]) ==
MQTT_QOS0) {
398 callback(topic, payload, payloadLen);
// Otherwise a 2-byte message id precedes the payload; it is echoed back in a
// 4-byte PUBACK after the callback has run.
401 if (payloadLen < 2) {
402 ERROR_PSC_PRINTF_P(
"handlePacket(): Missing msgId in QoS 1/2 message\n");
405 uint16_t msgId = (this->buffer[payloadOffset] << 8) + this->buffer[payloadOffset + 1];
406 callback(topic, payload + 2, payloadLen - 2);
408 this->buffer[0] = MQTTPUBACK;
410 this->buffer[2] = (msgId >> 8);
411 this->buffer[3] = (msgId & 0xFF);
412 if (_client->write(this->buffer, 4) == 4) {
413 lastOutActivity = millis();
// PUBACK / PUBREC / PUBCOMP are reported as errors when shorter than 4 bytes.
421 ERROR_PSC_PRINTF_P(
"handlePacket(): Received PUBACK packet with length %zu, expected at least 4 bytes\n", length);
429 ERROR_PSC_PRINTF_P(
"handlePacket(): Received PUBREC packet with length %zu, expected at least 4 bytes\n", length);
// A PUBREC is answered with a 4-byte PUBREL reusing the buffer.
433 buffer[0] = MQTTPUBREL | 2;
435 if (_client->write(buffer, 4) == 4) {
436 lastOutActivity = millis();
442 ERROR_PSC_PRINTF_P(
"handlePacket(): Received PUBCOMP packet with length %zu, expected at least 4 bytes\n", length);
// A 2-byte PINGRESP is sent.
// NOTE(review): presumably in answer to a PINGREQ; the case label is elided.
449 this->buffer[0] = MQTTPINGRESP;
451 if (_client->write(this->buffer, 2) == 2) {
452 lastOutActivity = millis();
// NOTE(review): presumably the PINGRESP case, clearing the outstanding ping.
456 pingOutstanding =
false;
469 const unsigned long t = millis();
470 if (keepAliveMillis && ((t - lastInActivity > this->keepAliveMillis) || (t - lastOutActivity > this->keepAliveMillis))) {
471 if (pingOutstanding) {
472 DEBUG_PSC_PRINTF(
"loop aborting due to timeout\n");
475 pingOutstanding =
false;
478 this->buffer[0] = MQTTPINGREQ;
480 if (_client->write(this->buffer, 2) == 2) {
481 lastInActivity = lastOutActivity = t;
482 pingOutstanding =
true;
486 if (_client->available()) {
488 size_t len = readPacket(&hdrLen);
491 ret = handlePacket(hdrLen, len);
526 size_t rc = write(payload, plength);
527 lastOutActivity = millis();
548 for (
size_t i = 0; i < plength; i++) {
549 rc += _client->write((uint8_t)pgm_read_byte_near(payload + i));
551 lastOutActivity = millis();
562 if (!topic)
return false;
563 if (strlen(topic) == 0)
return false;
566 ERROR_PSC_PRINTF_P(
"beginPublish() called with invalid QoS %u\n", qos);
571 if (
connected() && MQTT_MAX_HEADER_SIZE + strlen(topic) + 2 <= this->bufferSize) {
573 size_t topicLen = writeString(topic, this->buffer, MQTT_MAX_HEADER_SIZE, this->bufferSize) - MQTT_MAX_HEADER_SIZE;
575 const uint8_t header = MQTTPUBLISH | MQTT_QOS_GET_HDR(qos) | (retained ? MQTTRETAINED : 0);
576 const size_t nextMsgLen = (qos) ? 2 : 0;
577 uint8_t hdrLen = buildHeader(header, this->buffer, topicLen + plength + nextMsgLen);
578 if (hdrLen == 0)
return false;
580 size_t rc = _client->write(this->buffer + (MQTT_MAX_HEADER_SIZE - hdrLen), hdrLen + topicLen);
581 lastOutActivity = millis();
582 return (rc == (hdrLen + topicLen));
592 writeNextMsgId(buf, 0, 2);
593 size_t rc = _client->write(buf, 2);
594 lastOutActivity = millis();
// Builds the fixed header for a packet of `length` body bytes and stores it
// right-aligned in the first MQTT_MAX_HEADER_SIZE bytes of buf. The header thus
// ends exactly where the body begins, at buf + MQTT_MAX_HEADER_SIZE.
// Callers in this file treat a return value of 0 as failure.
// NOTE(review): the length-encoding loop body and the return statements are
// elided from this listing.
613uint8_t PubSubClient::buildHeader(uint8_t header, uint8_t* buf,
size_t length) {
// Scratch space for the encoded length bytes (everything except the type byte).
614 uint8_t hdrBuf[MQTT_MAX_HEADER_SIZE - 1];
624 hdrBuf[hdrLen++] = digit;
625 }
while (len > 0 && hdrLen < MQTT_MAX_HEADER_SIZE - 1);
// The error message reports a length that does not fit and what is left of it.
628 ERROR_PSC_PRINTF_P(
"buildHeader() length too big %zu, left %zu\n", length, len);
// Type/flags byte first, immediately followed by the encoded length bytes.
632 buf[MQTT_MAX_HEADER_SIZE - 1 - hdrLen] = header;
633 memcpy(buf + MQTT_MAX_HEADER_SIZE - hdrLen, hdrBuf, hdrLen);
// Writes a single byte to the network client and returns the client's result.
// lastOutActivity is refreshed before the write is attempted.
637size_t PubSubClient::write(uint8_t data) {
638 lastOutActivity = millis();
639 return _client->write(data);
// Writes `size` bytes from `buffer` to the network client and returns the
// client's result. lastOutActivity is refreshed before the write is attempted.
// Note: the parameter named `buffer` is the caller's data, not this->buffer.
642size_t PubSubClient::write(
const uint8_t* buffer,
size_t size) {
643 lastOutActivity = millis();
644 return _client->write(buffer, size);
// Sends a complete packet: builds the fixed header in front of the body in
// `buf`, then writes header and body to the network client.
// `length` is the body length; the body is expected to start at
// buf + MQTT_MAX_HEADER_SIZE (see the pointer arithmetic below).
// Returns false when buildHeader() reports failure (0).
// NOTE(review): declarations, the #else/#endif lines and the final return are
// elided from this listing.
655bool PubSubClient::write(uint8_t header, uint8_t* buf,
size_t length) {
658 uint8_t hdrLen = buildHeader(header, buf, length);
659 if (hdrLen == 0)
return false;
// With MQTT_MAX_TRANSFER_SIZE defined the packet is sent in a loop that stops
// as soon as a write returns fewer bytes than requested.
661#ifdef MQTT_MAX_TRANSFER_SIZE
662 uint8_t* writeBuf = buf + (MQTT_MAX_HEADER_SIZE - hdrLen);
663 size_t bytesRemaining = length + hdrLen;
665 while ((bytesRemaining > 0) && result) {
668 rc = _client->write(writeBuf, bytesToWrite);
669 result = (rc == bytesToWrite);
670 bytesRemaining -= rc;
673 lastOutActivity = millis();
// Otherwise the whole packet goes out in a single write call.
677 rc = _client->write(buf + (MQTT_MAX_HEADER_SIZE - hdrLen), length + hdrLen);
678 result = (rc == length + hdrLen);
680 lastOutActivity = millis();
// Appends `string` to buf at offset pos as a 2-byte big-endian length followed
// by the characters (no terminating NUL is copied).
// `size` is the capacity of buf. A null `string` leaves buf untouched and
// returns pos unchanged.
// NOTE(review): the position update after memcpy, the else branch and the final
// return are elided from this listing.
698size_t PubSubClient::writeString(
const char*
string, uint8_t* buf,
size_t pos,
size_t size) {
699 if (!
string)
return pos;
701 size_t sLen = strlen(
string);
// The string must fit into the remaining space and into a 16-bit length field.
702 if (pos + 2 + sLen <= size && sLen <= 0xFFFF) {
703 buf[pos++] = (uint8_t)(sLen >> 8);
704 buf[pos++] = (uint8_t)(sLen & 0xFF);
705 memcpy(buf + pos,
string, sLen);
708 ERROR_PSC_PRINTF_P(
"writeString(): string (%zu) does not fit into buf (%zu)\n", pos + 2 + sLen, size);
// Advances nextMsgId and writes it big-endian into buf at offset pos, provided
// two bytes fit within `size`.
// NOTE(review): the else branch and the return statement are elided from this
// listing.
722size_t PubSubClient::writeNextMsgId(uint8_t* buf,
size_t pos,
size_t size) {
723 if (pos + 2 <= size) {
// Pre-increment; a wrap-around to 0 is replaced by 1, so id 0 is never used.
724 nextMsgId = (++nextMsgId == 0) ? 1 : nextMsgId;
725 buf[pos++] = (uint8_t)(nextMsgId >> 8);
726 buf[pos++] = (uint8_t)(nextMsgId & 0xFF);
728 ERROR_PSC_PRINTF_P(
"writeNextMsgId(): buffer (%zu) does not fit into buf (%zu)\n", pos + 2, size);
738 if (!topic)
return false;
741 size_t topicLen = strnlen(topic, this->bufferSize);
742 if (this->bufferSize < MQTT_MAX_HEADER_SIZE + 2 + 2 + topicLen + 1) {
748 uint16_t length = MQTT_MAX_HEADER_SIZE;
749 length = writeNextMsgId(buffer, length, this->bufferSize);
750 length = writeString(topic, this->buffer, length, this->bufferSize);
751 this->buffer[length++] = qos;
752 return write(MQTTSUBSCRIBE | MQTT_QOS_GET_HDR(
MQTT_QOS1), this->buffer, length - MQTT_MAX_HEADER_SIZE);
758 if (!topic)
return false;
760 size_t topicLen = strnlen(topic, this->bufferSize);
761 if (this->bufferSize < MQTT_MAX_HEADER_SIZE + 2 + 2 + topicLen) {
766 uint16_t length = MQTT_MAX_HEADER_SIZE;
767 length = writeNextMsgId(buffer, length, this->bufferSize);
768 length = writeString(topic, this->buffer, length, this->bufferSize);
769 return write(MQTTUNSUBSCRIBE | MQTT_QOS_GET_HDR(
MQTT_QOS1), this->buffer, length - MQTT_MAX_HEADER_SIZE);
775 IPAddress addr(ip[0], ip[1], ip[2], ip[3]);
783 this->domain =
nullptr;
788 char* newDomain =
nullptr;
790 newDomain = (
char*)realloc(this->domain, strlen(domain) + 1);
793 strcpy(newDomain, domain);
794 this->domain = newDomain;
798 this->domain =
nullptr;
805 this->callback = callback;
810 this->_client = &client;
815 this->stream = &stream;
824 if (this->bufferSize == 0) {
825 this->buffer = (uint8_t*)malloc(size);
827 uint8_t* newBuffer = (uint8_t*)realloc(this->buffer, size);
829 this->buffer = newBuffer;
834 this->bufferSize = size;
835 return (this->buffer !=
nullptr);
839 return this->bufferSize;
843 this->keepAliveMillis = keepAlive * 1000UL;
848 this->socketTimeoutMillis = timeout * 1000UL;
A simple client for MQTT.
#define MQTT_SOCKET_TIMEOUT
Sets the timeout, in seconds, when reading from the network. This also applies as the timeout for calls to connect().
#define MQTT_MAX_TRANSFER_SIZE
Sets the maximum number of bytes passed to the network client in each write call. Some hardware has a...
#define MQTT_VERSION
Sets the version of the MQTT protocol to use (3.1 or 3.1.1). [MQTT_VERSION_3_1, MQTT_VERSION_3_1_1].
#define MQTT_MAX_POSSIBLE_PACKET_SIZE
Maximum packet size defined by MQTT protocol.
#define MQTT_MAX_PACKET_SIZE
Sets the largest packet size, in bytes, the client will handle. Any packet received that exceeds this size will be ignored.
#define MQTT_KEEPALIVE
Sets the keepalive interval, in seconds, the client will use. This is used to maintain the connection...
bool loop()
This should be called regularly to allow the client to process incoming messages and maintain its connection to the server.
bool subscribe(const char *topic)
Subscribes to messages published to the specified topic using QoS 0.
PubSubClient & setCallback(MQTT_CALLBACK_SIGNATURE)
Sets the message callback function.
PubSubClient & setServer(IPAddress ip, uint16_t port)
Sets the server details.
bool beginPublish(const char *topic, size_t plength, bool retained)
Start to publish a message using QoS 0. This API: beginPublish(...) one or more calls to write(....
bool unsubscribe(const char *topic)
Unsubscribes from the specified topic.
bool publish_P(const char *topic, const char *payload, bool retained)
Publishes a message stored in PROGMEM to the specified topic using QoS 0.
bool publish(const char *topic, const char *payload)
Publishes a non retained message to the specified topic using QoS 0.
~PubSubClient()
Destructor for the PubSubClient class.
PubSubClient & setSocketTimeout(uint16_t timeout)
Sets the socket timeout used by the client. This determines how long the client will wait for incomin...
PubSubClient()
Creates an uninitialised client instance.
PubSubClient & setKeepAlive(uint16_t keepAlive)
Sets the keep alive interval used by the client. This value should only be changed when the client is...
bool publish(const char *topic, const char *payload, bool retained)
Publishes a message to the specified topic using QoS 0.
bool connected()
Checks whether the client is connected to the server.
int state()
Returns the current state of the client. If a connection attempt fails, this can be used to get more ...
bool endPublish()
Finish sending a message that was started with a call to beginPublish.
PubSubClient & setClient(Client &client)
Sets the network client instance to use.
void disconnect()
Disconnects the client.
size_t getBufferSize()
Gets the current size of the internal buffer.
bool setBufferSize(size_t size)
Sets the size, in bytes, of the internal send and receive buffer. This must be large enough to contai...
bool connect(const char *id)
Connects the client using a clean session without username and password.
PubSubClient & setStream(Stream &stream)
Sets the stream to write received messages to.
#define MQTT_QOS0
Quality of Service 0: At most once.
#define MQTT_QOS1
Quality of Service 1: At least once.
#define MQTT_CONNECTION_TIMEOUT
The network connection timed out or server didn't respond within the keepalive time.
#define MQTT_CONNECTED
The client is connected.
#define MQTT_CONNECT_FAILED
The network connection failed.
#define MQTT_CONNECTION_LOST
The network connection was lost/broken.
#define MQTT_DISCONNECTED
The client is disconnected cleanly.