Audio Processing Framework (APF) version 0.5.0
commandqueue.h
Go to the documentation of this file.
1/******************************************************************************
2 Copyright (c) 2012-2016 Institut für Nachrichtentechnik, Universität Rostock
3 Copyright (c) 2006-2012 Quality & Usability Lab
4 Deutsche Telekom Laboratories, TU Berlin
5
6 Permission is hereby granted, free of charge, to any person obtaining a copy
7 of this software and associated documentation files (the "Software"), to deal
8 in the Software without restriction, including without limitation the rights
9 to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10 copies of the Software, and to permit persons to whom the Software is
11 furnished to do so, subject to the following conditions:
12
13 The above copyright notice and this permission notice shall be included in
14 all copies or substantial portions of the Software.
15
16 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17 IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18 FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19 AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20 LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21 OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
22 THE SOFTWARE.
23*******************************************************************************/
24
25// https://AudioProcessingFramework.github.io/
26
29
30#ifndef APF_COMMANDQUEUE_H
31#define APF_COMMANDQUEUE_H
32
33#include <thread> // std::this_thread::sleep_for
34#include <chrono> // std::chrono::microseconds
35#include <cassert> // for assert()
36
37#include "apf/lockfreefifo.h"
38
39namespace apf
40{
41
49{
50 public:
55 {
57 virtual ~Command() {}
58
61 virtual void execute() = 0;
62
65 virtual void cleanup() = 0;
66 };
67
69 class WaitCommand : public Command
70 {
71 public:
73 WaitCommand(bool& done) : _done(done) {}
74
75 private:
76 virtual void execute() { }
77 virtual void cleanup() { _done = true; }
78
79 bool& _done;
80 };
81
84
85
88 explicit CommandQueue(size_t size)
89 : _in_fifo(size)
90 , _out_fifo(size)
91 , _active(true)
92 {}
93
98 {
99 this->cleanup_commands();
100 // TODO: warning if process queue is not empty?
101 // TODO: if inactive -> process commands (if active -> ???)
102 }
103
104 inline void push(Command* cmd);
105
106 inline void wait();
107
111 {
112 Command* cmd;
113 while ((cmd = _out_fifo.pop()) != nullptr) { _cleanup(cmd); }
114 }
115
116 // TODO: avoid return value?
121 inline bool deactivate()
122 {
123 this->cleanup_commands();
124 if (_in_fifo.empty()) _active = false;
125 return !_active;
126 }
127
129 inline void reactivate()
130 {
131 this->cleanup_commands();
132 assert(_in_fifo.empty());
133 _active = true;
134 }
135
137
139
140
146 {
147 Command* cmd;
148 while ((cmd = _in_fifo.pop()) != nullptr)
149 {
150 cmd->execute();
151 bool result = _out_fifo.push(cmd);
152 // If _out_fifo is full, cmd is not cleaned up!
153 // This is very unlikely to happen (if not impossible).
154 assert(result && "Error in _out_fifo.push()!");
155 (void)result; // avoid "unused-but-set-variable" warning
156 }
157 }
158
162 {
163 return !_in_fifo.empty();
164 }
165
167
168 private:
170 void _cleanup(Command* cmd)
171 {
172 assert(cmd != nullptr);
173 cmd->cleanup();
174 delete cmd;
175 }
176
178 LockFreeFifo<Command*> _in_fifo;
180 LockFreeFifo<Command*> _out_fifo;
181
182 bool _active;
183};
184
193{
194 if (!_active)
195 {
196 cmd->execute();
197 _cleanup(cmd);
198 return;
199 }
200
201 // First remove all commands from _out_fifo.
202 // This ensures that it's not going to be full which would block
203 // process_commands() and its calling realtime thread.
204 this->cleanup_commands();
205
206 // Now push the command on _in_fifo; if the FIFO is full: retry, retry, ...
207 while (!_in_fifo.push(cmd))
208 {
209 // We don't really know if that ever happens, so we abort in debug-mode:
210 assert(false && "Error in _in_fifo.push()!");
211 // TODO: avoid this sleep?
212 std::this_thread::sleep_for(std::chrono::microseconds(50));
213 }
214}
215
220{
221 bool done = false;
222 this->push(new WaitCommand(done));
223
224 this->cleanup_commands();
225 while (!done)
226 {
227 // TODO: avoid this sleep?
228 std::this_thread::sleep_for(std::chrono::microseconds(50));
229 this->cleanup_commands();
230 }
231}
232
233} // namespace apf
234
235#endif
Dummy command to synchronize with non-realtime thread.
Definition: commandqueue.h:70
WaitCommand(bool &done)
Constructor.
Definition: commandqueue.h:73
Manage command queue from non-realtime thread to realtime thread.
Definition: commandqueue.h:49
~CommandQueue()
Destructor.
Definition: commandqueue.h:97
void cleanup_commands()
Clean up all commands in the cleanup-queue.
Definition: commandqueue.h:110
void process_commands()
Execute all commands in the queue.
Definition: commandqueue.h:145
bool deactivate()
Deactivate queue; process following commands in the non-realtime thread.
Definition: commandqueue.h:121
void push(Command *cmd)
Push a command to be executed in the realtime thread.
Definition: commandqueue.h:192
CommandQueue(size_t size)
Constructor.
Definition: commandqueue.h:88
void wait()
Wait for realtime thread.
Definition: commandqueue.h:219
void reactivate()
Re-activate queue.
Definition: commandqueue.h:129
bool commands_available() const
Check if commands are available.
Definition: commandqueue.h:161
Classes derived from this class cannot be copied (but still moved).
Definition: misc.h:60
Lock-free first-in-first-out queue.
Audio Processing Framework.
Definition: iterator.h:61
Abstract base class for realtime commands.
Definition: commandqueue.h:55
virtual void cleanup()=0
Cleanup of resources.
virtual void execute()=0
The actual implementation of the command.
virtual ~Command()
Empty virtual destructor.
Definition: commandqueue.h:57