MNE-CPP  0.1.9
A Framework for Electrophysiology
eventsharedmemmanager.cpp
1 //=============================================================================================================
36 //=============================================================================================================
37 // STD INCLUDES
38 //=============================================================================================================
39 
40 #include <utility>
41 
42 //=============================================================================================================
43 // Qt INCLUDES
44 //=============================================================================================================
45 
46 #include <QDebug>
47 #include <QString>
48 
49 //=============================================================================================================
50 // MNECPP INCLUDES
51 //=============================================================================================================
52 
53 #include "eventsharedmemmanager.h"
54 #include "eventmanager.h"
55 
56 //=============================================================================================================
57 // NAMESPACE SPEC
58 //=============================================================================================================
59 
60 using namespace EVENTSLIB;
61 
62 //=============================================================================================================
63 // LOCAL DEFINITIONS
64 //=============================================================================================================
65 
66 static const std::string defaultSharedMemoryBufferKey("MNE_EVENTS_SHAREDMEMORY_BUFFER");
67 static const std::string defaultGroupName("external");
68 
69 int EVENTSINTERNAL::EventSharedMemManager::m_iLastUpdateIndex(0);
70 
71 // The limiting factor in the bandwitdh of the shared memory capabilities of this library
72 // is measured in terms of buffer length divided by the time interval between checks for updates.
73 // So, in order to say: The library is capable of correctly handle a
74 // maximum of "sharedMemBufferLength"/"m_fTimerCheckBuffer" events per milisecond.
75 constexpr static int bufferLength(5);
76 static long long defatult_timerBufferWatch(200);
77 
78 //=============================================================================================================
79 
81 :EventUpdate(0,0,EventUpdateType::NULL_EVENT)
82 { }
83 
84 //=============================================================================================================
85 
87 : m_EventSample(sample)
88 , m_CreatorId(creator)
89 , m_TypeOfUpdate(t)
90 {
92 }
93 
94 //=============================================================================================================
95 
97 {
98  return m_CreationTime;
99 }
100 
101 //=============================================================================================================
102 
104 {
105  return m_EventSample;
106 }
107 
108 //=============================================================================================================
109 
111 {
112  return m_CreatorId;
113 }
114 
115 //=============================================================================================================
116 
118 {
119  return m_TypeOfUpdate;
120 }
121 
122 //=============================================================================================================
123 
125 {
126  m_TypeOfUpdate = t;
127 }
128 
129 //=============================================================================================================
130 
132 {
133  return EVENTSINTERNAL::EventUpdateTypeString[m_TypeOfUpdate];
134 }
135 
136 //=============================================================================================================
137 
139 : m_pEventManager(parent)
140 , m_SharedMemory(QString::fromStdString(defaultSharedMemoryBufferKey))
141 , m_IsInit(false)
142 , m_sGroupName(defaultGroupName)
143 , m_bGroupCreated(false)
144 , m_GroupId(0)
145 , m_SharedMemorySize(sizeof(int) + bufferLength * sizeof(EventUpdate))
146 , m_fTimerCheckBuffer(defatult_timerBufferWatch)
147 , m_BufferWatcherThreadRunning(false)
148 , m_WritingToSharedMemory(false)
149 , m_lastCheckTime(0)
150 , m_LocalBuffer(new EventUpdate[bufferLength])
151 , m_SharedBuffer(nullptr)
152 , m_Id(generateId())
153 , m_Mode(EVENTSLIB::SharedMemoryMode::READ)
154 {
155 
156 }
157 
158 //=============================================================================================================
159 
160 EVENTSINTERNAL::EventSharedMemManager::~EventSharedMemManager()
161 {
162  detachFromSharedMemory();
163  delete[] m_LocalBuffer;
164 }
165 
166 //=============================================================================================================
167 
168 void EVENTSINTERNAL::EventSharedMemManager::init(EVENTSLIB::SharedMemoryMode mode)
169 {
170 // qDebug() << " ========================================================";
171 // qDebug() << "Init started!\n";
172 
173  if(!m_IsInit)
174  {
175  detachFromSharedMemory();
176 
177  m_Mode = mode;
178  if(m_Mode == EVENTSLIB::SharedMemoryMode::READ)
179  {
180  attachToSharedSegment(QSharedMemory::AccessMode::ReadOnly);
181  launchSharedMemoryWatcherThread();
182 
183  } else if(m_Mode == EVENTSLIB::SharedMemoryMode::WRITE)
184  {
185  attachToOrCreateSharedSegment( QSharedMemory::AccessMode::ReadWrite);
186  } else if(m_Mode == EVENTSLIB::SharedMemoryMode::READWRITE)
187  {
188  attachToOrCreateSharedSegment( QSharedMemory::AccessMode::ReadWrite);
189  launchSharedMemoryWatcherThread();
190  }
191  }
192 }
193 
194 //=============================================================================================================
195 
196 void EVENTSINTERNAL::EventSharedMemManager::attachToOrCreateSharedSegment(QSharedMemory::AccessMode mode)
197 {
198  attachToSharedSegment(mode);
199  if(!m_IsInit)
200  {
201  m_IsInit = createSharedSegment(m_SharedMemorySize, mode);
202  }
203 }
204 
205 //=============================================================================================================
206 
207 void EVENTSINTERNAL::EventSharedMemManager::attachToSharedSegment(QSharedMemory::AccessMode mode)
208 {
209  m_IsInit = m_SharedMemory.attach(mode);
210  if(m_IsInit)
211  {
212  m_SharedBuffer = static_cast<EventUpdate*>(m_SharedMemory.data());
213  }
214 }
215 
216 //=============================================================================================================
217 
218 bool EVENTSINTERNAL::EventSharedMemManager::createSharedSegment(int bufferSize, QSharedMemory::AccessMode mode)
219 {
220  bool output = m_SharedMemory.create(bufferSize, mode);
221  if(output)
222  {
223  m_SharedBuffer = static_cast<EventUpdate*>(m_SharedMemory.data());
224  initializeSharedMemory();
225  }
226  return output;
227 }
228 
229 //=============================================================================================================
230 
231 void EVENTSINTERNAL::EventSharedMemManager::launchSharedMemoryWatcherThread()
232 {
233  m_BufferWatcherThread = std::thread(&EventSharedMemManager::bufferWatcher, this);
234 }
235 
236 //=============================================================================================================
237 
238 void EVENTSINTERNAL::EventSharedMemManager::detachFromSharedMemory()
239 {
240  stopSharedMemoryWatcherThread();
241  if(!m_BufferWatcherThreadRunning && !m_WritingToSharedMemory)
242  {
243  if(m_SharedMemory.isAttached())
244  {
245  m_SharedMemory.detach();
246  }
247  }
248 }
249 
250 //=============================================================================================================
251 
252 void EVENTSINTERNAL::EventSharedMemManager::stopSharedMemoryWatcherThread()
253 {
254  if(m_BufferWatcherThreadRunning)
255  {
256  m_IsInit = false;
257  m_BufferWatcherThread.join();
258  }
259 }
260 
261 //=============================================================================================================
262 
264 {
265  detachFromSharedMemory();
266  m_IsInit = false;
267 }
268 
269 //=============================================================================================================
270 
272 {
273  return m_IsInit;
274 }
275 
276 //=============================================================================================================
277 
279 {
280  if(m_IsInit &&
281  (m_Mode == EVENTSLIB::SharedMemoryMode::WRITE ||
282  m_Mode == EVENTSLIB::SharedMemoryMode::READWRITE ) )
283  {
284  EventUpdate newUpdate(sample, m_Id, EventUpdateType::NEW_EVENT);
285  copyNewUpdateToSharedMemory(newUpdate);
286  }
287 }
288 
289 //=============================================================================================================
290 
292 {
293  if(m_IsInit &&
294  (m_Mode == EVENTSLIB::SharedMemoryMode::WRITE ||
295  m_Mode == EVENTSLIB::SharedMemoryMode::READWRITE ) )
296  {
297  EventUpdate newUpdate(sample, m_Id, EventUpdateType::DELETE_EVENT);
298  copyNewUpdateToSharedMemory(newUpdate);
299  }
300 }
301 
302 //=============================================================================================================
303 
304 void EVENTSINTERNAL::EventSharedMemManager::initializeSharedMemory()
305 {
306 // qDebug() << "Initializing Shared Memory Buffer ======== id: " << m_Id;
307 // printLocalBuffer();
308  void* localBuffer = static_cast<void*>(m_LocalBuffer);
309  char* sharedBuffer = static_cast<char*>(m_SharedMemory.data()) + sizeof(int);
310  int indexIterator(0);
311  m_WritingToSharedMemory = true;
312  if(m_SharedMemory.isAttached())
313  {
314  m_SharedMemory.lock();
315  memcpy(m_SharedMemory.data(), &indexIterator, sizeof(int));
316  memcpy(sharedBuffer, localBuffer, bufferLength * sizeof(EventUpdate));
317  m_SharedMemory.unlock();
318  }
319  m_WritingToSharedMemory = false;
320 }
321 
322 //=============================================================================================================
323 
324 void EVENTSINTERNAL::EventSharedMemManager::copyNewUpdateToSharedMemory(EventUpdate& newUpdate)
325 {
326 // qDebug() << "Sending Buffer ======== id: " << m_Id;
327 
328  char* sharedBuffer = static_cast<char*>(m_SharedMemory.data()) + sizeof(int);
329  int indexIterator(0);
330  m_WritingToSharedMemory = true;
331  if(m_SharedMemory.isAttached())
332  {
333  m_SharedMemory.lock();
334  memcpy(&indexIterator, m_SharedMemory.data(), sizeof(int));
335  memcpy(m_SharedMemory.data(), &(++indexIterator), sizeof(int));
336  int index = (indexIterator-1) % bufferLength;
337  memcpy(sharedBuffer + (index * sizeof(EventUpdate)), static_cast<void*>(&newUpdate), sizeof(EventUpdate));
338  m_SharedMemory.unlock();
339  }
340  m_WritingToSharedMemory = false;
341 }
342 
343 //=============================================================================================================
344 
345 void EVENTSINTERNAL::EventSharedMemManager::copySharedMemoryToLocalBuffer()
346 {
347  void* localBuffer = static_cast<void*>(m_LocalBuffer);
348  char* sharedBuffer = static_cast<char*>(m_SharedMemory.data()) + sizeof(int);
349  if(m_SharedMemory.isAttached())
350  {
351  m_SharedMemory.lock();
352  memcpy(localBuffer, sharedBuffer, bufferLength * sizeof(EventUpdate));
353  m_SharedMemory.unlock();
354  }
355 // qDebug() << "Receiving Buffer ======== id: " << m_Id;
356 // printLocalBuffer();
357 }
358 
359 //=============================================================================================================
360 
361 void EVENTSINTERNAL::EventSharedMemManager::bufferWatcher()
362 {
363  m_BufferWatcherThreadRunning = true;
364 // qDebug() << "buffer Watcher thread launched";
365  while(m_IsInit)
366  {
367 // qDebug() << "Running buffer watcher!";
368  copySharedMemoryToLocalBuffer();
369  auto timeCheck = getTimeNow();
370  processLocalBuffer();
371  m_lastCheckTime = timeCheck;
372  std::this_thread::sleep_for(std::chrono::milliseconds(m_fTimerCheckBuffer));
373  }
374  m_BufferWatcherThreadRunning = false;
375 }
376 
377 //=============================================================================================================
378 
379 void EVENTSINTERNAL::EventSharedMemManager::processLocalBuffer()
380 {
381  for(int i = 0; i < bufferLength; ++i)
382  {
383 // qDebug() << "Checking update: " << i;
384  if(m_LocalBuffer[i].getCreationTime() > m_lastCheckTime &&
385  m_LocalBuffer[i].getCreatorId() != m_Id )
386  {
387  createGroupIfNeeded();
388  processEvent(m_LocalBuffer[i]);
389  }
390  }
391 }
392 
393 //=============================================================================================================
394 
396 {
397 // qDebug() << "process new update";
398  switch (ne.getType())
399  {
400  case EventUpdateType::NEW_EVENT :
401  {
402  processNewEvent(ne);
403  break;
404  }
405  case EventUpdateType::DELETE_EVENT :
406  {
407  processDeleteEvent(ne);
408  break;
409  }
410  default :
411  break;
412  }
413 }
414 
415 //=============================================================================================================
416 
417 void EVENTSINTERNAL::EventSharedMemManager::processNewEvent(const EventUpdate& ne)
418 {
419  EVENTSINTERNAL::EventINT newEvent(
420  m_pEventManager->generateNewEventId(), ne.getSample(), m_GroupId);
421  m_pEventManager->insertEvent(newEvent);
422 }
423 
424 //=============================================================================================================
425 
426 void EVENTSINTERNAL::EventSharedMemManager::processDeleteEvent(const EventUpdate& ne)
427 {
428  auto eventsInSample = m_pEventManager->getEventsInSample(ne.getSample());
429  for(auto e: *eventsInSample)
430  {
431  if(e.groupId == m_GroupId)
432  {
433  m_pEventManager->eraseEvent(e.id);
434  break;
435  };
436  }
437 }
438 
439 //=============================================================================================================
440 
442 {
443  const auto tNow = std::chrono::system_clock::now();
444  return std::chrono::duration_cast<std::chrono::milliseconds>(
445  tNow.time_since_epoch()).count();
446 }
447 
448 //=============================================================================================================
449 
450 void EVENTSINTERNAL::EventSharedMemManager::createGroupIfNeeded()
451 {
452  if(!m_bGroupCreated)
453  {
454  EVENTSLIB::EventGroup g = m_pEventManager->addGroup(m_sGroupName);
455  m_GroupId = g.id;
456  m_bGroupCreated = true;
457  }
458 }
459 
460 //=============================================================================================================
461 
462 void EVENTSINTERNAL::EventSharedMemManager::printLocalBuffer()
463 {
464  for(int i = 0; i < bufferLength; ++i)
465  {
466  qDebug() << "[" << i << "] -" << m_LocalBuffer[i].eventTypeToText().c_str()
467  << "-" << m_LocalBuffer[i].getSample()
468  << "-" << m_LocalBuffer[i].getCreatorId()
469  << "-" << m_LocalBuffer[i].getCreationTime() << "\n";
470  }
471 }
472 
473 //=============================================================================================================
std::unique_ptr< std::vector< Event > > getEventsInSample(int sample) const
EventSharedMemManager definition.
EventManager declaration.
EventGroup addGroup(const std::string &sGroupName)
EventSharedMemManager(EVENTSLIB::EventManager *parent=nullptr)
EventGroup class is designed as a data holder for a group. It is designed towards ease of use for a c...
Definition: eventgroup.h:116