v2.0.0
Loading...
Searching...
No Matches
stream_inlet.cpp
Go to the documentation of this file.
1//=============================================================================================================
34
35//=============================================================================================================
36// INCLUDES
37//=============================================================================================================
38
39#include "stream_inlet.h"
40
41//=============================================================================================================
42// QT INCLUDES
43//=============================================================================================================
44
45#include <QTcpSocket>
46#include <QByteArray>
47#include <QDebug>
48
49//=============================================================================================================
50// STL INCLUDES
51//=============================================================================================================
52
53#include <cstring>
54#include <stdexcept>
55
56//=============================================================================================================
57// USED NAMESPACES
58//=============================================================================================================
59
60using namespace LSLLIB;
61
62//=============================================================================================================
63// PRIVATE IMPLEMENTATION
64//=============================================================================================================
65
71{
72public:
74 : m_info(info)
75 , m_pSocket(nullptr)
76 , m_bIsOpen(false)
77 , m_iChannelCount(info.channel_count())
78 , m_iBytesPerSample(static_cast<int>(info.channel_count() * sizeof(float)))
79 {
80 }
81
86
87 //=========================================================================================================
92 {
93 if (m_bIsOpen) {
94 return;
95 }
96
97 // Create socket (no parent, we manage lifetime ourselves)
98 m_pSocket = new QTcpSocket();
99
100 QString host = QString::fromStdString(m_info.data_host());
101 quint16 port = static_cast<quint16>(m_info.data_port());
102
103 if (host.isEmpty() || port == 0) {
104 delete m_pSocket;
105 m_pSocket = nullptr;
106 throw std::runtime_error("[lsl::stream_inlet] Invalid data host or port in stream_info");
107 }
108
109 m_pSocket->connectToHost(host, port);
110
111 if (!m_pSocket->waitForConnected(5000)) {
112 QString err = m_pSocket->errorString();
113 delete m_pSocket;
114 m_pSocket = nullptr;
115 throw std::runtime_error(std::string("[lsl::stream_inlet] Failed to connect to outlet: ")
116 + err.toStdString());
117 }
118
119 // Read the handshake header: "LSL1" (4 bytes) + channel_count (4 bytes, little-endian)
120 if (!m_pSocket->waitForReadyRead(5000)) {
121 delete m_pSocket;
122 m_pSocket = nullptr;
123 throw std::runtime_error("[lsl::stream_inlet] Timeout waiting for handshake from outlet");
124 }
125
126 QByteArray header;
127 while (header.size() < 8) {
128 if (m_pSocket->bytesAvailable() == 0) {
129 if (!m_pSocket->waitForReadyRead(5000)) {
130 break;
131 }
132 }
133 header.append(m_pSocket->read(8 - header.size()));
134 }
135
136 if (header.size() < 8 || header.left(4) != QByteArray("LSL1", 4)) {
137 delete m_pSocket;
138 m_pSocket = nullptr;
139 throw std::runtime_error("[lsl::stream_inlet] Invalid handshake from outlet");
140 }
141
142 // Read channel count from header (little-endian int32)
143 int headerChannels = 0;
144 std::memcpy(&headerChannels, header.constData() + 4, sizeof(int));
145 if (headerChannels != m_iChannelCount) {
146 qDebug() << "[lsl::stream_inlet] Warning: outlet reports" << headerChannels
147 << "channels, expected" << m_iChannelCount << "- using outlet value";
148 m_iChannelCount = headerChannels;
149 m_iBytesPerSample = m_iChannelCount * static_cast<int>(sizeof(float));
150 }
151
152 m_bIsOpen = true;
153 }
154
155 //=========================================================================================================
160 {
161 if (m_pSocket) {
162 if (m_pSocket->state() == QAbstractSocket::ConnectedState) {
163 m_pSocket->disconnectFromHost();
164 if (m_pSocket->state() != QAbstractSocket::UnconnectedState) {
165 m_pSocket->waitForDisconnected(1000);
166 }
167 }
168 delete m_pSocket;
169 m_pSocket = nullptr;
170 }
171 m_bIsOpen = false;
172 m_rawBuffer.clear();
173 }
174
175 //=========================================================================================================
182 {
183 if (!m_bIsOpen || !m_pSocket) {
184 return false;
185 }
186
187 // Non-blocking check for available data
188 if (m_pSocket->bytesAvailable() > 0 || m_pSocket->waitForReadyRead(0)) {
189 QByteArray data = m_pSocket->readAll();
190 m_rawBuffer.append(data);
191 }
192
193 // A complete sample requires m_iBytesPerSample bytes
194 return (m_iBytesPerSample > 0) && (m_rawBuffer.size() >= m_iBytesPerSample);
195 }
196
197 //=========================================================================================================
203 std::vector<std::vector<float>> pullChunkFloat()
204 {
205 std::vector<std::vector<float>> chunk;
206
207 if (!m_bIsOpen || !m_pSocket) {
208 return chunk;
209 }
210
211 // Read any pending data
212 readPending();
213
214 if (m_iBytesPerSample <= 0) {
215 return chunk;
216 }
217
218 // Extract complete samples from the buffer
219 int nCompleteSamples = m_rawBuffer.size() / m_iBytesPerSample;
220 if (nCompleteSamples == 0) {
221 return chunk;
222 }
223
224 chunk.reserve(nCompleteSamples);
225 const char* ptr = m_rawBuffer.constData();
226
227 for (int s = 0; s < nCompleteSamples; ++s) {
228 std::vector<float> sample(m_iChannelCount);
229 std::memcpy(sample.data(), ptr + s * m_iBytesPerSample, m_iBytesPerSample);
230 chunk.push_back(std::move(sample));
231 }
232
233 // Remove consumed bytes from the buffer
234 int consumedBytes = nCompleteSamples * m_iBytesPerSample;
235 m_rawBuffer.remove(0, consumedBytes);
236
237 return chunk;
238 }
239
241 QTcpSocket* m_pSocket;
245 QByteArray m_rawBuffer;
246};
247
248//=============================================================================================================
249// DEFINE MEMBER METHODS
250//=============================================================================================================
251
253: m_pImpl(new StreamInletPrivate(info))
254{
255}
256
257//=============================================================================================================
258
260{
261 // unique_ptr destructor handles cleanup via StreamInletPrivate destructor
262}
263
264//=============================================================================================================
265
267{
268 m_pImpl->openStream();
269}
270
271//=============================================================================================================
272
274{
275 m_pImpl->closeStream();
276}
277
278//=============================================================================================================
279
281{
282 return m_pImpl->readPending();
283}
284
285//=============================================================================================================
286
287std::vector<std::vector<float>> stream_inlet::pull_chunk_float()
288{
289 return m_pImpl->pullChunkFloat();
290}
Contains the declaration of the stream_inlet class.
Lab Streaming Layer (LSL) integration for real-time data exchange.
Describes a particular stream on the network.
Definition stream_info.h:82
StreamInletPrivate(const stream_info &info)
std::vector< std::vector< float > > pullChunkFloat()
stream_inlet(const stream_info &info)
std::vector< std::vector< float > > pull_chunk_float()