20#define CHECK_STRING_LENGTH(l, s) \
21 if ((!s) || (l + 2 + strnlen(s, _bufferSize) > _bufferSize)) { \
114 return connect(
id,
nullptr,
nullptr,
nullptr,
MQTT_QOS0,
false,
nullptr,
true);
121bool PubSubClient::connect(
const char*
id,
const char* willTopic, uint8_t willQos,
bool willRetain,
const char* willMessage) {
122 return connect(
id,
nullptr,
nullptr, willTopic, willQos, willRetain, willMessage,
true);
125bool PubSubClient::connect(
const char*
id,
const char* user,
const char* pass,
const char* willTopic, uint8_t willQos,
bool willRetain,
126 const char* willMessage) {
127 return connect(
id, user, pass, willTopic, willQos, willRetain, willMessage,
true);
130bool PubSubClient::connect(
const char*
id,
const char* user,
const char* pass,
const char* willTopic, uint8_t willQos,
bool willRetain,
131 const char* willMessage,
bool cleanSession) {
132 if (!_client)
return false;
136 if (_client->connected()) {
138 }
else if (_port != 0) {
140 result = _client->connect(_domain, _port);
142 result = _client->connect(_ip, _port);
149#if MQTT_VERSION == MQTT_VERSION_3_1
150 const uint8_t protocol[9] = {0x00, 0x06,
'M',
'Q',
'I',
's',
'd',
'p',
MQTT_VERSION};
151#elif MQTT_VERSION == MQTT_VERSION_3_1_1
152 const uint8_t protocol[7] = {0x00, 0x04,
'M',
'Q',
'T',
'T',
MQTT_VERSION};
155 memcpy(_buffer + MQTT_MAX_HEADER_SIZE, protocol,
sizeof(protocol));
157 size_t length = MQTT_MAX_HEADER_SIZE +
sizeof(protocol);
158 uint8_t flags = 0x00;
160 flags = (0x01 << 2) | (willQos << 3) | (willRetain << 5);
163 flags = flags | (0x01 << 1);
166 flags = flags | (0x01 << 7);
168 flags = flags | (0x01 << 6);
171 const uint16_t keepAlive = _keepAliveMillis / 1000;
172 _buffer[length++] = flags;
173 _buffer[length++] = keepAlive >> 8;
174 _buffer[length++] = keepAlive & 0xFF;
176 CHECK_STRING_LENGTH(length,
id)
177 length = writeString(
id, length);
179 CHECK_STRING_LENGTH(length, willTopic)
180 length = writeString(willTopic, length);
181 CHECK_STRING_LENGTH(length, willMessage)
182 length = writeString(willMessage, length);
186 CHECK_STRING_LENGTH(length, user)
187 length = writeString(user, length);
189 CHECK_STRING_LENGTH(length, pass)
190 length = writeString(pass, length);
194 if (!writeControlPacket(MQTTCONNECT, length - MQTT_MAX_HEADER_SIZE)) {
199 _lastInActivity = _lastOutActivity = millis();
200 _pingOutstanding =
false;
202 while (!_client->available()) {
204 unsigned long t = millis();
205 if (t - _lastInActivity >= _socketTimeoutMillis) {
206 DEBUG_PSC_PRINTF(
"connect aborting due to timeout\n");
213 size_t len = readPacket(&hdrLen);
216 if (_buffer[3] == 0) {
217 _lastInActivity = millis();
224 DEBUG_PSC_PRINTF(
"connect aborting due to protocol error\n");
235 if (!_client)
return false;
237 if (_client->connected()) {
240 DEBUG_PSC_PRINTF(
"lost connection (client may have more details)\n");
243 _pingOutstanding =
false;
249 DEBUG_PSC_PRINTF(
"disconnect called\n");
252 _buffer[0] = MQTTDISCONNECT;
254 _client->write(_buffer, 2);
257 _lastInActivity = _lastOutActivity = millis();
259 _pingOutstanding =
false;
268bool PubSubClient::readByte(uint8_t* result) {
269 if (!_client)
return false;
271 unsigned long previousMillis = millis();
272 while (!_client->available()) {
274 unsigned long currentMillis = millis();
275 if (currentMillis - previousMillis >= _socketTimeoutMillis) {
279 int rc = _client->read();
283 *result = (uint8_t)rc;
293bool PubSubClient::readByte(uint8_t* result,
size_t* pos) {
294 uint8_t* write_address = &(result[*pos]);
295 if (readByte(write_address)) {
308size_t PubSubClient::readPacket(uint8_t* hdrLen) {
310 if (!readByte(_buffer, &len))
return 0;
311 bool isPublish = (_buffer[0] & 0xF0) == MQTTPUBLISH;
312 uint32_t multiplier = 1;
319 if (len == MQTT_MAX_HEADER_SIZE) {
321 DEBUG_PSC_PRINTF(
"readPacket detected packet of invalid length\n");
326 if (!readByte(&digit))
return 0;
327 _buffer[len++] = digit;
328 length += (digit & 0x7F) * multiplier;
330 }
while ((digit & 0x80) != 0);
331 *hdrLen = (uint8_t)(len - 1);
333 DEBUG_PSC_PRINTF(
"readPacket received packet of length %zu (isPublish = %u)\n", length, isPublish);
337 if (!readByte(_buffer, &len))
return 0;
338 if (!readByte(_buffer, &len))
return 0;
339 skip = (_buffer[*hdrLen + 1] << 8) + _buffer[*hdrLen + 2];
341 if (MQTT_HDR_GET_QOS(_buffer[0]) >
MQTT_QOS0) {
348 for (
size_t i = start; i < length; i++) {
349 if (!readByte(&digit))
return 0;
351 if (isPublish && (idx - *hdrLen - 2 > skip)) {
352 _stream->write(digit);
356 if (len < _bufferSize) {
357 _buffer[len++] = digit;
362 if (!_stream && (idx > _bufferSize)) {
363 DEBUG_PSC_PRINTF(
"readPacket ignoring packet of size %zu exceeding buffer of size %zu\n", length, _bufferSize);
376bool PubSubClient::handlePacket(uint8_t hdrLen,
size_t length) {
377 uint8_t type = _buffer[0] & 0xF0;
378 DEBUG_PSC_PRINTF(
"received message of type %u\n", type);
391 uint16_t topicLen = (_buffer[hdrLen + 1] << 8) + _buffer[hdrLen + 2];
392 char* topic = (
char*)(_buffer + hdrLen + 3 - 1);
393 uint16_t payloadOffset = hdrLen + 3 + topicLen;
394 size_t payloadLen = length - payloadOffset;
395 uint8_t* payload = _buffer + payloadOffset;
397 if (length < payloadOffset) {
398 ERROR_PSC_PRINTF_P(
"handlePacket(): Suspicious topicLen (%u) points outside of received buffer length (%zu)\n", topicLen, length);
401 memmove(topic, topic + 1, topicLen);
402 topic[topicLen] =
'\0';
404 if (MQTT_HDR_GET_QOS(_buffer[0]) ==
MQTT_QOS0) {
406 callback(topic, payload, payloadLen);
409 if (payloadLen < 2) {
410 ERROR_PSC_PRINTF_P(
"handlePacket(): Missing msgId in QoS 1/2 message\n");
413 uint16_t msgId = (_buffer[payloadOffset] << 8) + _buffer[payloadOffset + 1];
414 callback(topic, payload + 2, payloadLen - 2);
416 _buffer[0] = MQTTPUBACK;
418 _buffer[2] = (msgId >> 8);
419 _buffer[3] = (msgId & 0xFF);
420 if (_client->write(_buffer, 4) == 4) {
421 _lastOutActivity = millis();
429 ERROR_PSC_PRINTF_P(
"handlePacket(): Received PUBACK packet with length %zu, expected at least 4 bytes\n", length);
437 ERROR_PSC_PRINTF_P(
"handlePacket(): Received PUBREC packet with length %zu, expected at least 4 bytes\n", length);
441 _buffer[0] = MQTTPUBREL | 2;
443 if (_client->write(_buffer, 4) == 4) {
444 _lastOutActivity = millis();
450 ERROR_PSC_PRINTF_P(
"handlePacket(): Received PUBCOMP packet with length %zu, expected at least 4 bytes\n", length);
457 _buffer[0] = MQTTPINGRESP;
459 if (_client->write(_buffer, 2) == 2) {
460 _lastOutActivity = millis();
464 _pingOutstanding =
false;
477 const unsigned long t = millis();
478 if (_keepAliveMillis && ((t - _lastInActivity > _keepAliveMillis) || (t - _lastOutActivity > _keepAliveMillis))) {
479 if (_pingOutstanding) {
480 DEBUG_PSC_PRINTF(
"loop aborting due to timeout\n");
483 _pingOutstanding =
false;
485 }
else if (_bufferWritePos > 0) {
487 if (flushBuffer() == 0) {
490 _pingOutstanding =
false;
494 _buffer[0] = MQTTPINGREQ;
496 if (_client->write(_buffer, 2) == 2) {
497 _lastInActivity = _lastOutActivity = t;
498 _pingOutstanding =
true;
502 if (_client->available()) {
504 size_t len = readPacket(&hdrLen);
507 ret = handlePacket(hdrLen, len);
542 size_t rc =
write(payload, plength);
562 size_t rc =
write_P(payload, plength);
573 if (!topic)
return false;
574 if (strlen(topic) == 0)
return false;
576 ERROR_PSC_PRINTF_P(
"beginPublish() called with invalid QoS %u\n", qos);
579 const size_t nextMsgLen = (qos >
MQTT_QOS0) ? 2 : 0;
581 if (
connected() && (MQTT_MAX_HEADER_SIZE + strlen(topic) + 2 + nextMsgLen <= _bufferSize)) {
583 size_t topicLen = writeString(topic, MQTT_MAX_HEADER_SIZE) - MQTT_MAX_HEADER_SIZE;
586 writeNextMsgId(MQTT_MAX_HEADER_SIZE + topicLen);
589 const uint8_t header = MQTTPUBLISH | MQTT_QOS_GET_HDR(qos) | (retained ? MQTTRETAINED : 0);
590 uint8_t hdrLen = buildHeader(header, topicLen + nextMsgLen + plength);
591 if (hdrLen == 0)
return false;
593 size_t rc = _client->write(_buffer + (MQTT_MAX_HEADER_SIZE - hdrLen), hdrLen + topicLen + nextMsgLen);
594 _lastOutActivity = millis();
595 return (rc == (hdrLen + topicLen + nextMsgLen));
602 if (_bufferWritePos > 0) {
604 if (flushBuffer() == 0)
return false;
620uint8_t PubSubClient::buildHeader(uint8_t header,
size_t length) {
621 uint8_t hdrBuf[MQTT_MAX_HEADER_SIZE - 1];
631 hdrBuf[hdrLen++] = digit;
632 }
while ((len > 0) && (hdrLen < MQTT_MAX_HEADER_SIZE - 1));
635 ERROR_PSC_PRINTF_P(
"buildHeader() length too big %zu, left %zu\n", length, len);
639 _buffer[MQTT_MAX_HEADER_SIZE - 1 - hdrLen] = header;
640 memcpy(_buffer + MQTT_MAX_HEADER_SIZE - hdrLen, hdrBuf, hdrLen);
645 return appendBuffer(data);
649 for (
size_t i = 0; i < size; i++) {
650 if (appendBuffer(buf[i]) == 0)
return i;
656 for (
size_t i = 0; i < size; i++) {
657 if (appendBuffer((uint8_t)pgm_read_byte_near(buf + i)) == 0)
return i;
670bool PubSubClient::writeControlPacket(uint8_t header,
size_t length) {
671 uint8_t hdrLen = buildHeader(header, length);
672 if (hdrLen == 0)
return false;
674 return writeBuffer(MQTT_MAX_HEADER_SIZE - hdrLen, hdrLen + length);
684size_t PubSubClient::writeBuffer(
size_t pos,
size_t size) {
686 if (_client && (size > 0) && (pos + size <= _bufferSize)) {
687#ifdef MQTT_MAX_TRANSFER_SIZE
688 uint8_t* writeBuf = _buffer + pos;
689 size_t bytesRemaining = size;
691 while ((bytesRemaining > 0) && result) {
693 size_t bytesWritten = _client->write(writeBuf, bytesToWrite);
694 result = (bytesWritten == bytesToWrite);
695 bytesRemaining -= bytesWritten;
696 writeBuf += bytesWritten;
698 _lastOutActivity = millis();
702 rc = result ? size : 0;
704 rc = _client->write(_buffer + pos, size);
706 _lastOutActivity = millis();
725size_t PubSubClient::writeString(
const char*
string,
size_t pos) {
726 if (!
string)
return pos;
728 size_t sLen = strlen(
string);
729 if ((pos + 2 + sLen <= _bufferSize) && (sLen <= 0xFFFF)) {
730 _buffer[pos++] = (uint8_t)(sLen >> 8);
731 _buffer[pos++] = (uint8_t)(sLen & 0xFF);
732 memcpy(_buffer + pos,
string, sLen);
735 ERROR_PSC_PRINTF_P(
"writeString(): string (%zu) does not fit into buf (%zu)\n", pos + 2 + sLen, _bufferSize);
747size_t PubSubClient::writeNextMsgId(
size_t pos) {
748 if ((pos + 2) <= _bufferSize) {
749 _nextMsgId = (++_nextMsgId == 0) ? 1 : _nextMsgId;
750 _buffer[pos++] = (uint8_t)(_nextMsgId >> 8);
751 _buffer[pos++] = (uint8_t)(_nextMsgId & 0xFF);
753 ERROR_PSC_PRINTF_P(
"writeNextMsgId(): buffer overrun (%zu) \n", pos + 2);
764size_t PubSubClient::appendBuffer(uint8_t data) {
765 _buffer[_bufferWritePos++] = data;
766 if (_bufferWritePos >= _bufferSize) {
767 if (flushBuffer() == 0)
return 0;
778size_t PubSubClient::flushBuffer() {
781 rc = writeBuffer(0, _bufferWritePos);
792 if (!topic)
return false;
795 size_t topicLen = strnlen(topic, _bufferSize);
796 if (_bufferSize < MQTT_MAX_HEADER_SIZE + 2 + 2 + topicLen + 1) {
802 uint16_t length = MQTT_MAX_HEADER_SIZE;
803 length = writeNextMsgId(length);
804 length = writeString(topic, length);
805 _buffer[length++] = qos;
806 return writeControlPacket(MQTTSUBSCRIBE | MQTT_QOS_GET_HDR(
MQTT_QOS1), length - MQTT_MAX_HEADER_SIZE);
812 if (!topic)
return false;
814 size_t topicLen = strnlen(topic, _bufferSize);
815 if (_bufferSize < MQTT_MAX_HEADER_SIZE + 2 + 2 + topicLen) {
820 uint16_t length = MQTT_MAX_HEADER_SIZE;
821 length = writeNextMsgId(length);
822 length = writeString(topic, length);
823 return writeControlPacket(MQTTUNSUBSCRIBE | MQTT_QOS_GET_HDR(
MQTT_QOS1), length - MQTT_MAX_HEADER_SIZE);
829 IPAddress addr(ip[0], ip[1], ip[2], ip[3]);
842 char* newDomain =
nullptr;
844 newDomain = (
char*)realloc(_domain, strlen(domain) + 1);
847 strcpy(newDomain, domain);
859 this->callback = callback;
878 if (_bufferSize == 0) {
879 _buffer = (uint8_t*)malloc(size);
881 uint8_t* newBuffer = (uint8_t*)realloc(_buffer, size);
889 return (_buffer !=
nullptr);
897 _keepAliveMillis = keepAlive * 1000UL;
902 _socketTimeoutMillis = timeout * 1000UL;
A simple client for MQTT.
#define MQTT_SOCKET_TIMEOUT
Sets the timeout, in seconds, when reading from the network. This also applies as the timeout for calls to connect.
#define MQTT_MAX_TRANSFER_SIZE
Sets the maximum number of bytes passed to the network client in each write call. Some hardware has a limit on how much data can be passed to it in one go.
#define MQTT_VERSION
Sets the version of the MQTT protocol to use (3.1 or 3.1.1). [MQTT_VERSION_3_1, MQTT_VERSION_3_1_1].
#define MQTT_MAX_POSSIBLE_PACKET_SIZE
Maximum packet size defined by MQTT protocol.
#define MQTT_MAX_PACKET_SIZE
Sets the largest packet size, in bytes, the client will handle. Any packet received that exceeds this size will be ignored.
#define MQTT_KEEPALIVE
Sets the keepalive interval, in seconds, the client will use. This is used to maintain the connection when no other packets are being sent or received.
bool loop()
This should be called regularly to allow the client to process incoming messages and maintain its connection to the server.
bool subscribe(const char *topic)
Subscribes to messages published to the specified topic using QoS 0.
PubSubClient & setCallback(MQTT_CALLBACK_SIGNATURE)
Sets the message callback function.
PubSubClient & setServer(IPAddress ip, uint16_t port)
Sets the server details.
bool beginPublish(const char *topic, size_t plength, bool retained)
Start to publish a message using QoS 0. This API is used as a sequence: beginPublish(...), one or more calls to write(...), then endPublish().
bool unsubscribe(const char *topic)
Unsubscribes from the specified topic.
bool publish_P(const char *topic, const char *payload, bool retained)
Publishes a message stored in PROGMEM to the specified topic using QoS 0.
virtual size_t write(uint8_t data)
Writes a single byte as a component of a publish started with a call to beginPublish....
bool publish(const char *topic, const char *payload)
Publishes a non retained message to the specified topic using QoS 0.
~PubSubClient()
Destructor for the PubSubClient class.
PubSubClient & setSocketTimeout(uint16_t timeout)
Sets the socket timeout used by the client. This determines how long the client will wait for incoming data when it expects data to arrive.
PubSubClient()
Creates an uninitialised client instance.
PubSubClient & setKeepAlive(uint16_t keepAlive)
Sets the keep alive interval used by the client. This value should only be changed when the client is not connected.
bool connected()
Checks whether the client is connected to the server.
int state()
Returns the current state of the client. If a connection attempt fails, this can be used to get more information about the failure.
bool endPublish()
Finish sending a message that was started with a call to beginPublish.
PubSubClient & setClient(Client &client)
Sets the network client instance to use.
void disconnect()
Disconnects the client.
size_t getBufferSize()
Gets the current size of the internal buffer.
size_t write_P(const uint8_t *buf, size_t size)
Writes an array of progmem bytes as a component of a publish started with a call to beginPublish....
bool setBufferSize(size_t size)
Sets the size, in bytes, of the internal send and receive buffer. This must be large enough to contain the full MQTT packet.
bool connect(const char *id)
Connects the client using a clean session without username and password.
PubSubClient & setStream(Stream &stream)
Sets the stream to write received messages to.
#define MQTT_QOS0
Quality of Service 0: At most once.
#define MQTT_QOS1
Quality of Service 1: At least once.
#define MQTT_CONNECTION_TIMEOUT
The network connection timed out or server didn't respond within the keepalive time.
#define MQTT_CONNECTED
The client is connected.
#define MQTT_CONNECT_FAILED
The network connection failed.
#define MQTT_CONNECTION_LOST
The network connection was lost/broken.
#define MQTT_DISCONNECTED
The client is disconnected cleanly.