I have a simple class whose purpose is to read data over UDP all the time, and send a request packet every X seconds. I tried to implement scheduled request sending via boost::asio::deadline_timer, however my code started to behave strangely: it correctly reads incoming packets until the timer fires for the first time, then it stops reading and the socket only sends data. I confirmed that the problem is not on the side of the device with which the connection is being made.
Here's a summary of my code:
powerswitch.h:
#pragma once
/* includes */
namespace a
{
using std::thread;
using std::unique_ptr;
using std::array;
class PowerSwitch
{
struct [[gnu::packed]] RequestPacket { /* ... */ };
struct [[gnu::packed]] ResponsePacket { /* ... */ };
struct ChannelData { /* ... */ };
public:
explicit PowerSwitch(string_view ipv4, u16 port, boost::asio::io_context& context, std::chrono::seconds request_interval);
~PowerSwitch();
auto toggle_channel(int channel) -> void;
auto stop() -> void;
private:
auto configure(string_view ipv4, u16 port, std::chrono::seconds request_interval) -> expected<void, string>;
auto request() -> void;
auto read() -> void;
auto handle_incoming(usize bytes) -> void;
auto handle_timer() -> void;
auto write(string_view data) -> void;
private:
boost::asio::ip::udp::socket m_socket;
boost::asio::ip::udp::endpoint m_endpoint;
boost::asio::ip::udp::endpoint m_target;
boost::asio::deadline_timer m_timer;
std::chrono::seconds m_request_interval;
array<u8, 1024> m_buffer;
array<ChannelData, 8> m_channels;
};
}
powerswitch.cpp:
/* includes */
using std::span;
using std::vector;
namespace asio = boost::asio;
namespace a
{
constexpr u16 LOCAL_PORT = 12000;
constexpr u16 DUMMY_CHANNEL = 9'999;
constexpr auto REQUEST_MARKER = 0xAAAAAAAA;
PowerSwitch::PowerSwitch(
const string_view ipv4,
const u16 port,
asio::io_context& context,
const std::chrono::seconds request_interval
)
: m_socket(context, asio::ip::udp::endpoint()),
m_endpoint(this->m_socket.local_endpoint()),
m_timer(context, boost::posix_time::seconds(request_interval.count())),
m_request_interval(request_interval),
m_buffer(array<u8, 1024>()),
m_channels(array<ChannelData, 8>())
{
this->configure(ipv4, port, request_interval)
.map_error([](const auto& e){ llog::error("failed to initialize powerswitch: {}", e); });
}
PowerSwitch::~PowerSwitch() { this->stop(); }
auto PowerSwitch::toggle_channel(const int channel) -> void
{
const auto packet = RequestPacket {
.marker = REQUEST_MARKER,
.channel = static_cast<u16>(channel),
.response_port = this->m_endpoint.port(),
.checksum = 0x0000
};
this->write({ reinterpret_cast<const char*>(&packet), sizeof(packet) });
}
auto PowerSwitch::stop() -> void
{
this->m_socket.close();
llog::trace("closing connection to {}", this->m_endpoint.address().to_string());
}
auto PowerSwitch::configure(
const string_view ipv4,
const u16 port,
const std::chrono::seconds request_interval
) -> expected<void, string>
{
this->stop();
try {
this->m_endpoint = asio::ip::udp::endpoint(
asio::ip::make_address_v4(ip::Ipv4::local_address_unchecked().address),
LOCAL_PORT
);
llog::trace("opening socket at {}:{}", this->m_endpoint.address().to_string(), this->m_endpoint.port());
this->m_socket.open(this->m_endpoint.protocol());
this->m_socket.bind(this->m_endpoint);
this->m_request_interval = request_interval;
this->m_target = asio::ip::udp::endpoint(
asio::ip::make_address_v4(ipv4),
port
);
this->m_timer = asio::deadline_timer(
this->m_socket.get_executor(),
boost::posix_time::seconds(request_interval.count())
);
this->handle_timer();
llog::debug("powerswitch service started at {}:{} (receiving from {}:{})",
this->m_socket.local_endpoint().address().to_string(),
this->m_socket.local_endpoint().port(),
this->m_target.address().to_string(),
this->m_target.port()
);
} catch(const std::exception& e) {
return Err("exception: {}", e.what());
}
this->read();
return {};
}
auto PowerSwitch::request() -> void {
llog::trace("powerswitch: sending planned request");
this->toggle_channel(DUMMY_CHANNEL);
}
auto PowerSwitch::read() -> void
{
this->m_socket.async_receive_from(
asio::buffer(this->m_buffer),
this->m_endpoint,
[this](const auto& ec, const auto& bytes_transferred)
{
if(ec) {
llog::error("powerswitch: error: {}", ec.what());
return;
}
this->handle_incoming(bytes_transferred);
this->read();
}
);
}
auto PowerSwitch::handle_incoming(const usize bytes) -> void
{
const auto raw = span(this->m_buffer).first(bytes);
for(const auto datagram = reinterpret_cast<array<ResponsePacket, 8>*>(raw.data());
const auto& [marker, channel, enabled, voltage, current] : *datagram) {
this->m_channels[channel] = ChannelData {
.voltage = static_cast<f32>(voltage) / 1'000.0f,
.current = static_cast<f32>(current),
.enabled = static_cast<bool>(enabled)
};
}
llog::trace("[:{} {}V {} mA]", this->m_channels.front().enabled, this->m_channels.front().voltage, this->m_channels.front().current);
}
auto PowerSwitch::handle_timer() -> void
{
this->request();
this->m_timer.expires_from_now(boost::posix_time::seconds(this->m_request_interval.count()));
this->m_timer.async_wait(
[this](const auto& ec)
{
if(ec) {
llog::error("powerswitch: error: {}", ec.what());
return;
}
this->handle_timer();
}
);
}
auto PowerSwitch::write(const string_view data) -> void
{
this->m_socket.async_send_to(
asio::buffer(data),
this->m_target,
[this](const auto& ec, const auto& bytes_transferred)
{
if(ec) {
llog::error("powerswitch: error: {}", ec.what());
return;
}
llog::trace("powerswitch: sent {} bytes to {}:{}",
bytes_transferred,
this->m_target.address().to_string(),
this->m_target.port()
);
}
);
this->read();
}
}
main.cpp:
auto context = boost::asio::io_context();
auto powerswitch = a::PowerSwitch(
"192.168.1.50",
44000,
context,
std::chrono::seconds(5)
);
context.run();
return 0;
I got the following output with the code above:
[ trace ] [thread 7144 ]: opening socket at 192.168.1.10:12000
[ trace ] [thread 7144 ]: powerswitch: sending planned request
[ debug ] [thread 7144 ]: powerswitch service started at 192.168.1.10:12000 (receiving from 192.168.1.50:44000)
[ trace ] [thread 7144 ]: powerswitch: sent 10 bytes to 192.168.1.50:44000
[ trace ] [thread 7144 ]: [:false 11.961V 477 mA]
[ trace ] [thread 7144 ]: [:false 11.934V 493 mA]
[ trace ] [thread 7144 ]: [:false 11.974V 728 mA]
[ trace ] [thread 7144 ]: [:false 12.006V 543 mA]
[ trace ] [thread 7144 ]: [:false 12.004V 543 mA]
[ trace ] [thread 7144 ]: [:false 11.953V 692 mA]
[ trace ] [thread 7144 ]: [:false 11.959V 491 mA]
[ trace ] [thread 7144 ]: [:false 12.063V 583 mA]
[ trace ] [thread 7144 ]: [:false 11.833V 615 mA]
[ trace ] [thread 7144 ]: powerswitch: sending planned request
[ trace ] [thread 7144 ]: powerswitch: sent 10 bytes to 192.168.1.50:44000
[ trace ] [thread 7144 ]: [:false 12.075V 613 mA]
[ trace ] [thread 7144 ]: powerswitch: sending planned request
[ trace ] [thread 7144 ]: powerswitch: sent 10 bytes to 192.168.1.50:44000
[ trace ] [thread 7144 ]: powerswitch: sending planned request
[ trace ] [thread 7144 ]: powerswitch: sent 10 bytes to 192.168.1.50:44000
[ trace ] [thread 7144 ]: powerswitch: sending planned request
[ trace ] [thread 7144 ]: powerswitch: sent 10 bytes to 192.168.1.50:44000
[ trace ] [thread 7144 ]: powerswitch: sending planned request
[ trace ] [thread 7144 ]: powerswitch: sent 10 bytes to 192.168.1.50:44000
I would be grateful for any help or advice.
There are many issues, most of them due to over-complication.
The first big issue that caught my eye is that you `write` the local variable `packet` in the `toggle_channel` method. That leads to Undefined Behavior, because the async write operation will use it after its lifetime has ended.

To be honest, since UDP is fire-and-forget in nature, I don't see the need to use async operations here. You can just use the synchronous `send_to` method and be done with it.

Both your `read()` loop and `write()` chain to more `read()` operations, which means that you will end up with many read operations pending. This likely explains the problem that made you post the question.

The `stop()` in the destructor really has no use. It will only be reached after the `io_context` has run out of work, which means that nothing is left to stop. What you might want is a `cancel()` method that cancels all pending operations and then waits for them to complete.

When you handle an incoming message, you have to treat it as untrusted input. That means you should validate it before using it. Most importantly, you should validate that the channel index is within bounds. Using `std::array::at` instead of `std::array::operator[]` will give you bounds checking.

Simplifying the `handle_incoming` method might look like:

Note that this presumes that you make sure the buffer is properly aligned for `ResponsePacket`:

Extending the checks a bit:
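A minimal sketch of the kind of validation described above, written as a free function over standard C++ only so it can be tried in isolation. The field layout of `ResponsePacket` and `ChannelData`, the `RESPONSE_MARKER` value, and the name `parse_datagram` are assumptions made for illustration, since the question elides them. Each record is copied out with `std::memcpy`, which avoids the alignment requirement, and a datagram with an unexpected size, marker, or channel index is rejected as a whole.

```cpp
#include <array>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <optional>
#include <type_traits>

// Hypothetical wire layout: the question elides the real fields and types.
struct ResponsePacket {
    std::uint32_t marker;
    std::uint16_t channel;
    std::uint16_t enabled;
    std::uint32_t voltage; // assumed to be millivolts
    std::uint32_t current; // assumed to be milliamps
};
static_assert(std::is_trivially_copyable_v<ResponsePacket>);
static_assert(sizeof(ResponsePacket) == 16, "layout assumed to have no padding");

struct ChannelData {
    float voltage = 0.0f; // volts
    float current = 0.0f; // milliamps
    bool  enabled = false;
};

constexpr std::uint32_t RESPONSE_MARKER = 0xBBBBBBBB; // assumed value
constexpr std::size_t   CHANNEL_COUNT   = 8;

using Channels = std::array<ChannelData, CHANNEL_COUNT>;

// Returns the decoded channels, or std::nullopt if the datagram is malformed.
inline std::optional<Channels> parse_datagram(const std::uint8_t* data, std::size_t size) {
    if (data == nullptr || size != CHANNEL_COUNT * sizeof(ResponsePacket))
        return std::nullopt; // truncated or oversized datagram

    Channels channels{};
    for (std::size_t i = 0; i < CHANNEL_COUNT; ++i) {
        ResponsePacket packet;
        // memcpy places no alignment requirement on the source bytes.
        std::memcpy(&packet, data + i * sizeof(ResponsePacket), sizeof(packet));

        if (packet.marker != RESPONSE_MARKER)
            return std::nullopt; // not a response record
        if (packet.channel >= channels.size())
            return std::nullopt; // untrusted index is out of bounds

        channels[packet.channel] = ChannelData{
            static_cast<float>(packet.voltage) / 1'000.0f,
            static_cast<float>(packet.current),
            packet.enabled != 0,
        };
    }
    return channels;
}
```

In the class from the question this would be called as `parse_datagram(m_buffer.data(), bytes)` from the receive handler, assigning to `m_channels` only when a value is returned. Using `channels.at(packet.channel)` instead of the explicit comparison would also work, at the cost of an exception on bad input.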
DEMO
Here's my attempt at fixing these things. I have made up some details to make it self-contained, and simplified many things along the way.
Live On Coliru
With a local demo: