AlexaClientSDK  3.0.0
A cross-platform, modular SDK for interacting with the Alexa Voice Service
SharedDataStream.h
Go to the documentation of this file.
1 /*
2  * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License").
5  * You may not use this file except in compliance with the License.
6  * A copy of the License is located at
7  *
8  * http://aws.amazon.com/apache2.0/
9  *
10  * or in the "license" file accompanying this file. This file is distributed
11  * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12  * express or implied. See the License for the specific language governing
13  * permissions and limitations under the License.
14  */
15 
16 #ifndef ALEXA_CLIENT_SDK_AVSCOMMON_UTILS_INCLUDE_AVSCOMMON_UTILS_SDS_SHAREDDATASTREAM_H_
17 #define ALEXA_CLIENT_SDK_AVSCOMMON_UTILS_INCLUDE_AVSCOMMON_UTILS_SDS_SHAREDDATASTREAM_H_
18 
19 #include <cstdint>
20 #include <cstddef>
21 #include <memory>
22 
24 
25 namespace alexaClientSDK {
26 namespace avsCommon {
27 namespace utils {
28 namespace sds {
29 
/**
 * Stream of fixed-size words kept in a caller-supplied @c Buffer.  Instances are obtained through @c create() or
 * @c open(), and hand out @c Writer and @c Reader objects that share the same underlying @c BufferLayout.
 *
 * @tparam T Traits type supplying the @c AtomicIndex, @c AtomicBool, @c Buffer, @c Mutex and @c ConditionVariable
 *     types used by the stream.
 */
template <typename T>
class SharedDataStream {
private:
    // Forward declare the nested @c BufferLayout structure (full declaration is in @c BufferLayout.h).
    class BufferLayout;

public:
    /// Unsigned 64-bit type used for stream sizes and cursors (see @c getDataSize()).
    using Index = uint64_t;

    /// Atomic index type, taken from the traits type @c T.
    using AtomicIndex = typename T::AtomicIndex;

    /// Atomic boolean type, taken from the traits type @c T.
    using AtomicBool = typename T::AtomicBool;

    /// Buffer type holding the stream's header and data, taken from the traits type @c T.
    using Buffer = typename T::Buffer;

    /// Mutex type, taken from the traits type @c T.
    using Mutex = typename T::Mutex;

    /// Condition variable type, taken from the traits type @c T.
    using ConditionVariable = typename T::ConditionVariable;

    // Forward declare the nested @c Reader class (full declaration is in @c Reader.h).
    class Reader;

    // Forward declare the nested @c Writer class (full declaration is in @c Writer.h).
    class Writer;

    /**
     * Computes the buffer size needed for a stream: the layout overhead reported by
     * @c BufferLayout::calculateDataOffset() plus @c nWords * @c wordSize.
     *
     * @param nWords Number of data words the stream should hold.  Must be non-zero.
     * @param wordSize Size of each word.  Must be non-zero.
     * @param maxReaders Maximum number of readers, forwarded to @c BufferLayout::calculateDataOffset().
     * @return The required buffer size, or 0 (after logging an error) if @c nWords or @c wordSize is zero.
     */
    static size_t calculateBufferSize(size_t nWords, size_t wordSize = 1, size_t maxReaders = 1);

    /**
     * Creates a stream in @c buffer.  Equivalent to the four-argument overload with @c maxEphemeralReaders equal to
     * @c maxReaders.
     *
     * @param buffer Buffer which will hold the stream.
     * @param wordSize Size of each word.
     * @param maxReaders Maximum number of readers.
     * @return The new stream, or @c nullptr on failure.
     */
    static std::unique_ptr<SharedDataStream> create(
        std::shared_ptr<Buffer> buffer,
        size_t wordSize = 1,
        size_t maxReaders = 1);

    /**
     * Creates a stream in @c buffer and initializes its @c BufferLayout.
     *
     * @param buffer Buffer which will hold the stream.  Must be non-null and at least as large as
     *     @c calculateBufferSize(1, wordSize, maxReaders).
     * @param wordSize Size of each word.  Must be non-zero.
     * @param maxReaders Maximum number of readers.
     * @param maxEphemeralReaders Number of reader ids searched by @c createReader(policy, startWithNewData).  Must
     *     not exceed @c maxReaders.
     * @return The new stream, or @c nullptr if a parameter is rejected or @c BufferLayout::init() fails.
     */
    static std::unique_ptr<SharedDataStream> create(
        std::shared_ptr<Buffer> buffer,
        size_t wordSize,
        size_t maxReaders,
        size_t maxEphemeralReaders);

    /**
     * Opens a stream over @c buffer by calling @c BufferLayout::attach() instead of initializing the layout.
     * NOTE(review): presumably @c buffer must already contain a stream set up by @c create() - confirm against
     * @c BufferLayout.h.
     *
     * @param buffer Buffer to attach to.
     * @return The stream, or @c nullptr if @c BufferLayout::attach() fails.
     */
    static std::unique_ptr<SharedDataStream> open(std::shared_ptr<Buffer> buffer);

    /**
     * @return The @c maxReaders value stored in the stream's header.
     */
    size_t getMaxReaders() const;

    /**
     * @return The data size reported by the stream's @c BufferLayout.
     */
    Index getDataSize() const;

    /**
     * @return The @c wordSize value stored in the stream's header.
     */
    size_t getWordSize() const;

    /**
     * Creates a @c Writer for this stream.
     *
     * @param policy Policy passed to the new @c Writer.
     * @param forceReplacement If @c true, a @c Writer is created even when the header reports that a writer is
     *     already enabled.
     * @return The new @c Writer, or @c nullptr if a writer is already enabled and @c forceReplacement is @c false.
     */
    std::unique_ptr<Writer> createWriter(typename Writer::Policy policy, bool forceReplacement = false);

    /**
     * Creates a @c Reader using the first reader id below the header's @c maxEphemeralReaders that is not
     * currently enabled.
     *
     * @param policy Policy passed to the new @c Reader.
     * @param startWithNewData If @c true the reader is returned without seeking; otherwise it is seeked back
     *     towards the oldest data before being returned.
     * @return The new @c Reader, or @c nullptr if every candidate id is in use or the reader could not be
     *     positioned.
     */
    std::unique_ptr<Reader> createReader(typename Reader::Policy policy, bool startWithNewData = false);

    /**
     * Creates a @c Reader with a caller-chosen id.
     *
     * @param id Reader id to use.
     * @param policy Policy passed to the new @c Reader.
     * @param startWithNewData If @c true the reader is returned without seeking; otherwise it is seeked back
     *     towards the oldest data before being returned.
     * @param forceReplacement If @c true, the reader is created even when @c id is already enabled.
     * @return The new @c Reader, or @c nullptr if @c id is already enabled and @c forceReplacement is @c false, or
     *     if the reader could not be positioned.
     */
    std::unique_ptr<Reader> createReader(
        size_t id,
        typename Reader::Policy policy,
        bool startWithNewData = false,
        bool forceReplacement = false);

private:
    /**
     * Constructs a stream whose @c BufferLayout wraps @c buffer.  The layout is not initialized or attached here;
     * @c create() and @c open() do that afterwards.
     *
     * @param buffer Buffer handed to the @c BufferLayout constructor.
     */
    explicit SharedDataStream(std::shared_ptr<Buffer> buffer);

    /**
     * Implementation shared by both @c createReader() overloads.
     *
     * @param id Reader id to use.
     * @param policy Policy passed to the new @c Reader.
     * @param startWithNewData If @c true the reader is returned without seeking.
     * @param forceReplacement If @c true, the reader is created even when @c id is already enabled.
     * @param lock Lock held by the caller; it is unlocked once the @c Reader has been constructed.
     * @return The new @c Reader, or @c nullptr on failure.
     */
    std::unique_ptr<Reader> createReaderLocked(
        size_t id,
        typename Reader::Policy policy,
        bool startWithNewData,
        bool forceReplacement,
        std::unique_lock<Mutex>* lock);

    /// String to identify log entries originating from this class.
    static const std::string TAG;

    /// Maximum number of seek attempts made by @c createReaderLocked() before it gives up.
    static const int MAX_READER_CREATION_RETRIES;

    /// The @c BufferLayout of this stream; also passed to every @c Reader and @c Writer created from it.
    std::shared_ptr<BufferLayout> m_bufferLayout;
};
326 
327 template <typename T>
328 const int SharedDataStream<T>::MAX_READER_CREATION_RETRIES = 3;
329 
330 template <typename T>
331 const std::string SharedDataStream<T>::TAG = "SharedDataStream";
332 
333 template <typename T>
334 size_t SharedDataStream<T>::calculateBufferSize(size_t nWords, size_t wordSize, size_t maxReaders) {
335  if (0 == nWords) {
336  logger::acsdkError(logger::LogEntry(TAG, "calculateBufferSizeFailed").d("reason", "numWordsZero"));
337  return 0;
338  } else if (0 == wordSize) {
339  logger::acsdkError(logger::LogEntry(TAG, "calculateBufferSizeFailed").d("reason", "wordSizeZero"));
340  return 0;
341  }
342  size_t overhead = BufferLayout::calculateDataOffset(wordSize, maxReaders);
343  size_t dataSize = nWords * wordSize;
344  return overhead + dataSize;
345 }
346 
347 template <typename T>
348 std::unique_ptr<SharedDataStream<T>> SharedDataStream<T>::create(
349  std::shared_ptr<Buffer> buffer,
350  size_t wordSize,
351  size_t maxReaders) {
352  return create(buffer, wordSize, maxReaders, maxReaders);
353 }
354 
355 template <typename T>
356 std::unique_ptr<SharedDataStream<T>> SharedDataStream<T>::create(
357  std::shared_ptr<Buffer> buffer,
358  size_t wordSize,
359  size_t maxReaders,
360  size_t maxEphemeralReaders) {
361  size_t expectedSize = calculateBufferSize(1, wordSize, maxReaders);
362  if (0 == expectedSize) {
363  // Logged in calcutlateBuffersize().
364  return nullptr;
365  } else if (nullptr == buffer) {
366  logger::acsdkError(logger::LogEntry(TAG, "createFailed").d("reason", "nullBuffer"));
367  return nullptr;
368  } else if (expectedSize > buffer->size()) {
369  logger::acsdkError(logger::LogEntry(TAG, "createFailed")
370  .d("reason", "bufferSizeTooSmall")
371  .d("bufferSize", buffer->size())
372  .d("expectedSize", expectedSize));
373  return nullptr;
374  } else if (maxEphemeralReaders > maxReaders) {
375  logger::acsdkError(logger::LogEntry(TAG, "createFailed").d("reason", "maxEphemeralReaders > maxReaders"));
376  return nullptr;
377  }
378 
379  std::unique_ptr<SharedDataStream<T>> sds(new SharedDataStream<T>(buffer));
380  if (!sds->m_bufferLayout->init(wordSize, maxReaders, maxEphemeralReaders)) {
381  // Logged in init().
382  return nullptr;
383  }
384  return sds;
385 }
386 
387 template <typename T>
388 std::unique_ptr<SharedDataStream<T>> SharedDataStream<T>::open(std::shared_ptr<Buffer> buffer) {
389  std::unique_ptr<SharedDataStream<T>> sds(new SharedDataStream<T>(buffer));
390  if (!sds->m_bufferLayout->attach()) {
391  // Logged in attach().
392  return nullptr;
393  } else {
394  return sds;
395  }
396 }
397 
398 template <typename T>
399 size_t SharedDataStream<T>::getMaxReaders() const {
400  return m_bufferLayout->getHeader()->maxReaders;
401 }
402 
403 template <typename T>
404 typename SharedDataStream<T>::Index SharedDataStream<T>::getDataSize() const {
405  return m_bufferLayout->getDataSize();
406 }
407 
408 template <typename T>
409 size_t SharedDataStream<T>::getWordSize() const {
410  return m_bufferLayout->getHeader()->wordSize;
411 }
412 
413 template <typename T>
414 std::unique_ptr<typename SharedDataStream<T>::Writer> SharedDataStream<T>::createWriter(
415  typename Writer::Policy policy,
416  bool forceReplacement) {
417  auto header = m_bufferLayout->getHeader();
418  std::lock_guard<Mutex> lock(header->writerEnableMutex);
419  if (header->isWriterEnabled && !forceReplacement) {
420  logger::acsdkError(logger::LogEntry(TAG, "createWriterFailed")
421  .d("reason", "existingWriterAttached")
422  .d("forceReplacement", "false"));
423  return nullptr;
424  } else {
425  return std::unique_ptr<Writer>(new Writer(policy, m_bufferLayout));
426  }
427 }
428 
429 template <typename T>
430 std::unique_ptr<typename SharedDataStream<T>::Reader> SharedDataStream<T>::createReader(
431  typename Reader::Policy policy,
432  bool startWithNewData) {
433  std::unique_lock<Mutex> lock(m_bufferLayout->getHeader()->readerEnableMutex);
434  for (size_t id = 0; id < m_bufferLayout->getHeader()->maxEphemeralReaders; ++id) {
435  if (!m_bufferLayout->isReaderEnabled(id)) {
436  return createReaderLocked(id, policy, startWithNewData, false, &lock);
437  }
438  }
439  logger::acsdkError(logger::LogEntry(TAG, "createWriterFailed").d("reason", "noAvailableReaders"));
440  return nullptr;
441 }
442 
443 template <typename T>
444 std::unique_ptr<typename SharedDataStream<T>::Reader> SharedDataStream<T>::createReader(
445  size_t id,
446  typename Reader::Policy policy,
447  bool startWithNewData,
448  bool forceReplacement) {
449  std::unique_lock<Mutex> lock(m_bufferLayout->getHeader()->readerEnableMutex);
450  return createReaderLocked(id, policy, startWithNewData, forceReplacement, &lock);
451 }
452 
453 template <typename T>
454 SharedDataStream<T>::SharedDataStream(std::shared_ptr<Buffer> buffer) :
455  m_bufferLayout{std::make_shared<BufferLayout>(buffer)} {
456 }
457 
458 template <typename T>
459 std::unique_ptr<typename SharedDataStream<T>::Reader> SharedDataStream<T>::createReaderLocked(
460  size_t id,
461  typename Reader::Policy policy,
462  bool startWithNewData,
463  bool forceReplacement,
464  std::unique_lock<Mutex>* lock) {
465  if (m_bufferLayout->isReaderEnabled(id) && !forceReplacement) {
466  logger::acsdkError(logger::LogEntry(TAG, "createReaderLockedFailed")
467  .d("reason", "readerAlreadyAttached")
468  .d("readerId", id)
469  .d("forceReplacement", "false"));
470  return nullptr;
471  } else {
472  // Note: Reader constructor does not call updateUnconsumedCursor() automatically, because we may be seeking to
473  // a blocked writer's cursor below (if !startWithNewData), and we don't want the writer to start moving before
474  // we seek.
475  auto reader = std::unique_ptr<Reader>(new Reader(policy, m_bufferLayout, id));
476  lock->unlock();
477 
478  if (startWithNewData) {
479  // We're not moving the cursor again, so call updateUnconsumedCursor() now.
480  m_bufferLayout->updateOldestUnconsumedCursor();
481  return reader;
482  }
483 
484  auto headerPtr = m_bufferLayout->getHeader();
485  for (int i = 0; i < MAX_READER_CREATION_RETRIES; i++) {
486  auto offset = m_bufferLayout->getDataSize();
487  auto writeStartCursor = headerPtr->writeStartCursor.load();
488  auto reference = Reader::Reference::BEFORE_WRITER;
489  if (writeStartCursor < offset) {
490  // For SDS without buffer overwritten, seek the very beginning of the stream using ABSOLUTE reference
491  // ABSOLUTE reference prevents a race condition between blocking writer and reader seek when writer
492  // writes more data to buffer, and writing cursor moves forward.
493  offset = 0;
494  reference = Reader::Reference::ABSOLUTE;
495  } else {
496  auto writeEndCursor = headerPtr->writeEndCursor.load();
497  if (writeEndCursor < writeStartCursor) {
498  logger::acsdkError(logger::LogEntry(TAG, "createReaderLockedError")
499  .d("reason", "writeCursorBeyondEndCursor?")
500  .d("readerId", id));
501  continue;
502  }
503  auto wordsBeingWritten = writeEndCursor - writeStartCursor;
504  if (offset < wordsBeingWritten) {
505  logger::acsdkWarn(logger::LogEntry(TAG, "createReaderLockedWarning")
506  .d("reason", "detectedWriterOverflow")
507  .d("readerId", id));
508  continue;
509  }
510  offset -= wordsBeingWritten;
511  }
512 
513  if (reader->seek(offset, reference)) {
514  // Note: seek() will call updateUnconsumedCursor() if it returns true.
515  return reader;
516  }
517  }
518 
520  logger::LogEntry(TAG, "createReaderLockedFailed").d("reason", "seekRetriesExhausted").d("readerId", id));
521  return nullptr;
522  }
523 }
524 
525 } // namespace sds
526 } // namespace utils
527 } // namespace avsCommon
528 } // namespace alexaClientSDK
529 
530 #include "BufferLayout.h"
531 #include "Reader.h"
532 #include "Writer.h"
533 
534 #endif // ALEXA_CLIENT_SDK_AVSCOMMON_UTILS_INCLUDE_AVSCOMMON_UTILS_SDS_SHAREDDATASTREAM_H_
void acsdkWarn(const LogEntry &entry)
::std::string string
Definition: gtest-port.h:1097
WriterPolicy Policy
Specifies the policy to use for writing to the stream.
Definition: Writer.h:49
#define TAG
String to identify log entries originating from this file.
Definition: TestableMessageObserver.cpp:27
static size_t calculateDataOffset(size_t wordSize, size_t maxReaders)
Definition: BufferLayout.h:634
ReaderPolicy Policy
Specifies the policy to use for reading from the stream.
Definition: Reader.h:48
Index
Index used for setting access.
Definition: StateReportGeneratorTest.cpp:41
Whether or not curl logs should be emitted.
Definition: AVSConnectionManager.h:36
void acsdkError(const LogEntry &entry)

AlexaClientSDK 3.0.0 - Copyright 2016-2022 Amazon.com, Inc. or its affiliates. All Rights Reserved. Licensed under the Apache License, Version 2.0