Audio Processing Framework (APF) version 0.5.0
mimoprocessor.h
Go to the documentation of this file.
1/******************************************************************************
2 Copyright (c) 2012-2016 Institut für Nachrichtentechnik, Universität Rostock
3 Copyright (c) 2006-2012 Quality & Usability Lab
4 Deutsche Telekom Laboratories, TU Berlin
5
6 Permission is hereby granted, free of charge, to any person obtaining a copy
7 of this software and associated documentation files (the "Software"), to deal
8 in the Software without restriction, including without limitation the rights
9 to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10 copies of the Software, and to permit persons to whom the Software is
11 furnished to do so, subject to the following conditions:
12
13 The above copyright notice and this permission notice shall be included in
14 all copies or substantial portions of the Software.
15
16 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17 IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18 FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19 AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20 LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21 OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
22 THE SOFTWARE.
23*******************************************************************************/
24
25// https://AudioProcessingFramework.github.io/
26
29
30#ifndef APF_MIMOPROCESSOR_H
31#define APF_MIMOPROCESSOR_H
32
33#include <cassert> // for assert()
34#include <mutex>
35#include <stdexcept> // for std::logic_error
36
37#include "apf/rtlist.h"
38#include "apf/parameter_map.h"
39#include "apf/misc.h" // for NonCopyable
40#include "apf/iterator.h" // for *_iterator, make_*_iterator(), cast_proxy_const
41#include "apf/container.h" // for fixed_vector
42#include "apf/threadtools.h" // for ScopedThread
43
// Shorthands for the out-of-class definitions of MimoProcessor members:
// APF_MIMOPROCESSOR_TEMPLATES is the template header (three type parameters),
// APF_MIMOPROCESSOR_BASE is the matching MimoProcessor specialization name.
44#define APF_MIMOPROCESSOR_TEMPLATES template<typename Derived, typename interface_policy, typename query_policy>
45#define APF_MIMOPROCESSOR_BASE MimoProcessor<Derived, interface_policy, query_policy>
46
// Helper macro for defining the "Process" struct of a class.
// APF_PROCESS(name, parent) expands to a struct Process derived from
// parent::Process. Its constructor takes a reference to a "name" object,
// forwards it to the parent::Process constructor and then calls
// APF_PROCESS_internal() on that object.
// The expansion ends with the declarator "void APF_PROCESS_internal()"
// (without a body), so the macro invocation has to be followed directly by
// the function body in braces.
68#define APF_PROCESS(name, parent) \
69struct Process : parent::Process { \
70 explicit Process(name& ctor_arg) : parent::Process(ctor_arg) { \
71 ctor_arg.APF_PROCESS_internal(); } }; \
72void APF_PROCESS_internal()
73
74namespace apf
75{
76
/// Hook for adjusting the priority of a thread.
/// This primary template ignores both arguments and does nothing.
/// @tparam interface_policy  type of the object passed as first argument
/// @tparam native_handle_type  type of the thread handle
template<typename interface_policy, typename native_handle_type>
struct thread_traits
{
  static void update_priority(const interface_policy& /* policy */
      , native_handle_type /* handle */) noexcept
  {
    // intentionally empty
  }
};
83
84class enable_queries
85{
86 protected:
87 enable_queries()
88 // NB: 1 should be sufficient, but it doesn't seem to work:
89 : _query_fifo(2)
90 {}
91
92 template<typename F>
93 void start_query_thread(F& query_function, int usleeptime)
94 {
95 _query_thread = std::make_unique<QueryThread>(_query_fifo, usleeptime);
96
97 // CAUTION: this must be called after interface_policy::activate()!
98 // If not, an infinite recursion happens!
99 _new_query(query_function);
100 }
101
102 // This is supposed to be called from the audio thread
103 void process_query_commands()
104 {
105 _query_fifo.process_commands();
106 }
107
108 void stop_query_thread()
109 {
110 _keep_running = false;
111 // Stop query thread:
112 _query_thread.reset();
113 // Empty query fifo in case there is still some cleanup to do:
114 _query_fifo.deactivate();
115 }
116
117 private:
118 class CleanupFunction
119 {
120 public:
121 CleanupFunction(CommandQueue& fifo) : _fifo(fifo) {};
122 void operator()() { _fifo.cleanup_commands(); }
123
124 private:
125 CommandQueue& _fifo;
126 };
127
128 struct QueryThread : ScopedThread<CleanupFunction>
129 {
130 QueryThread(CommandQueue& fifo, int usleeptime)
131 : ScopedThread<CleanupFunction>(CleanupFunction(fifo), usleeptime)
132 {}
133 };
134
135 template<typename F>
136 class QueryCommand : public CommandQueue::Command
137 {
138 public:
139 QueryCommand(F& query_function, enable_queries& parent)
140 : _query_function(query_function)
141 , _parent(parent)
142 {}
143
144 // This is called in the audio thread
145 virtual void execute()
146 {
147 _query_function.query();
148 }
149
150 // This is called in the query thread
151 virtual void cleanup()
152 {
153 if (_parent._keep_running)
154 {
155 _query_function.update();
156 _parent._new_query(_query_function); // "recursive" call!
157 }
158 }
159
160 private:
161 F& _query_function;
162 enable_queries& _parent;
163 };
164
165 template<typename F>
166 void _new_query(F& query_function)
167 {
168 _query_fifo.push(new QueryCommand<F>(query_function, *this));
169 }
170
171 std::atomic<bool> _keep_running{true};
172 std::unique_ptr<QueryThread> _query_thread;
173 CommandQueue _query_fifo;
174};
175
/// Query policy without any query functionality.
/// Both member functions are empty.
class disable_queries
{
  protected:
    void process_query_commands()
    {
      // nothing to do
    }

    void stop_query_thread()
    {
      // nothing to do
    }
};
182
195template<typename Derived
196 , typename interface_policy
197 , typename query_policy = disable_queries>
198class MimoProcessor : public interface_policy
199 , public query_policy
200 , public CRTP<Derived>
202{
203 public:
204 using typename interface_policy::sample_type;
205
206 class Input;
207 class Output;
208 class DefaultInput;
209 class DefaultOutput;
210
213 {
214 virtual ~Item() = default;
215
217 virtual void process() = 0;
218 };
219
235 template<typename X>
236 struct ProcessItem : Item, public CRTP<X>
237 {
238 struct Process { explicit Process(ProcessItem&) {} };
239
240 virtual void process()
241 {
242 typename X::Process(this->derived());
243 }
244 };
245
246 using rtlist_t = RtList<Item*>;
247
251 template<typename T>
252 struct rtlist_proxy : cast_proxy_const<T, rtlist_t>
253 {
254 rtlist_proxy(const rtlist_t& l) : cast_proxy_const<T, rtlist_t>(l) {}
255 };
256
259 {
260 public:
261 explicit ScopedLock(std::mutex& obj)
262 : _obj(obj)
263 {
264 _obj.lock();
265 }
266
267 ~ScopedLock() { _obj.unlock(); }
268
269 private:
270 std::mutex& _obj;
271 };
272
273 bool activate()
274 {
275 _fifo.reactivate(); // no return value
276 return interface_policy::activate();
277 }
278
280 template<typename F>
281 bool activate(F& query_function, int usleeptime)
282 {
283 auto result = this->activate();
284 this->start_query_thread(query_function, usleeptime);
285 return result;
286 }
287
288 bool deactivate()
289 {
290 this->stop_query_thread();
291
292 if (!interface_policy::deactivate()) return false;
293
294 // All audio threads should be stopped now.
295
296 // Inputs/Outputs push commands in their destructors -> we need a loop.
297 do
298 {
299 // Exceptionally, this is called from the non-realtime thread:
300 _fifo.process_commands();
301 _fifo.cleanup_commands();
302 }
303 while (_fifo.commands_available());
304 // The queue should be empty now.
305 if (!_fifo.deactivate()) throw std::logic_error("Bug: FIFO not empty!");
306 return true;
307
308 // The lists can now be manipulated safely from the non-realtime thread.
309 }
310
311 void wait_for_rt_thread() { _fifo.wait(); }
312
313 template<typename X>
314 X* add()
315 {
316 return this->add(typename X::Params());
317 }
318
319 // TODO: find a way to get the outer type automatically
320 template<typename P>
321 typename P::outer* add(const P& p)
322 {
323 using X = typename P::outer;
324 auto temp = p;
325 temp.parent = &this->derived();
326 return static_cast<X*>(_add_helper(new X(temp)));
327 }
328
329 void rem(Input* in) { _input_list.rem(in); }
330 void rem(Output* out) { _output_list.rem(out); }
331
332 const rtlist_t& get_input_list() const { return _input_list; }
333 const rtlist_t& get_output_list() const { return _output_list; }
334
335 unsigned threads() const { return _num_threads; }
336
337 // TODO: make private?
338 const parameter_map params;
339
340 protected:
341 using MimoProcessorBase = APF_MIMOPROCESSOR_BASE;
342
343 using rtlist_iterator = typename rtlist_t::iterator;
344 using rtlist_const_iterator = typename rtlist_t::const_iterator;
345
346 struct Process { Process(Derived&) {} };
347
348 explicit MimoProcessor(const parameter_map& params = parameter_map());
349
352 {
353 this->deactivate();
354 _input_list.clear();
355 _output_list.clear();
356 }
357
358 void _process_list(rtlist_t& l);
359 void _process_list(rtlist_t& l1, rtlist_t& l2);
360
361 CommandQueue _fifo;
362
363 private:
364 class WorkerThread : NonCopyable
365 {
366 public:
367 WorkerThread(unsigned thread_number, MimoProcessor& parent)
368 : _thread_number(thread_number)
369 , _parent(parent)
370 , _thread(std::thread(&WorkerThread::_thread_function, this))
371 {
372 // Set thread priority from interface_policy, if available
373 thread_traits<interface_policy
374 , std::thread::native_handle_type>::update_priority(parent
375 , _thread.native_handle());
376 }
377
378 WorkerThread(WorkerThread&& other)
379 : _parent{other._parent}
380 {
381 // WorkerThread must be movable to be stored in a std::vector.
382 // We never actually move it, so this should never be called:
383 throw std::logic_error("This is just a work-around, don't move!");
384 }
385
386 ~WorkerThread()
387 {
388 _keep_running.store(false, std::memory_order_release);
389 this->cont_semaphore.post();
390 _thread.join();
391 }
392
393 Semaphore cont_semaphore{0};
394 Semaphore wait_semaphore{0};
395
396 private:
397 void _thread_function()
398 {
399 for (;;)
400 {
401 // wait for main audio thread
402 this->cont_semaphore.wait();
403
404 if (!_keep_running.load(std::memory_order_acquire)) { break; }
405
406 _parent._process_selected_items_in_current_list(_thread_number);
407
408 // report to main audio thread
409 this->wait_semaphore.post();
410 }
411 }
412
413 std::atomic<bool> _keep_running{true};
414 unsigned _thread_number;
415 MimoProcessor& _parent;
416
417 std::thread _thread; // Thread must be initialized after semaphores
418 };
419
420 class Xput;
421
422 // This is called from the interface_policy
423 virtual void process()
424 {
425 _fifo.process_commands();
426 _process_list(_input_list);
427 typename Derived::Process(this->derived());
428 _process_list(_output_list);
429 this->process_query_commands();
430 }
431
432 void _process_current_list_in_main_thread();
433 void _process_selected_items_in_current_list(unsigned thread_number);
434
435 Input* _add_helper(Input* in) { return _input_list.add(in); }
436 Output* _add_helper(Output* out) { return _output_list.add(out); }
437
438 // TODO: make "volatile"?
439 rtlist_t* _current_list;
440
442 const unsigned _num_threads;
443
444 fixed_vector<WorkerThread> _thread_data;
445
446 rtlist_t _input_list, _output_list;
447};
448
450APF_MIMOPROCESSOR_TEMPLATES
451APF_MIMOPROCESSOR_BASE::MimoProcessor(const parameter_map& params_)
452 : interface_policy(params_)
453 , query_policy()
454 , params(params_)
455 , _fifo(params.get("fifo_size", size_t(1024)))
456 , _current_list(nullptr)
457 , _num_threads(params.get("threads", std::thread::hardware_concurrency()))
458 , _input_list(_fifo)
459 , _output_list(_fifo)
460{
461 assert(_num_threads > 0);
462
463 // deactivate FIFO for non-realtime initializations
464 if (!_fifo.deactivate()) throw std::logic_error("Bug: FIFO not empty!");
465
466 // Create N-1 worker threads. NOTE: Number 0 is reserved for the main thread.
467 _thread_data.reserve(_num_threads - 1);
468 for (unsigned i = 1; i < _num_threads; ++i)
469 {
470 _thread_data.emplace_back(i, *this);
471 }
472}
473
474APF_MIMOPROCESSOR_TEMPLATES
475void
476APF_MIMOPROCESSOR_BASE::_process_list(rtlist_t& l)
477{
478 _current_list = &l;
479 _process_current_list_in_main_thread();
480}
481
482APF_MIMOPROCESSOR_TEMPLATES
483void
484APF_MIMOPROCESSOR_BASE::_process_list(rtlist_t& l1, rtlist_t& l2)
485{
486 // TODO: extend for more than two lists?
487
488 // Note: this was not conforming to C++03.
489 // According to C++03 iterators to the spliced elements are invalidated!
490 // In C++11 this was fixed.
491 // see http://stackoverflow.com/q/143156
492
493 // see also http://stackoverflow.com/q/7681376
494
495 auto temp = l2.begin();
496 l2.splice(temp, l1); // join lists: "L2 = L1 + L2"
497 _process_list(l2);
498 l1.splice(l1.end(), l2, l2.begin(), temp); // restore original lists
499
500 // not exception-safe (original lists are not restored), but who cares?
501}
502
503APF_MIMOPROCESSOR_TEMPLATES
504void
505APF_MIMOPROCESSOR_BASE::_process_selected_items_in_current_list(
506 unsigned thread_number)
507{
508 assert(_current_list);
509
510 unsigned n = 0;
511 for (auto& i: *_current_list)
512 {
513 if (thread_number == n++ % _num_threads)
514 {
515 assert(i);
516 i->process();
517 }
518 }
519}
520
521APF_MIMOPROCESSOR_TEMPLATES
522void
523APF_MIMOPROCESSOR_BASE::_process_current_list_in_main_thread()
524{
525 assert(_current_list);
526 if (_current_list->empty()) return;
527
528 // wake all threads
529 for (auto& it: _thread_data) it.cont_semaphore.post();
530
531 _process_selected_items_in_current_list(0);
532
533 // wait for worker threads
534 for (auto& it: _thread_data) it.wait_semaphore.wait();
535}
536
537APF_MIMOPROCESSOR_TEMPLATES
538class APF_MIMOPROCESSOR_BASE::Xput : public APF_MIMOPROCESSOR_BASE::Item
539{
540 public:
541 // Parameters for an Input or Output.
542 // You can add your own parameters by deriving from it.
543 struct Params : parameter_map
544 {
545 Params() : parent(nullptr) {}
546 Derived* parent;
547
548 Params& operator=(const parameter_map& p)
549 {
550 this->parameter_map::operator=(p);
551 return *this;
552 }
553 };
554
555 Derived& parent;
556
557 protected:
560 explicit Xput(const Params& p)
561 : parent(*(p.parent
562 ? p.parent
563 : throw std::logic_error("Bug: In/Output: parent == 0!")))
564 {}
565};
566
568APF_MIMOPROCESSOR_TEMPLATES
569class APF_MIMOPROCESSOR_BASE::Input : public APF_MIMOPROCESSOR_BASE::Xput
570 , public interface_policy::Input
571 , public CRTP<typename Derived::Input>
572{
573 public:
574 struct Params : APF_MIMOPROCESSOR_BASE::Xput::Params
575 {
576 using Xput::Params::operator=;
577 using outer = typename Derived::Input; // see add()
578 };
579
580 struct Process { Process(Input&) {} };
581
582 explicit Input(const Params& p)
583 : Xput(p)
584 , interface_policy::Input(*p.parent, p)
585 {}
586
587 private:
588 virtual void process()
589 {
590 this->fetch_buffer();
591 typename Derived::Input::Process(this->derived());
592 }
593};
594
596APF_MIMOPROCESSOR_TEMPLATES
597class APF_MIMOPROCESSOR_BASE::DefaultInput : public APF_MIMOPROCESSOR_BASE::Input
598{
599 public:
600 using Params = typename Input::Params;
601 using iterator = typename Input::iterator;
602
603 explicit DefaultInput(const Params& p) : Input(p) {}
604
605 iterator begin() const { return this->buffer.begin(); }
606 iterator end() const { return this->buffer.end(); }
607};
608
610APF_MIMOPROCESSOR_TEMPLATES
611class APF_MIMOPROCESSOR_BASE::Output : public APF_MIMOPROCESSOR_BASE::Xput
612 , public interface_policy::Output
613 , public CRTP<typename Derived::Output>
614{
615 public:
616 struct Params : APF_MIMOPROCESSOR_BASE::Xput::Params
617 {
618 using Xput::Params::operator=;
619 using outer = typename Derived::Output; // see add()
620 };
621
622 struct Process { Process(Output&) {} };
623
624 explicit Output(const Params& p)
625 : Xput(p)
626 , interface_policy::Output(*p.parent, p)
627 {}
628
629 private:
630 virtual void process()
631 {
632 this->fetch_buffer();
633 typename Derived::Output::Process(this->derived());
634 }
635};
636
638APF_MIMOPROCESSOR_TEMPLATES
639class APF_MIMOPROCESSOR_BASE::DefaultOutput : public APF_MIMOPROCESSOR_BASE::Output
640{
641 public:
642 using Params = typename Output::Params;
643 using iterator = typename Output::iterator;
644
645 DefaultOutput(const Params& p) : Output(p) {}
646
647 iterator begin() const { return this->buffer.begin(); }
648 iterator end() const { return this->buffer.end(); }
649};
650
651} // namespace apf
652
653#undef APF_MIMOPROCESSOR_TEMPLATES
654#undef APF_MIMOPROCESSOR_BASE
655
656#endif
Curiously Recurring Template Pattern (CRTP) base class.
Definition: misc.h:43
Manage command queue from non-realtime thread to realtime thread.
Definition: commandqueue.h:49
void cleanup_commands()
Clean up all commands in the cleanup-queue.
Definition: commandqueue.h:110
void process_commands()
Execute all commands in the queue.
Definition: commandqueue.h:145
bool deactivate()
Deactivate queue; process following commands in the non-realtime thread.
Definition: commandqueue.h:121
void wait()
Wait for realtime thread.
Definition: commandqueue.h:219
void reactivate()
Re-activate queue.
Definition: commandqueue.h:129
bool commands_available() const
Check if commands are available.
Definition: commandqueue.h:161
Input class with begin() and end().
Output class with begin() and end().
Lock is released when it goes out of scope.
Multi-threaded multiple-input-multiple-output (MIMO) processor.
~MimoProcessor()
Protected non-virtual destructor.
MimoProcessor(const parameter_map &params=parameter_map())
bool activate(F &query_function, int usleeptime)
This is only available when enable_queries is used.
Classes derived from this class cannot be copied (but still moved).
Definition: misc.h:60
void reserve(size_type n)
Reserve space for new elements.
Definition: container.h:178
void emplace_back(Args &&... args)
Construct element at the end.
Definition: container.h:199
Some containers.
Several more or less useful iterators and some macros.
Miscellaneous helper classes.
Audio Processing Framework.
Definition: iterator.h:61
A "dictionary" for parameters.
A (under certain circumstances) realtime-safe list.
Abstract base class for list items.
virtual void process()=0
to be overridden in the derived class
Base class for items which have a Process class.
virtual void process()
to be overridden in the derived class
Proxy class for accessing an RtList.
Encapsulate a container of base pointers (const version).
Definition: iterator.h:644
A "dictionary" for parameters.
Definition: parameter_map.h:68
Utility classes for the use with std::thread.