AlexaClientSDK  3.0.0
A cross-platform, modular SDK for interacting with the Alexa Voice Service
Writer.h
Go to the documentation of this file.
1 /*
2  * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License").
5  * You may not use this file except in compliance with the License.
6  * A copy of the License is located at
7  *
8  * http://aws.amazon.com/apache2.0/
9  *
10  * or in the "license" file accompanying this file. This file is distributed
11  * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12  * express or implied. See the License for the specific language governing
13  * permissions and limitations under the License.
14  */
15 
16 #ifndef ALEXA_CLIENT_SDK_AVSCOMMON_UTILS_INCLUDE_AVSCOMMON_UTILS_SDS_WRITER_H_
17 #define ALEXA_CLIENT_SDK_AVSCOMMON_UTILS_INCLUDE_AVSCOMMON_UTILS_SDS_WRITER_H_
18 
19 #include <cstddef>
20 #include <cstdint>
21 #include <cstring>
22 #include <limits>
23 #include <mutex>
24 #include <vector>
25 
27 
28 #include "SharedDataStream.h"
29 #include "WriterPolicy.h"
30 
31 namespace alexaClientSDK {
32 namespace avsCommon {
33 namespace utils {
34 namespace sds {
35 
/// Writer interface of a SharedDataStream<T>, declared as a nested class of the SharedDataStream template.
/// A Writer copies caller-supplied words into the buffer described by a BufferLayout and advances the
/// write cursors stored in that layout's header.
/// NOTE(review): this listing has dropped the original documentation comments (the embedded line numbers
/// skip); the comments in this block are reconstructed from the definitions further down this file.
45 template <typename T>
46 class SharedDataStream<T>::Writer {
47 public:
50 
/// Error codes which write() can return instead of a word count. CLOSED is zero, the others are negative.
56  struct Error {
57  enum {
/// Returned by write() when the header's isWriterEnabled flag is false.
59  CLOSED = 0,
/// Returned by write() in its ALL_OR_NOTHING branch when the write would end more than getDataSize()
/// words past oldestUnconsumedCursor.
61  WOULDBLOCK = -1,
/// Returned by write() when buf is null or nWords is zero.
63  INVALID = -2,
/// Returned by write() in its BLOCKING branch when a non-zero timeout expires before space is available.
65  TIMEDOUT = -3,
66  };
67  };
68 
/// Stores @c policy and @c bufferLayout, then marks the writer enabled in the layout's header and sets
/// writeEndCursor to the current writeStartCursor.
75  Writer(Policy policy, std::shared_ptr<BufferLayout> bufferLayout);
76 
/// Calls close().
78  ~Writer();
79 
/// Copies up to @c nWords words from @c buf into the stream.
/// @param buf Source data; a null pointer yields Error::INVALID.
/// @param nWords Number of words to write; zero yields Error::INVALID.
/// @param timeout Used only by the BLOCKING branch; zero (the default) waits with no time limit.
/// @return The number of words accepted, or one of the Error values.
97  ssize_t write(const void* buf, size_t nWords, std::chrono::milliseconds timeout = std::chrono::milliseconds(0));
98 
/// @return The header's current writeStartCursor.
104  Index tell() const;
105 
/// Disables the writer in the header (if enabled), flags hasWriterBeenClosed, wakes waiters on
/// dataAvailableConditionVariable, and records the closure in m_closed. Later calls return immediately.
110  void close();
111 
/// @return The wordSize field of the buffer layout's header.
118  size_t getWordSize() const;
119 
/// @return A name for @c error ("CLOSED", "WOULDBLOCK", "INVALID", "TIMEDOUT") or an "(unknown error ...)" string.
125  static std::string errorToString(Error error);
126 
127 private:
/// Tag passed to logger::LogEntry for log lines emitted by this class.
131  static const std::string TAG;
132 
/// Policy selected at construction; write() switches on it.
134  Policy m_policy;
135 
/// Shared handle to the buffer layout (header and data) this writer operates on.
137  std::shared_ptr<BufferLayout> m_bufferLayout;
138 
/// False after construction; set to true by close() so that repeated close() calls are no-ops.
144  bool m_closed;
145 };
146 
/// Out-of-class definition of the static log tag; the value is "SdsWriter" for every instantiation of T.
147 template <typename T>
148 const std::string SharedDataStream<T>::Writer::TAG = "SdsWriter";
149 
/// Constructs a Writer bound to @c bufferLayout and using @c policy for subsequent write() calls.
/// Besides initialising the members, it flips the shared header into the "writer enabled" state.
150 template <typename T>
151 SharedDataStream<T>::Writer::Writer(Policy policy, std::shared_ptr<BufferLayout> bufferLayout) :
152  m_policy{policy},
153  m_bufferLayout{bufferLayout},
154  m_closed{false} {
155  // Note - SharedDataStream::createWriter() holds writerEnableMutex while calling this function.
156  auto header = m_bufferLayout->getHeader();
157  header->isWriterEnabled = true;
// Start with an empty in-progress write region: the end cursor is set equal to the start cursor.
158  header->writeEndCursor = header->writeStartCursor.load();
159 }
160 
/// Destructor body: simply delegates to close().
/// NOTE(review): listing line 162 (the `~Writer()` signature, per the cross-reference at the end of this
/// page) is absent from this rendering.
161 template <typename T>
163  close();
164 }
165 
/// Copies words from @c buf into the circular data area of the buffer layout, honouring the writer's policy,
/// then advances writeStartCursor and notifies waiters on dataAvailableConditionVariable.
///
/// @param buf Source data; a null pointer is rejected with Error::INVALID.
/// @param nWords Number of words requested; zero is rejected with Error::INVALID. The NONBLOCKABLE and
///     BLOCKING branches may reduce it when the request does not fit.
/// @param timeout Consulted only in the BLOCKING branch: zero waits without a time limit, any other value is
///     the maximum time to wait for space before returning Error::TIMEDOUT.
/// @return The (possibly reduced) @c nWords on success, otherwise Error::INVALID, Error::CLOSED,
///     Error::WOULDBLOCK or Error::TIMEDOUT.
166 template <typename T>
167 ssize_t SharedDataStream<T>::Writer::write(const void* buf, size_t nWords, std::chrono::milliseconds timeout) {
168  if (nullptr == buf) {
169  logger::acsdkError(logger::LogEntry(TAG, "writeFailed").d("reason", "nullBuffer"));
170  return Error::INVALID;
171  }
172  if (0 == nWords) {
173  logger::acsdkError(logger::LogEntry(TAG, "writeFailed").d("reason", "zeroNumWords"));
174  return Error::INVALID;
175  }
176 
177  auto header = m_bufferLayout->getHeader();
178  if (!header->isWriterEnabled) {
179  logger::acsdkError(logger::LogEntry(TAG, "writeFailed").d("reason", "writerDisabled"));
180  return Error::CLOSED;
181  }
182 
183  auto wordsToCopy = nWords;
// Byte-typed alias of buf so that offsets can be computed in bytes (words * getWordSize()).
184  auto buf8 = static_cast<const uint8_t*>(buf);
// Created unlocked (defer_lock); only the branches below that need it actually take the mutex.
185  std::unique_lock<Mutex> backwardSeekLock(header->backwardSeekMutex, std::defer_lock);
186  Index writeEnd = header->writeStartCursor + nWords;
187 
188  switch (m_policy) {
// NOTE(review): listing line 189 is missing from this rendering - presumably the
// `case Policy::NONBLOCKABLE:` label; confirm against the real header.
190  // For NONBLOCKABLE, we can truncate the write if it won't fit in the buffer.
191  if (nWords > m_bufferLayout->getDataSize()) {
192  wordsToCopy = nWords = m_bufferLayout->getDataSize();
193  writeEnd = header->writeStartCursor + nWords;
194  }
195  break;
// NOTE(review): listing line 196 is missing from this rendering - presumably the
// `case Policy::ALL_OR_NOTHING:` label; confirm against the real header.
197  // For ALL_OR_NOTHING, we can't overwrite readers, and we can't truncate, but we might be able to discard
198  // bytes that overflow if oldestUnconsumedCursor is in the future (e.g. there are readers waiting for future
199  // data which has not been written yet).
200 
201  // Note - this check must be performed while locked to prevent a reader from backwards-seeking into the
202  // write region between here and the writeEndCursor update below.
203  backwardSeekLock.lock();
204  if ((writeEnd >= header->oldestUnconsumedCursor) &&
205  ((writeEnd - header->oldestUnconsumedCursor) > m_bufferLayout->getDataSize())) {
206  return Error::WOULDBLOCK;
207  }
208  break;
209  case Policy::BLOCKING:
210  // For BLOCKING, we need to wait until there is room for at least one word.
211 
212  // Condition for returning from write: there is space for a write.
213  auto predicate = [this, header] {
214  return (header->writeStartCursor < header->oldestUnconsumedCursor) ||
215  (header->writeStartCursor - header->oldestUnconsumedCursor) < m_bufferLayout->getDataSize();
216  };
217 
218  // Note - this check must be performed while locked to prevent a reader from backwards-seeking into the
219  // write region between here and the writeEndCursor update below.
220  backwardSeekLock.lock();
221 
222  // Wait for space to become available.
// A zero timeout selects the untimed wait(); any other value uses wait_for() and fails with TIMEDOUT
// when the predicate is still false at expiry.
223  if (std::chrono::milliseconds::zero() == timeout) {
224  header->spaceAvailableConditionVariable.wait(backwardSeekLock, predicate);
225  } else if (!header->spaceAvailableConditionVariable.wait_for(backwardSeekLock, timeout, predicate)) {
226  return Error::TIMEDOUT;
227  }
228 
229  // Figure out how much space we have.
230  auto spaceAvailable = m_bufferLayout->getDataSize();
231  if (header->writeStartCursor >= header->oldestUnconsumedCursor) {
232  auto wordsToOverrun =
233  m_bufferLayout->getDataSize() - (header->writeStartCursor - header->oldestUnconsumedCursor);
234  if (wordsToOverrun < spaceAvailable) {
235  spaceAvailable = wordsToOverrun;
236  }
237  }
238 
239  // For BLOCKING, we can truncate the write if it won't fit in the buffer.
240  if (spaceAvailable < nWords) {
241  wordsToCopy = nWords = spaceAvailable;
242  writeEnd = header->writeStartCursor + nWords;
243  }
244 
245  break;
246  }
247 
248  header->writeEndCursor = writeEnd;
249 
250  // We've updated our end cursor, so we no longer need to hold off backward seeks.
// unique_lock converts to true only when it currently owns the mutex, so this is skipped for the
// branch that never locked it.
251  if (backwardSeekLock) {
252  backwardSeekLock.unlock();
253  }
254 
255  if (Policy::ALL_OR_NOTHING == m_policy) {
256  // If we have more data than the SDS can hold and we're not going to be overwriting oldestUnconsumedCursor, we
257  // can safely discard the initial data and just leave the trailing data in the buffer.
258  if (wordsToCopy > m_bufferLayout->getDataSize()) {
259  wordsToCopy = m_bufferLayout->getDataSize();
// Skip the leading (nWords - wordsToCopy) words of the source, expressed in bytes.
260  buf8 += (nWords - wordsToCopy) * getWordSize();
261  }
262  }
263 
264  // Split it across the wrap.
265  size_t beforeWrap = m_bufferLayout->wordsUntilWrap(header->writeStartCursor);
266  if (beforeWrap > wordsToCopy) {
267  beforeWrap = wordsToCopy;
268  }
269  size_t afterWrap = wordsToCopy - beforeWrap;
270 
271  // Copy the two segments.
272  memcpy(m_bufferLayout->getData(header->writeStartCursor), buf8, beforeWrap * getWordSize());
273  if (afterWrap > 0) {
274  memcpy(
275  m_bufferLayout->getData(header->writeStartCursor + beforeWrap),
276  buf8 + beforeWrap * getWordSize(),
277  afterWrap * getWordSize());
278  }
279 
280  // Advance the write cursor.
281  // Note: To prevent a race condition and ensure that readers which block on dataAvailableConditionVariable don't
282  // miss a notify, we should always lock the dataAvailableConditionVariable mutex while moving writeStartCursor. As
283  // an optimization, we skip that lock for NONBLOCKABLE writers under the assumption that they will be writing
284  // continuously, so a missed notification is not significant.
285  // Note: As a further optimization, the lock could be omitted if no blocking readers are in use (ACSDK-251).
286  std::unique_lock<Mutex> dataAvailableLock(header->dataAvailableMutex, std::defer_lock);
287  if (Policy::NONBLOCKABLE != m_policy) {
288  dataAvailableLock.lock();
289  }
290  header->writeStartCursor = header->writeEndCursor.load();
291  if (Policy::NONBLOCKABLE != m_policy) {
292  dataAvailableLock.unlock();
293  }
294 
295  // Notify the reader(s).
296  // Note: as an optimization, we could skip this if there are no blocking readers (ACSDK-251).
297  header->dataAvailableConditionVariable.notify_all();
298 
// nWords carries any truncation applied in the switch above; the ALL_OR_NOTHING discard path does not
// reduce it, so the full requested count is reported there even though only wordsToCopy words were copied.
299  return nWords;
300 }
301 
/// Returns the header's current writeStartCursor.
/// NOTE(review): listing line 303 (the function signature, presumably `tell() const`) is absent from this
/// rendering; confirm against the real header.
302 template <typename T>
304  return m_bufferLayout->getHeader()->writeStartCursor;
305 }
306 
/// Marks this writer closed. The first call, if the header still shows the writer as enabled, disables it,
/// sets hasWriterBeenClosed and wakes everything waiting on dataAvailableConditionVariable; later calls return
/// early because m_closed is already true.
/// NOTE(review): listing line 308 (the function signature, presumably `close()`) is absent from this
/// rendering; confirm against the real header.
307 template <typename T>
309  auto header = m_bufferLayout->getHeader();
// Held for the whole body, so the m_closed check and the header updates happen under writerEnableMutex.
310  std::lock_guard<Mutex> lock(header->writerEnableMutex);
311  if (m_closed) {
312  return;
313  }
314  if (header->isWriterEnabled) {
315  header->isWriterEnabled = false;
316 
// dataAvailableMutex is taken before the flag is set and the notification is issued.
317  std::unique_lock<Mutex> dataAvailableLock(header->dataAvailableMutex);
318 
319  header->hasWriterBeenClosed = true;
320 
321  header->dataAvailableConditionVariable.notify_all();
322  }
323  m_closed = true;
324 }
325 
/// Returns the wordSize field stored in the buffer layout's header.
/// NOTE(review): listing line 327 (the function signature, presumably `getWordSize() const`) is absent from
/// this rendering; confirm against the real header.
326 template <typename T>
328  return m_bufferLayout->getHeader()->wordSize;
329 }
330 
/// Maps an error code to its enumerator name ("CLOSED", "WOULDBLOCK", "INVALID", "TIMEDOUT"). For any other
/// value it logs an error and returns "(unknown error <value>)".
/// NOTE(review): listing line 332 (the function signature) is absent from this rendering.
/// NOTE(review): the declaration takes `Error error`, and Error is a struct that merely wraps an unnamed enum,
/// so `switch (error)` and `to_string(error)` look like they would not compile if this member were ever
/// instantiated; `to_string` is also unqualified with no using-declaration visible here - confirm and
/// consider taking an integral value instead.
331 template <typename T>
333  switch (error) {
334  case Error::CLOSED:
335  return "CLOSED";
336  case Error::WOULDBLOCK:
337  return "WOULDBLOCK";
338  case Error::INVALID:
339  return "INVALID";
340  case Error::TIMEDOUT:
341  return "TIMEDOUT";
342  }
// Reached only when no case above matched.
343  logger::acsdkError(logger::LogEntry(TAG, "errorToStringFailed").d("reason", "invalidError").d("error", error));
344  return "(unknown error " + to_string(error) + ")";
345 }
346 
347 } // namespace sds
348 } // namespace utils
349 } // namespace avsCommon
350 } // namespace alexaClientSDK
351 
352 #endif // ALEXA_CLIENT_SDK_AVSCOMMON_UTILS_INCLUDE_AVSCOMMON_UTILS_SDS_WRITER_H_
ssize_t write(const void *buf, size_t nWords, std::chrono::milliseconds timeout=std::chrono::milliseconds(0))
Definition: Writer.h:167
static std::string errorToString(Error error)
Definition: Writer.h:332
WriterPolicy
Specifies the policy to use for writing to the stream.
Definition: WriterPolicy.h:25
::std::string string
Definition: gtest-port.h:1097
#define TAG
String to identify log entries originating from this file.
Definition: TestableMessageObserver.cpp:27
Returned when a write() parameter is invalid.
Definition: Writer.h:63
~Writer()
This destructor detaches the Writer from a BufferLayout.
Definition: Writer.h:162
Returned when policy is Policy::BLOCKING and no space becomes available before the specified timeout...
Definition: Writer.h:65
Index
Index used for setting access.
Definition: StateReportGeneratorTest.cpp:41
Whether or not curl logs should be emitted.
Definition: AVSConnectionManager.h:36
void acsdkError(const LogEntry &entry)
Writer(Policy policy, std::shared_ptr< BufferLayout > bufferLayout)
Definition: Writer.h:151
Returned when close() has been previously called on the Writer.
Definition: Writer.h:59
Returned when policy is Policy::ALL_OR_NOTHING and the write() would overwrite unconsumed data...
Definition: Writer.h:61
LogEntry is used to compile the log entry text to log via Logger.
Definition: LogEntry.h:33

AlexaClientSDK 3.0.0 - Copyright 2016-2022 Amazon.com, Inc. or its affiliates. All Rights Reserved. Licensed under the Apache License, Version 2.0