v2.0.0
Loading...
Searching...
No Matches
stream_outlet.cpp
Go to the documentation of this file.
1//=============================================================================================================
34
35//=============================================================================================================
36// INCLUDES
37//=============================================================================================================
38
39#include "stream_outlet.h"
40
41//=============================================================================================================
42// QT INCLUDES
43//=============================================================================================================
44
45#include <QTcpServer>
46#include <QTcpSocket>
47#include <QUdpSocket>
48#include <QHostAddress>
49#include <QDebug>
50
51//=============================================================================================================
52// STL INCLUDES
53//=============================================================================================================
54
55#include <thread>
56#include <mutex>
57#include <atomic>
58#include <cstring>
59#include <queue>
60#include <chrono>
61
62//=============================================================================================================
63// USED NAMESPACES
64//=============================================================================================================
65
66using namespace LSLLIB;
67
68//=============================================================================================================
69// CONSTANTS
70//=============================================================================================================
71
72namespace {
73 const QHostAddress DISCOVERY_MULTICAST_GROUP("239.255.172.215");
74 const quint16 DISCOVERY_PORT = 16571;
75 const int BROADCAST_INTERVAL_MS = 500;
76}
77
78//=============================================================================================================
79// PRIVATE IMPLEMENTATION
80//=============================================================================================================
81
92{
93public:
95 : m_info(info)
96 , m_bRunning(false)
97 {
98 }
99
101 {
102 stop();
103 }
104
105 //=========================================================================================================
109 void start()
110 {
111 m_bRunning = true;
112 m_bgThread = std::thread(&StreamOutletPrivate::run, this);
113 }
114
115 //=========================================================================================================
119 void stop()
120 {
121 m_bRunning = false;
122 if (m_bgThread.joinable()) {
123 m_bgThread.join();
124 }
125 }
126
127 //=========================================================================================================
131 void enqueueSample(const std::vector<float>& sample)
132 {
133 std::lock_guard<std::mutex> lock(m_queueMutex);
134 m_sampleQueue.push(sample);
135 }
136
137 //=========================================================================================================
141 void enqueueChunk(const std::vector<std::vector<float>>& chunk)
142 {
143 std::lock_guard<std::mutex> lock(m_queueMutex);
144 for (const auto& sample : chunk) {
145 m_sampleQueue.push(sample);
146 }
147 }
148
149 //=========================================================================================================
154 {
155 return m_info;
156 }
157
158 //=========================================================================================================
162 bool haveConsumers() const
163 {
164 return m_nClients.load() > 0;
165 }
166
167private:
168 //=========================================================================================================
177 void run()
178 {
179 // --- Set up TCP server ---
180 QTcpServer tcpServer;
181 if (!tcpServer.listen(QHostAddress::Any, 0)) {
182 qDebug() << "[lsl::stream_outlet] Failed to start TCP server:" << tcpServer.errorString();
183 return;
184 }
185
186 // Record the assigned port into stream_info
187 m_info.set_data_port(tcpServer.serverPort());
188
189 // --- Set up UDP socket for multicast discovery ---
190 QUdpSocket udpSocket;
191#ifndef Q_OS_WASM
192 udpSocket.setSocketOption(QAbstractSocket::MulticastTtlOption, 1);
193#endif
194
195 // --- Prepare the handshake header ---
196 // "LSL1" (4 bytes) + channel_count (4 bytes, little-endian int)
197 QByteArray handshake("LSL1", 4);
198 int ch = m_info.channel_count();
199 handshake.append(reinterpret_cast<const char*>(&ch), sizeof(int));
200
201 // Prepare discovery datagram (re-created each loop to include up-to-date port)
202 std::string discoveryPayload = m_info.to_string();
203
204 // Track connected client sockets (owned by this thread)
205 std::vector<QTcpSocket*> clients;
206
207 auto lastBroadcast = std::chrono::steady_clock::now() - std::chrono::seconds(10); // send immediately
208
209 // --- Main loop ---
210 while (m_bRunning) {
211 // 1. Accept new connections
212 while (tcpServer.waitForNewConnection(0)) {
213 QTcpSocket* client = tcpServer.nextPendingConnection();
214 if (client) {
215 // Send handshake to the new client
216 client->write(handshake);
217 client->flush();
218 clients.push_back(client);
219 m_nClients.store(static_cast<int>(clients.size()));
220 }
221 }
222
223 // 2. Send UDP discovery broadcast (every BROADCAST_INTERVAL_MS)
224 auto now = std::chrono::steady_clock::now();
225 auto elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(now - lastBroadcast).count();
226 if (elapsed >= BROADCAST_INTERVAL_MS) {
227 QByteArray datagram(discoveryPayload.c_str(), static_cast<int>(discoveryPayload.size()));
228 udpSocket.writeDatagram(datagram, DISCOVERY_MULTICAST_GROUP, DISCOVERY_PORT);
229 lastBroadcast = now;
230 }
231
232 // 3. Drain sample queue and write to all clients
233 {
234 std::lock_guard<std::mutex> lock(m_queueMutex);
235 while (!m_sampleQueue.empty()) {
236 const std::vector<float>& sample = m_sampleQueue.front();
237 QByteArray sampleData(reinterpret_cast<const char*>(sample.data()),
238 static_cast<int>(sample.size() * sizeof(float)));
239
240 // Write to all clients, remove disconnected ones
241 auto it = clients.begin();
242 while (it != clients.end()) {
243 QTcpSocket* client = *it;
244 if (client->state() != QAbstractSocket::ConnectedState) {
245 delete client;
246 it = clients.erase(it);
247 continue;
248 }
249 client->write(sampleData);
250 ++it;
251 }
252
253 m_sampleQueue.pop();
254 }
255 }
256
257 // Flush all clients
258 for (QTcpSocket* client : clients) {
259 client->flush();
260 }
261
262 m_nClients.store(static_cast<int>(clients.size()));
263
264 // Sleep briefly to avoid busy-waiting
265 std::this_thread::sleep_for(std::chrono::milliseconds(1));
266 }
267
268 // --- Cleanup ---
269 for (QTcpSocket* client : clients) {
270 client->disconnectFromHost();
271 delete client;
272 }
273 clients.clear();
274 m_nClients.store(0);
275
276 tcpServer.close();
277 udpSocket.close();
278 }
279
280 stream_info m_info;
281 std::atomic<bool> m_bRunning;
282 std::thread m_bgThread;
283
284 std::mutex m_queueMutex;
285 std::queue<std::vector<float>> m_sampleQueue;
286 std::atomic<int> m_nClients{0};
287};
288
289//=============================================================================================================
290// DEFINE MEMBER METHODS
291//=============================================================================================================
292
294: m_pImpl(new StreamOutletPrivate(info))
295{
296 m_pImpl->start();
297}
298
299//=============================================================================================================
300
302{
303 // unique_ptr destructor handles cleanup via StreamOutletPrivate destructor
304}
305
306//=============================================================================================================
307
308void stream_outlet::push_sample(const std::vector<float>& sample)
309{
310 m_pImpl->enqueueSample(sample);
311}
312
313//=============================================================================================================
314
315void stream_outlet::push_chunk(const std::vector<std::vector<float>>& chunk)
316{
317 m_pImpl->enqueueChunk(chunk);
318}
319
320//=============================================================================================================
321
323{
324 return m_pImpl->info();
325}
326
327//=============================================================================================================
328
330{
331 return m_pImpl->haveConsumers();
332}
Contains the declaration of the stream_outlet class.
Lab Streaming Layer (LSL) integration for real-time data exchange.
Describes a particular stream on the network.
Definition stream_info.h:82
std::string to_string() const
Serialize stream_info into a string for network transport.
int channel_count() const
Number of channels.
void set_data_port(int port)
Set the TCP data port (used internally during discovery / outlet creation).
StreamOutletPrivate(const stream_info &info)
void enqueueSample(const std::vector< float > &sample)
void enqueueChunk(const std::vector< std::vector< float > > &chunk)
void push_sample(const std::vector< float > &sample)
stream_outlet(const stream_info &info)
void push_chunk(const std::vector< std::vector< float > > &chunk)
stream_info info() const