AlexaClientSDK  1.25.0
A cross-platform, modular SDK for interacting with the Alexa Voice Service
Executor.h
Go to the documentation of this file.
1 /*
2  * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License").
5  * You may not use this file except in compliance with the License.
6  * A copy of the License is located at
7  *
8  * http://aws.amazon.com/apache2.0/
9  *
10  * or in the "license" file accompanying this file. This file is distributed
11  * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12  * express or implied. See the License for the specific language governing
13  * permissions and limitations under the License.
14  */
15 
16 #ifndef ALEXA_CLIENT_SDK_AVSCOMMON_UTILS_INCLUDE_AVSCOMMON_UTILS_THREADING_EXECUTOR_H_
17 #define ALEXA_CLIENT_SDK_AVSCOMMON_UTILS_INCLUDE_AVSCOMMON_UTILS_THREADING_EXECUTOR_H_
18 
19 #include <atomic>
20 #include <condition_variable>
21 #include <chrono>
22 #include <deque>
23 #include <functional>
24 #include <future>
25 #include <memory>
26 #include <mutex>
27 #include <utility>
28 
31 
32 namespace alexaClientSDK {
33 namespace avsCommon {
34 namespace utils {
35 namespace threading {
36 
40 class Executor {
41 public:
48  Executor(const std::chrono::milliseconds& delayExit = std::chrono::milliseconds(1000));
49 
53  ~Executor();
54 
63  template <typename Task, typename... Args>
64  auto submit(Task task, Args&&... args) -> std::future<decltype(task(args...))>;
65 
75  template <typename Task, typename... Args>
76  auto submitToFront(Task task, Args&&... args) -> std::future<decltype(task(args...))>;
77 
81  void waitForSubmittedTasks();
82 
84  void shutdown();
85 
87  bool isShutdown();
88 
89 private:
91  using Queue = std::deque<std::function<void()>>;
92 
98  bool runNext();
99 
105  bool hasNext();
106 
113  std::function<void()> pop();
114 
125  template <typename Task, typename... Args>
126  auto pushTo(bool front, Task task, Args&&... args) -> std::future<decltype(task(args...))>;
127 
129  Queue m_queue;
130 
132  bool m_threadRunning;
133 
135  std::chrono::milliseconds m_timeout;
136 
138  std::mutex m_queueMutex;
139 
141  std::atomic_bool m_shutdown;
142 
144  std::shared_ptr<power::PowerResource> m_powerResource;
145 
147  std::condition_variable m_delayedCondition;
148 
150  const uint64_t m_id;
151 
153  TaskThread m_taskThread;
154 };
155 
156 template <typename Task, typename... Args>
157 auto Executor::submit(Task task, Args&&... args) -> std::future<decltype(task(args...))> {
158  bool front = false;
159  return pushTo(front, std::forward<Task>(task), std::forward<Args>(args)...);
160 }
161 
162 template <typename Task, typename... Args>
163 auto Executor::submitToFront(Task task, Args&&... args) -> std::future<decltype(task(args...))> {
164  bool front = true;
165  return pushTo(front, std::forward<Task>(task), std::forward<Args>(args)...);
166 }
167 
/**
 * Transfers the outcome of @c future to @c promise.
 *
 * The value obtained from @c future->get() is stored in @c promise. When the build has exceptions enabled, anything
 * thrown while doing so (including an exception stored in @c future) is stored in @c promise instead.
 *
 * @tparam T The type carried by the future and the promise.
 * @param promise The promise to fulfill.
 * @param future The future to read the outcome from; it is dereferenced without a null check.
 */
template <typename T>
inline static void forwardPromise(std::shared_ptr<std::promise<T>> promise, std::future<T>* future) {
#if __cpp_exceptions || defined(__EXCEPTIONS)
    try {
        promise->set_value(future->get());
    } catch (...) {
        promise->set_exception(std::current_exception());
    }
#else
    // Exceptions are disabled in this build: forward the value without a handler.
    promise->set_value(future->get());
#endif
}
186 
193 template <>
194 inline void forwardPromise<void>(std::shared_ptr<std::promise<void>> promise, std::future<void>* future) {
195 #if __cpp_exceptions || defined(__EXCEPTIONS)
196  try {
197 #endif
198  future->get();
199  promise->set_value();
200 #if __cpp_exceptions || defined(__EXCEPTIONS)
201  } catch (...) {
202  promise->set_exception(std::current_exception());
203  }
204 #endif
205 }
206 
207 template <typename Task, typename... Args>
208 auto Executor::pushTo(bool front, Task task, Args&&... args) -> std::future<decltype(task(args...))> {
209  // Remove arguments from the tasks type by binding the arguments to the task.
210  auto boundTask = std::bind(std::forward<Task>(task), std::forward<Args>(args)...);
211 
212  /*
213  * Create a std::packaged_task with the correct return type. The decltype only returns the return value of the
214  * boundTask. The following parentheses make it a function call with the boundTask return type. The package task
215  * will then return a future of the correct type.
216  *
217  * Note: A std::packaged_task fulfills its future *during* the call to operator(). If the user of a
218  * std::packaged_task hands it off to another thread to execute, and then waits on the future, they will be able to
219  * retrieve the return value from the task and know that the task has executed, but they do not know exactly when
220  * the task object has been deleted. This distinction can be significant if the packaged task is holding onto
221  * resources that need to be freed (through a std::shared_ptr for example). If the user needs to wait for those
222  * resources to be freed they have no way of knowing how long to wait. The translated_task lambda below is a
223  * workaround for this limitation. It executes the packaged task, then disposes of it before passing the task's
224  * return value back to the future that the user is waiting on.
225  */
226  using PackagedTaskType = std::packaged_task<decltype(boundTask())()>;
227  auto packaged_task = std::make_shared<PackagedTaskType>(boundTask);
228 
229  // Create a promise/future that we will fulfill when we have cleaned up the task.
230  auto cleanupPromise = std::make_shared<std::promise<decltype(task(args...))>>();
231  auto cleanupFuture = cleanupPromise->get_future();
232 
233  // Remove the return type from the task by wrapping it in a lambda with no return value.
234  auto translated_task = [packaged_task, cleanupPromise]() mutable {
235  // Execute the task.
236  packaged_task->operator()();
237  // Note the future for the task's result.
238  auto taskFuture = packaged_task->get_future();
239  // Clean up the task.
240  packaged_task.reset();
241  // Forward the task's result to our cleanup promise/future.
242  forwardPromise(cleanupPromise, &taskFuture);
243  };
244 
245  // Release our local reference to packaged task so that the only remaining reference is inside the lambda.
246  packaged_task.reset();
247 
248  {
249  bool restart = false;
250  std::lock_guard<std::mutex> queueLock{m_queueMutex};
251  if (!m_shutdown) {
252  restart = !m_threadRunning;
253  if (m_powerResource) {
254  m_powerResource->acquire();
255  }
256  m_queue.emplace(front ? m_queue.begin() : m_queue.end(), std::move(translated_task));
257  } else {
258  using FutureType = decltype(task(args...));
259  return std::future<FutureType>();
260  }
261 
262  if (restart) {
263  // Restart task thread.
264  m_taskThread.start(std::bind(&Executor::runNext, this));
265  m_threadRunning = true;
266  }
267  }
268 
269  m_delayedCondition.notify_one();
270  return cleanupFuture;
271 }
272 
273 } // namespace threading
274 } // namespace utils
275 } // namespace avsCommon
276 } // namespace alexaClientSDK
277 
278 #endif // ALEXA_CLIENT_SDK_AVSCOMMON_UTILS_INCLUDE_AVSCOMMON_UTILS_THREADING_EXECUTOR_H_
Executor(const std::chrono::milliseconds &delayExit=std::chrono::milliseconds(1000))
Definition: Executor.cpp:32
auto submitToFront(Task task, Args &&... args) -> std::future< decltype(task(args...))>
Definition: Executor.h:163
def args
Definition: android_test.py:111
bool start(std::function< bool()> jobRunner)
Definition: TaskThread.cpp:65
void forwardPromise< void >(std::shared_ptr< std::promise< void >> promise, std::future< void > *future)
Definition: Executor.h:194
Whether or not curl logs should be emitted.
Definition: AVSConnectionManager.h:36
bool isShutdown()
Returns whether or not the executor is shutdown.
Definition: Executor.cpp:97
static void forwardPromise(std::shared_ptr< std::promise< T >> promise, std::future< T > *future)
Definition: Executor.h:175
auto submit(Task task, Args &&... args) -> std::future< decltype(task(args...))>
Definition: Executor.h:157
void waitForSubmittedTasks()
Definition: Executor.cpp:40
void shutdown()
Clears the executor of outstanding tasks and refuses any additional tasks to be submitted.
Definition: Executor.cpp:89

AlexaClientSDK 1.25.0 - Copyright 2016-2021 Amazon.com, Inc. or its affiliates. All Rights Reserved. Licensed under the Apache License, Version 2.0