AlexaClientSDK  3.0.0
A cross-platform, modular SDK for interacting with the Alexa Voice Service
Reader.h
Go to the documentation of this file.
1 /*
2  * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License").
5  * You may not use this file except in compliance with the License.
6  * A copy of the License is located at
7  *
8  * http://aws.amazon.com/apache2.0/
9  *
10  * or in the "license" file accompanying this file. This file is distributed
11  * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12  * express or implied. See the License for the specific language governing
13  * permissions and limitations under the License.
14  */
15 
16 #ifndef ALEXA_CLIENT_SDK_AVSCOMMON_UTILS_INCLUDE_AVSCOMMON_UTILS_SDS_READER_H_
17 #define ALEXA_CLIENT_SDK_AVSCOMMON_UTILS_INCLUDE_AVSCOMMON_UTILS_SDS_READER_H_
18 
19 #include <cstdint>
20 #include <cstddef>
21 #include <vector>
22 #include <mutex>
23 #include <limits>
24 #include <cstring>
25 
28 #include "SharedDataStream.h"
29 #include "ReaderPolicy.h"
30 
31 namespace alexaClientSDK {
32 namespace avsCommon {
33 namespace utils {
34 namespace sds {
35 
44 template <typename T>
45 class SharedDataStream<T>::Reader {
46 public:
49 
51  enum class Reference {
53  AFTER_READER,
55  BEFORE_READER,
57  BEFORE_WRITER,
59  ABSOLUTE
60  };
61 
67  struct Error {
68  enum {
73  CLOSED = 0,
75  OVERRUN = -1,
77  WOULDBLOCK = -2,
79  TIMEDOUT = -3,
81  INVALID = -4
82  };
83  };
84 
93  Reader(Policy policy, std::shared_ptr<BufferLayout> bufferLayout, size_t id);
94 
96  ~Reader();
97 
117  using OutputFunction = std::function<bool(void* buf, size_t nWords)>;
118 
140  ssize_t read(
141  OutputFunction function,
142  size_t maxWordsCanRead,
143  std::chrono::milliseconds timeout = std::chrono::milliseconds(0));
144 
163  ssize_t read(void* buf, size_t nWords, std::chrono::milliseconds timeout = std::chrono::milliseconds(0));
164 
178  bool seek(Index offset, Reference reference = Reference::ABSOLUTE);
179 
189  Index tell(Reference reference = Reference::ABSOLUTE) const;
190 
206  void close(Index offset = 0, Reference reference = Reference::AFTER_READER);
207 
215  size_t getId() const;
216 
223  size_t getWordSize() const;
224 
230  static std::string errorToString(Error error);
231 
232 private:
236  static const std::string TAG;
237 
239  Policy m_policy;
240 
242  std::shared_ptr<BufferLayout> m_bufferLayout;
243 
248  size_t m_id;
249 
251  AtomicIndex* m_readerCursor;
252 
254  AtomicIndex* m_readerCloseIndex;
255 };
256 
257 template <typename T>
258 const std::string SharedDataStream<T>::Reader::TAG = "SdsReader";
259 
260 template <typename T>
261 SharedDataStream<T>::Reader::Reader(Policy policy, std::shared_ptr<BufferLayout> bufferLayout, size_t id) :
262  m_policy{policy},
263  m_bufferLayout{bufferLayout},
264  m_id{id},
265  m_readerCursor{&m_bufferLayout->getReaderCursorArray()[m_id]},
266  m_readerCloseIndex{&m_bufferLayout->getReaderCloseIndexArray()[m_id]} {
267  // Note - SharedDataStream::createReader() holds readerEnableMutex while calling this function.
268  // Read new data only.
269  // Note: It is important that new readers start with their cursor at the writer. This allows
270  // updateOldestUnconsumedCursor() to be thread-safe without holding readerEnableMutex. See
271  // updateOldestUnconsumedCursor() comments for further explanation.
272  *m_readerCursor = m_bufferLayout->getHeader()->writeStartCursor.load();
273 
274  // Read indefinitely.
275  *m_readerCloseIndex = std::numeric_limits<Index>::max();
276 
277  m_bufferLayout->enableReaderLocked(m_id);
278 }
279 
280 template <typename T>
282  // Note: We can't leave a reader with its cursor in the future; doing so can introduce a race condition in
283  // updateOldestUnconsumedCursor(). See updateOldestUnconsumedCursor() comments for further explanation.
284  seek(0, Reference::BEFORE_WRITER);
285 
286  std::lock_guard<Mutex> lock(m_bufferLayout->getHeader()->readerEnableMutex);
287  m_bufferLayout->disableReaderLocked(m_id);
288  m_bufferLayout->updateOldestUnconsumedCursor();
289 }
290 
291 template <typename T>
292 ssize_t SharedDataStream<T>::Reader::read(void* buf, size_t nWords, std::chrono::milliseconds timeout) {
293  if (nullptr == buf) {
294  logger::acsdkError(logger::LogEntry(TAG, "readFailed").d("reason", "nullBuffer"));
295  return Error::INVALID;
296  }
297 
298  auto buf8 = static_cast<uint8_t*>(buf);
299  auto wordSize = getWordSize();
300  return read(
301  [&buf8, wordSize](void* readBuf, size_t wordsToRead) {
302  memcpy(buf8, readBuf, wordsToRead * wordSize);
303  buf8 += wordsToRead * wordSize;
304  return true;
305  },
306  nWords,
307  timeout);
308 }
309 
310 template <typename T>
311 ssize_t SharedDataStream<T>::Reader::read(OutputFunction function, size_t nWords, std::chrono::milliseconds timeout) {
312  if (nullptr == function) {
313  logger::acsdkError(logger::LogEntry(TAG, "readFailed").d("reason", "nullFunction"));
314  return Error::INVALID;
315  }
316 
317  if (0 == nWords) {
318  logger::acsdkError(logger::LogEntry(TAG, "readFailed").d("reason", "invalidNumWords").d("numWords", nWords));
319  return Error::INVALID;
320  }
321 
322  // Check if closed.
323  auto readerCloseIndex = m_readerCloseIndex->load();
324  if (*m_readerCursor >= readerCloseIndex) {
325  return Error::CLOSED;
326  }
327 
328  // Initial check for overrun.
329  auto header = m_bufferLayout->getHeader();
330  if ((header->writeEndCursor >= *m_readerCursor) &&
331  (header->writeEndCursor - *m_readerCursor) > m_bufferLayout->getDataSize()) {
332  return Error::OVERRUN;
333  }
334 
335  std::unique_lock<Mutex> lock(header->dataAvailableMutex, std::defer_lock);
336  if (Policy::BLOCKING == m_policy) {
337  lock.lock();
338  }
339 
340  // Figure out how much we can actually copy.
341  size_t wordsAvailable = tell(Reference::BEFORE_WRITER);
342  if (0 == wordsAvailable) {
343  if (header->writeEndCursor > 0 && !header->isWriterEnabled) {
344  return Error::CLOSED;
345  } else if (Policy::NONBLOCKING == m_policy) {
346  return Error::WOULDBLOCK;
347  } else if (Policy::BLOCKING == m_policy) {
348  // Condition for returning from read: the Writer has been closed or there is data to read
349  auto predicate = [this, header] {
350  return header->hasWriterBeenClosed || tell(Reference::BEFORE_WRITER) > 0;
351  };
352 
353  if (std::chrono::milliseconds::zero() == timeout) {
354  header->dataAvailableConditionVariable.wait(lock, predicate);
355  } else if (!header->dataAvailableConditionVariable.wait_for(lock, timeout, predicate)) {
356  return Error::TIMEDOUT;
357  }
358  }
359  wordsAvailable = tell(Reference::BEFORE_WRITER);
360 
361  // If there is still no data, the writer has closed in the interim
362  if (0 == wordsAvailable) {
363  return Error::CLOSED;
364  }
365  }
366 
367  if (Policy::BLOCKING == m_policy) {
368  lock.unlock();
369  }
370  if (nWords > wordsAvailable) {
371  nWords = wordsAvailable;
372  }
373 
374  // Don't read beyond our close index.
375  if ((*m_readerCursor + nWords) > readerCloseIndex) {
376  nWords = readerCloseIndex - *m_readerCursor;
377  }
378 
379  // Split it across the wrap.
380  size_t beforeWrap = m_bufferLayout->wordsUntilWrap(*m_readerCursor);
381  if (beforeWrap > nWords) {
382  beforeWrap = nWords;
383  }
384  size_t afterWrap = nWords - beforeWrap;
385 
386  // Copy the two segments.
387  if (!function(m_bufferLayout->getData(*m_readerCursor), beforeWrap)) {
388  // We haven't changed the read pointer yet, just error out.
389  return Error::INVALID;
390  }
391 
392  if (afterWrap > 0 && !function(m_bufferLayout->getData(*m_readerCursor + beforeWrap), afterWrap)) {
393  // By API if either of these calls fail the entire read fails and no data is consumed.
394  return Error::INVALID;
395  }
396 
397  // Advance the read cursor.
398  *m_readerCursor += nWords;
399 
400  // Final check for overrun (do this before the updateOldestUnconsumedCursor() call below for improved accuracy).
401  bool overrun = ((header->writeEndCursor - *m_readerCursor) > m_bufferLayout->getDataSize());
402 
403  // Move the unconsumed cursor before returning.
404  m_bufferLayout->updateOldestUnconsumedCursor();
405 
406  // Now we can safely error out if there was an overrun.
407  if (overrun) {
408  return Error::OVERRUN;
409  }
410 
411  return nWords;
412 }
413 
414 template <typename T>
416  auto header = m_bufferLayout->getHeader();
417  auto writeStartCursor = &header->writeStartCursor;
418  auto writeEndCursor = &header->writeEndCursor;
419  Index absolute = std::numeric_limits<Index>::max();
420 
421  switch (reference) {
422  case Reference::AFTER_READER:
423  absolute = *m_readerCursor + offset;
424  break;
425  case Reference::BEFORE_READER:
426  if (offset > *m_readerCursor) {
427  logger::acsdkError(logger::LogEntry(TAG, "seekFailed")
428  .d("reason", "seekBeforeStreamStart")
429  .d("reference", "BEFORE_READER")
430  .d("seekOffset", offset)
431  .d("readerCursor", m_readerCursor->load()));
432  return false;
433  }
434  absolute = *m_readerCursor - offset;
435  break;
436  case Reference::BEFORE_WRITER:
437  if (offset > *writeStartCursor) {
438  logger::acsdkError(logger::LogEntry(TAG, "seekFailed")
439  .d("reason", "seekBeforeStreamStart")
440  .d("reference", "BEFORE_WRITER")
441  .d("seekOffset", offset)
442  .d("writeStartCursor", writeStartCursor->load()));
443  return false;
444  }
445  absolute = *writeStartCursor - offset;
446  break;
447  case Reference::ABSOLUTE:
448  absolute = offset;
449  }
450 
451  // Don't seek beyond the close index.
452  if (absolute > *m_readerCloseIndex) {
453  logger::acsdkError(logger::LogEntry(TAG, "seekFailed")
454  .d("reason", "seekBeyondCloseIndex")
455  .d("position", absolute)
456  .d("readerCloseIndex", m_readerCloseIndex->load()));
457  return false;
458  }
459 
460  // Per documentation of updateOldestUnconsumedCursor(), don't try to seek backwards while oldestConsumedCursor is
461  // being updated.
462  bool backward = absolute < *m_readerCursor;
463  std::unique_lock<Mutex> lock(header->backwardSeekMutex, std::defer_lock);
464  if (backward) {
465  lock.lock();
466  }
467 
468  // Don't seek to past data which has been (or soon will be) overwritten.
469  // Note: If this is a backward seek, it is important that this check is performed while holding the
470  // backwardSeekMutex to prevent a writer from starting to overwrite us between here and the m_readerCursor update
471  // below. If this is not a backward seek, then the mutex is not held.
472  if (*writeEndCursor >= absolute && *writeEndCursor - absolute > m_bufferLayout->getDataSize()) {
473  logger::acsdkError(logger::LogEntry(TAG, "seekFailed").d("reason", "seekOverwrittenData"));
474  return false;
475  }
476 
477  *m_readerCursor = absolute;
478 
479  if (backward) {
480  header->oldestUnconsumedCursor = 0;
481  m_bufferLayout->updateOldestUnconsumedCursorLocked();
482  lock.unlock();
483  } else {
484  m_bufferLayout->updateOldestUnconsumedCursor();
485  }
486 
487  return true;
488 }
489 
490 template <typename T>
492  auto writeStartCursor = &m_bufferLayout->getHeader()->writeStartCursor;
493  switch (reference) {
494  case Reference::AFTER_READER:
495  return 0;
496  case Reference::BEFORE_READER:
497  return 0;
498  case Reference::BEFORE_WRITER:
499  return (*writeStartCursor >= *m_readerCursor) ? *writeStartCursor - *m_readerCursor : 0;
500  case Reference::ABSOLUTE:
501  return *m_readerCursor;
502  }
503  logger::acsdkError(logger::LogEntry(TAG, "tellFailed").d("reason", "invalidReference"));
504  return std::numeric_limits<Index>::max();
505 }
506 
507 template <typename T>
509  auto writeStartCursor = &m_bufferLayout->getHeader()->writeStartCursor;
510  Index absolute = 0;
511  bool validReference = false;
512  switch (reference) {
513  case Reference::AFTER_READER:
514  absolute = *m_readerCursor + offset;
515  validReference = true;
516  break;
517  case Reference::BEFORE_READER:
518  absolute = *m_readerCursor;
519  validReference = true;
520  break;
521  case Reference::BEFORE_WRITER:
522  if (*writeStartCursor < offset) {
523  logger::acsdkError(logger::LogEntry(TAG, "closeFailed")
524  .d("reason", "invalidIndex")
525  .d("reference", "BEFORE_WRITER")
526  .d("offset", offset)
527  .d("writeStartCursor", writeStartCursor->load()));
528  } else {
529  absolute = *writeStartCursor - offset;
530  }
531  validReference = true;
532  break;
533  case Reference::ABSOLUTE:
534  absolute = offset;
535  validReference = true;
536  break;
537  }
538  if (!validReference) {
539  logger::acsdkError(logger::LogEntry(TAG, "closeFailed").d("reason", "invalidReference"));
540  }
541 
542  *m_readerCloseIndex = absolute;
543 }
544 
545 template <typename T>
547  return m_id;
548 }
549 
550 template <typename T>
552  return m_bufferLayout->getHeader()->wordSize;
553 }
554 
555 template <typename T>
557  switch (error) {
558  case Error::CLOSED:
559  return "CLOSED";
560  case Error::OVERRUN:
561  return "OVERRUN";
562  case Error::WOULDBLOCK:
563  return "WOULDBLOCK";
564  case Error::TIMEDOUT:
565  return "TIMEDOUT";
566  case Error::INVALID:
567  return "INVALID";
568  }
569  logger::acsdkError(logger::LogEntry(TAG, "errorToStringFailed").d("reason", "invalidError").d("error", error));
570  return "(unknown error " + to_string(error) + ")";
571 }
572 
573 } // namespace sds
574 } // namespace utils
575 } // namespace avsCommon
576 } // namespace alexaClientSDK
577 
578 #endif // ALEXA_CLIENT_SDK_AVSCOMMON_UTILS_INCLUDE_AVSCOMMON_UTILS_SDS_READER_H_
Returned when policy is Policy::BLOCKING and no data becomes available before the specified timeout...
Definition: Reader.h:79
Reader(Policy policy, std::shared_ptr< BufferLayout > bufferLayout, size_t id)
Definition: Reader.h:261
Reference
Specifies a reference to measure seek()/tell()/close() offsets against.
Definition: Reader.h:51
Returned when a read() parameter is invalid.
Definition: Reader.h:81
Returned when policy is Policy::NONBLOCKING and no data is available.
Definition: Reader.h:77
Returned when the data requested has been overwritten and is invalid.
Definition: Reader.h:75
::std::string string
Definition: gtest-port.h:1097
static std::string errorToString(Error error)
Definition: Reader.h:556
std::function< bool(void *buf, size_t nWords)> OutputFunction
Definition: Reader.h:117
#define TAG
String to identify log entries originating from this file.
Definition: TestableMessageObserver.cpp:27
ssize_t read(OutputFunction function, size_t maxWordsCanRead, std::chrono::milliseconds timeout=std::chrono::milliseconds(0))
Definition: Reader.h:311
void close(Index offset=0, Reference reference=Reference::AFTER_READER)
Definition: Reader.h:508
Index
Index used for setting access.
Definition: StateReportGeneratorTest.cpp:41
Whether or not curl logs should be emitted.
Definition: AVSConnectionManager.h:36
ReaderPolicy
Specifies the policy to use for reading from the stream.
Definition: ReaderPolicy.h:25
Index tell(Reference reference=Reference::ABSOLUTE) const
Definition: Reader.h:491
~Reader()
This destructor detaches the Reader from a BufferLayout.
Definition: Reader.h:281
void acsdkError(const LogEntry &entry)
bool seek(Index offset, Reference reference=Reference::ABSOLUTE)
Definition: Reader.h:415
LogEntry is used to compile the log entry text to log via Logger.
Definition: LogEntry.h:33

AlexaClientSDK 3.0.0 - Copyright 2016-2022 Amazon.com, Inc. or its affiliates. All Rights Reserved. Licensed under the Apache License, Version 2.0