30#ifndef APF_MIMOPROCESSOR_H
31#define APF_MIMOPROCESSOR_H
44#define APF_MIMOPROCESSOR_TEMPLATES template<typename Derived, typename interface_policy, typename query_policy>
45#define APF_MIMOPROCESSOR_BASE MimoProcessor<Derived, interface_policy, query_policy>
/// Helper macro: declares a nested struct @c Process derived from
/// @c parent::Process. Its explicit constructor takes a @c name& argument,
/// forwards it to the @c parent::Process constructor and then calls
/// @c APF_PROCESS_internal() on that argument.
/// The expansion ends with the header of @c void @c APF_PROCESS_internal()
/// (no body, no semicolon), leaving the function body to be written right
/// after the macro invocation.
68#define APF_PROCESS(name, parent) \
69struct Process : parent::Process { \
70 explicit Process(name& ctor_arg) : parent::Process(ctor_arg) { \
71 ctor_arg.APF_PROCESS_internal(); } }; \
72void APF_PROCESS_internal()
78template<
typename interface_policy,
typename native_handle_type>
/// Empty, @c noexcept update_priority(): accepts an interface policy and a
/// native thread handle and ignores both (the body does nothing).
81 static void update_priority(
const interface_policy&, native_handle_type)
noexcept {}
93 void start_query_thread(F& query_function,
int usleeptime)
95 _query_thread = std::make_unique<QueryThread>(_query_fifo, usleeptime);
99 _new_query(query_function);
103 void process_query_commands()
105 _query_fifo.process_commands();
108 void stop_query_thread()
110 _keep_running =
false;
112 _query_thread.reset();
114 _query_fifo.deactivate();
118 class CleanupFunction
/// Bind this functor to the command queue @p fifo (kept in @c _fifo).
121 CleanupFunction(CommandQueue& fifo) : _fifo(fifo) {};
/// Call @c cleanup_commands() on the command queue held in @c _fifo.
122 void operator()() { _fifo.cleanup_commands(); }
128 struct QueryThread : ScopedThread<CleanupFunction>
130 QueryThread(CommandQueue& fifo,
int usleeptime)
131 : ScopedThread<CleanupFunction>(CleanupFunction(fifo), usleeptime)
136 class QueryCommand :
public CommandQueue::Command
139 QueryCommand(F& query_function, enable_queries& parent)
140 : _query_function(query_function)
145 virtual void execute()
147 _query_function.query();
151 virtual void cleanup()
153 if (_parent._keep_running)
155 _query_function.update();
156 _parent._new_query(_query_function);
162 enable_queries& _parent;
166 void _new_query(F& query_function)
168 _query_fifo.push(
new QueryCommand<F>(query_function, *
this));
171 std::atomic<bool> _keep_running{
true};
172 std::unique_ptr<QueryThread> _query_thread;
173 CommandQueue _query_fifo;
/// Empty implementation: does nothing.
/// NOTE(review): presumably the variant used when queries are disabled --
/// the enclosing class header is not visible here; confirm.
179 void process_query_commands() {}
/// Empty implementation: does nothing.
/// NOTE(review): presumably the variant used when queries are disabled --
/// the enclosing class header is not visible here; confirm.
180 void stop_query_thread() {}
195template<
typename Derived
196 ,
typename interface_policy
197 ,
typename query_policy = disable_queries>
199 ,
public query_policy
200 ,
public CRTP<Derived>
204 using typename interface_policy::sample_type;
214 virtual ~Item() =
default;
/// Process class with an empty constructor: constructing it from a
/// @c ProcessItem reference does nothing.
238 struct Process {
explicit Process(
ProcessItem&) {} };
242 typename X::Process(this->derived());
246 using rtlist_t = RtList<Item*>;
276 return interface_policy::activate();
283 auto result = this->activate();
284 this->start_query_thread(query_function, usleeptime);
290 this->stop_query_thread();
292 if (!interface_policy::deactivate())
return false;
305 if (!_fifo.
deactivate())
throw std::logic_error(
"Bug: FIFO not empty!");
/// Forward to @c _fifo.wait().
/// NOTE(review): if @c _fifo is the command queue, its wait() is described
/// as "Wait for realtime thread" -- confirm the member's type.
311 void wait_for_rt_thread() { _fifo.
wait(); }
316 return this->add(
typename X::Params());
321 typename P::outer* add(
const P& p)
323 using X =
typename P::outer;
325 temp.parent = &this->derived();
326 return static_cast<X*
>(_add_helper(
new X(temp)));
/// Remove the input @p in by forwarding it to @c _input_list.rem().
329 void rem(Input* in) { _input_list.rem(in); }
/// Remove the output @p out by forwarding it to @c _output_list.rem().
330 void rem(Output* out) { _output_list.rem(out); }
/// Read-only access to the input list.
/// @return const reference to @c _input_list
332 const rtlist_t& get_input_list()
const {
return _input_list; }
/// Read-only access to the output list.
/// @return const reference to @c _output_list
333 const rtlist_t& get_output_list()
const {
return _output_list; }
/// Number of threads this processor was configured with.
/// @return the value of @c _num_threads, which the constructor reads from
///   the "threads" parameter (defaulting to
///   @c std::thread::hardware_concurrency()).
335 unsigned threads()
const {
return _num_threads; }
338 const parameter_map params;
341 using MimoProcessorBase = APF_MIMOPROCESSOR_BASE;
343 using rtlist_iterator =
typename rtlist_t::iterator;
344 using rtlist_const_iterator =
typename rtlist_t::const_iterator;
/// Process class with an empty constructor: constructing it from a
/// @c Derived reference does nothing.
346 struct Process { Process(Derived&) {} };
348 explicit MimoProcessor(
const parameter_map& params = parameter_map());
355 _output_list.clear();
358 void _process_list(rtlist_t& l);
359 void _process_list(rtlist_t& l1, rtlist_t& l2);
368 : _thread_number(thread_number)
370 , _thread(std::thread(&WorkerThread::_thread_function, this))
373 thread_traits<interface_policy
374 , std::thread::native_handle_type>::update_priority(parent
375 , _thread.native_handle());
378 WorkerThread(WorkerThread&& other)
379 : _parent{other._parent}
383 throw std::logic_error(
"This is just a work-around, don't move!");
388 _keep_running.store(
false, std::memory_order_release);
389 this->cont_semaphore.post();
393 Semaphore cont_semaphore{0};
394 Semaphore wait_semaphore{0};
397 void _thread_function()
402 this->cont_semaphore.wait();
404 if (!_keep_running.load(std::memory_order_acquire)) {
break; }
406 _parent._process_selected_items_in_current_list(_thread_number);
409 this->wait_semaphore.post();
413 std::atomic<bool> _keep_running{
true};
414 unsigned _thread_number;
423 virtual void process()
426 _process_list(_input_list);
427 typename Derived::Process(this->derived());
428 _process_list(_output_list);
429 this->process_query_commands();
432 void _process_current_list_in_main_thread();
433 void _process_selected_items_in_current_list(
unsigned thread_number);
/// Overload for inputs: hand @p in to @c _input_list.add().
/// @return whatever @c _input_list.add() returns, converted to @c Input*
435 Input* _add_helper(Input* in) {
return _input_list.add(in); }
/// Overload for outputs: hand @p out to @c _output_list.add().
/// @return whatever @c _output_list.add() returns, converted to @c Output*
436 Output* _add_helper(Output* out) {
return _output_list.add(out); }
439 rtlist_t* _current_list;
442 const unsigned _num_threads;
444 fixed_vector<WorkerThread> _thread_data;
446 rtlist_t _input_list, _output_list;
450APF_MIMOPROCESSOR_TEMPLATES
452 : interface_policy(params_)
455 , _fifo(params.get(
"fifo_size", size_t(1024)))
456 , _current_list(nullptr)
457 , _num_threads(params.get(
"threads", std::thread::hardware_concurrency()))
459 , _output_list(_fifo)
461 assert(_num_threads > 0);
464 if (!_fifo.
deactivate())
throw std::logic_error(
"Bug: FIFO not empty!");
467 _thread_data.
reserve(_num_threads - 1);
468 for (
unsigned i = 1; i < _num_threads; ++i)
474APF_MIMOPROCESSOR_TEMPLATES
476APF_MIMOPROCESSOR_BASE::_process_list(rtlist_t& l)
479 _process_current_list_in_main_thread();
482APF_MIMOPROCESSOR_TEMPLATES
484APF_MIMOPROCESSOR_BASE::_process_list(rtlist_t& l1, rtlist_t& l2)
495 auto temp = l2.begin();
498 l1.splice(l1.end(), l2, l2.begin(), temp);
503APF_MIMOPROCESSOR_TEMPLATES
505APF_MIMOPROCESSOR_BASE::_process_selected_items_in_current_list(
506 unsigned thread_number)
508 assert(_current_list);
511 for (
auto& i: *_current_list)
513 if (thread_number == n++ % _num_threads)
521APF_MIMOPROCESSOR_TEMPLATES
523APF_MIMOPROCESSOR_BASE::_process_current_list_in_main_thread()
525 assert(_current_list);
526 if (_current_list->empty())
return;
529 for (
auto& it: _thread_data) it.cont_semaphore.post();
531 _process_selected_items_in_current_list(0);
534 for (
auto& it: _thread_data) it.wait_semaphore.wait();
537APF_MIMOPROCESSOR_TEMPLATES
538class APF_MIMOPROCESSOR_BASE::Xput :
public APF_MIMOPROCESSOR_BASE::Item
543 struct Params : parameter_map
545 Params() : parent(nullptr) {}
548 Params& operator=(
const parameter_map& p)
550 this->parameter_map::operator=(p);
560 explicit Xput(
const Params& p)
563 : throw std::logic_error(
"Bug: In/Output: parent == 0!")))
568APF_MIMOPROCESSOR_TEMPLATES
569class APF_MIMOPROCESSOR_BASE::Input :
public APF_MIMOPROCESSOR_BASE::Xput
570 ,
public interface_policy::Input
571 ,
public CRTP<typename Derived::Input>
574 struct Params : APF_MIMOPROCESSOR_BASE::Xput::Params
576 using Xput::Params::operator=;
577 using outer =
typename Derived::Input;
/// Process class with an empty constructor: constructing it from an
/// @c Input reference does nothing.
580 struct Process { Process(
Input&) {} };
582 explicit Input(
const Params& p)
584 , interface_policy::Input(*p.parent, p)
588 virtual void process()
590 this->fetch_buffer();
591 typename Derived::Input::Process(this->derived());
596APF_MIMOPROCESSOR_TEMPLATES
597class APF_MIMOPROCESSOR_BASE::DefaultInput :
public APF_MIMOPROCESSOR_BASE::Input
600 using Params =
typename Input::Params;
601 using iterator =
typename Input::iterator;
/// @return @c this->buffer.begin(), i.e. an iterator to the start of the
///   input's buffer
605 iterator begin()
const {
return this->buffer.begin(); }
/// @return @c this->buffer.end(), i.e. the past-the-end iterator of the
///   input's buffer
606 iterator end()
const {
return this->buffer.end(); }
610APF_MIMOPROCESSOR_TEMPLATES
611class APF_MIMOPROCESSOR_BASE::Output :
public APF_MIMOPROCESSOR_BASE::Xput
612 ,
public interface_policy::Output
613 ,
public CRTP<typename Derived::Output>
616 struct Params : APF_MIMOPROCESSOR_BASE::Xput::Params
618 using Xput::Params::operator=;
619 using outer =
typename Derived::Output;
/// Process class with an empty constructor: constructing it from an
/// @c Output reference does nothing.
622 struct Process { Process(
Output&) {} };
624 explicit Output(
const Params& p)
626 , interface_policy::Output(*p.parent, p)
630 virtual void process()
632 this->fetch_buffer();
633 typename Derived::Output::Process(this->derived());
638APF_MIMOPROCESSOR_TEMPLATES
639class APF_MIMOPROCESSOR_BASE::DefaultOutput :
public APF_MIMOPROCESSOR_BASE::Output
642 using Params =
typename Output::Params;
643 using iterator =
typename Output::iterator;
/// @return @c this->buffer.begin(), i.e. an iterator to the start of the
///   output's buffer
647 iterator begin()
const {
return this->buffer.begin(); }
/// @return @c this->buffer.end(), i.e. the past-the-end iterator of the
///   output's buffer
648 iterator end()
const {
return this->buffer.end(); }
653#undef APF_MIMOPROCESSOR_TEMPLATES
654#undef APF_MIMOPROCESSOR_BASE
Curiously Recurring Template Pattern (CRTP) base class.
Manage command queue from non-realtime thread to realtime thread.
void cleanup_commands()
Clean up all commands in the cleanup-queue.
void process_commands()
Execute all commands in the queue.
bool deactivate()
Deactivate queue; process following commands in the non-realtime thread.
void wait()
Wait for realtime thread.
void reactivate()
Re-activate queue.
bool commands_available() const
Check if commands are available.
Output class with begin() and end().
Lock is released when it goes out of scope.
Multi-threaded multiple-input-multiple-output (MIMO) processor.
~MimoProcessor()
Protected non-virtual destructor.
MimoProcessor(const parameter_map &params=parameter_map())
bool activate(F &query_function, int usleeptime)
This is only available when enable_queries is used.
Classes derived from this class cannot be copied (but still moved).
void reserve(size_type n)
Reserve space for new elements.
void emplace_back(Args &&... args)
Construct element at the end.
Several more or less useful iterators and some macros.
Miscellaneous helper classes.
Audio Processing Framework.
A "dictionary" for parameters.
A (under certain circumstances) realtime-safe list.
Abstract base class for list items.
virtual void process()=0
To be overridden in the derived class.
Base class for items which have a Process class.
virtual void process()
To be overridden in the derived class.
Proxy class for accessing an RtList.
Encapsulate a container of base pointers (const version).
A "dictionary" for parameters.