112 m_bgThread = std::thread(&StreamOutletPrivate::run,
this);
122 if (m_bgThread.joinable()) {
133 std::lock_guard<std::mutex> lock(m_queueMutex);
134 m_sampleQueue.push(sample);
143 std::lock_guard<std::mutex> lock(m_queueMutex);
144 for (
const auto& sample : chunk) {
145 m_sampleQueue.push(sample);
164 return m_nClients.load() > 0;
180 QTcpServer tcpServer;
181 if (!tcpServer.listen(QHostAddress::Any, 0)) {
182 qDebug() <<
"[lsl::stream_outlet] Failed to start TCP server:" << tcpServer.errorString();
190 QUdpSocket udpSocket;
192 udpSocket.setSocketOption(QAbstractSocket::MulticastTtlOption, 1);
197 QByteArray handshake(
"LSL1", 4);
199 handshake.append(
reinterpret_cast<const char*
>(&ch),
sizeof(
int));
202 std::string discoveryPayload = m_info.
to_string();
205 std::vector<QTcpSocket*> clients;
207 auto lastBroadcast = std::chrono::steady_clock::now() - std::chrono::seconds(10);
212 while (tcpServer.waitForNewConnection(0)) {
213 QTcpSocket* client = tcpServer.nextPendingConnection();
216 client->write(handshake);
218 clients.push_back(client);
219 m_nClients.store(
static_cast<int>(clients.size()));
224 auto now = std::chrono::steady_clock::now();
225 auto elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(now - lastBroadcast).count();
226 if (elapsed >= BROADCAST_INTERVAL_MS) {
227 QByteArray datagram(discoveryPayload.c_str(),
static_cast<int>(discoveryPayload.size()));
228 udpSocket.writeDatagram(datagram, DISCOVERY_MULTICAST_GROUP, DISCOVERY_PORT);
234 std::lock_guard<std::mutex> lock(m_queueMutex);
235 while (!m_sampleQueue.empty()) {
236 const std::vector<float>& sample = m_sampleQueue.front();
237 QByteArray sampleData(
reinterpret_cast<const char*
>(sample.data()),
238 static_cast<int>(sample.size() *
sizeof(
float)));
241 auto it = clients.begin();
242 while (it != clients.end()) {
243 QTcpSocket* client = *it;
244 if (client->state() != QAbstractSocket::ConnectedState) {
246 it = clients.erase(it);
249 client->write(sampleData);
258 for (QTcpSocket* client : clients) {
262 m_nClients.store(
static_cast<int>(clients.size()));
265 std::this_thread::sleep_for(std::chrono::milliseconds(1));
269 for (QTcpSocket* client : clients) {
270 client->disconnectFromHost();
281 std::atomic<bool> m_bRunning;
282 std::thread m_bgThread;
284 std::mutex m_queueMutex;
285 std::queue<std::vector<float>> m_sampleQueue;
286 std::atomic<int> m_nClients{0};