// Declaration of the SharedDataStream class, guarded by the include guard opened on the next line.
// NOTE(review): this text is a numbered source listing, and the embedded listing numbers skip; the
// template header for T, the access specifiers, several parameter lines and the closing "};" are
// not visible here. Comments below describe only what the visible lines show.
16 #ifndef ALEXA_CLIENT_SDK_AVSCOMMON_UTILS_INCLUDE_AVSCOMMON_UTILS_SDS_SHAREDDATASTREAM_H_    17 #define ALEXA_CLIENT_SDK_AVSCOMMON_UTILS_INCLUDE_AVSCOMMON_UTILS_SDS_SHAREDDATASTREAM_H_    94 class SharedDataStream {
   // Index is a 64-bit unsigned integer.
   105     using Index = uint64_t;
   // The following aliases re-export the nested types supplied by the traits type T.
   108     using AtomicIndex = 
typename T::AtomicIndex;
   111     using AtomicBool = 
typename T::AtomicBool;
   114     using Buffer = 
typename T::Buffer;
   117     using Mutex = 
typename T::Mutex;
   120     using ConditionVariable = 
typename T::ConditionVariable;
   // Static size helper; wordSize and maxReaders both default to 1.
   140     static size_t calculateBufferSize(
size_t nWords, 
size_t wordSize = 1, 
size_t maxReaders = 1);
   // Factory overload; maxReaders defaults to 1.
   // NOTE(review): listing line 158 (a parameter between buffer and maxReaders) is absent here.
   156     static std::unique_ptr<SharedDataStream> create(
   157         std::shared_ptr<Buffer> buffer,
   159         size_t maxReaders = 1);
   // Factory overload that additionally takes maxEphemeralReaders (no default).
   // NOTE(review): listing lines 171-172 (parameters) are absent here.
   169     static std::unique_ptr<SharedDataStream> create(
   170         std::shared_ptr<Buffer> buffer,
   173         size_t maxEphemeralReaders);
   // Factory that takes only a buffer; the out-of-class definition calls BufferLayout::attach().
   185     static std::unique_ptr<SharedDataStream> open(std::shared_ptr<Buffer> buffer);
   // Const accessors; their definitions read values through m_bufferLayout.
   193     size_t getMaxReaders() 
const;
   201     Index getDataSize() 
const;
   209     size_t getWordSize() 
const;
   // Creates a Writer; forceReplacement defaults to false.
   226     std::unique_ptr<Writer> createWriter(
typename Writer::Policy policy, 
bool forceReplacement = 
false);
   // Creates a Reader without an explicit id; startWithNewData defaults to false.
   239     std::unique_ptr<Reader> createReader(
typename Reader::Policy policy, 
bool startWithNewData = 
false);
   // Creates a Reader overload with two defaulted flags.
   // NOTE(review): listing lines 268-269 (leading parameters) are absent here; the definition
   // forwards an `id` and a `policy` to createReaderLocked().
   267     std::unique_ptr<Reader> createReader(
   270         bool startWithNewData = 
false,
   271         bool forceReplacement = 
false);
   // Constructor taking the shared buffer; the factories above invoke it via `new`.
   279     SharedDataStream(std::shared_ptr<typename T::Buffer> buffer);
   // Helper that receives a pointer to a std::unique_lock<Mutex>; both createReader()
   // definitions call it while holding a lock on readerEnableMutex.
   // NOTE(review): listing lines 307-308 (leading parameters) are absent here.
   306     std::unique_ptr<Reader> createReaderLocked(
   309         bool startWithNewData,
   310         bool forceReplacement,
   311         std::unique_lock<Mutex>* lock);
   // Loop bound for the seek retry loop in createReaderLocked(); defined out of class as 3.
   321     static const int MAX_READER_CREATION_RETRIES;
   // Shared BufferLayout created from the buffer in the constructor.
   324     std::shared_ptr<BufferLayout> m_bufferLayout;
   // Out-of-class definition of the static member MAX_READER_CREATION_RETRIES, set to 3.
   // createReaderLocked() uses it as the upper bound of its seek retry loop.
   327 template <
typename T>
   328 const int SharedDataStream<T>::MAX_READER_CREATION_RETRIES = 3;
   330 template <
typename T>
   // Computes a buffer size as `overhead + nWords * wordSize`.
   // NOTE(review): listing lines 335-337, 339-342 and 345-346 are absent here, so the first `if`
   // condition, the branch bodies and the computation of `overhead` are not visible. The 4-argument
   // create() treats a return value of 0 from this function as a failure.
   333 template <
typename T>
   334 size_t SharedDataStream<T>::calculateBufferSize(
size_t nWords, 
size_t wordSize, 
size_t maxReaders) {
   // Second arm of a validation chain: a zero wordSize is handled specially (body not visible).
   338     } 
else if (0 == wordSize) {
   // NOTE(review): plain size_t multiplication and addition; no overflow guard is visible in the
   // lines shown - confirm whether very large nWords/wordSize values can reach this point.
   343     size_t dataSize = nWords * wordSize;
   344     return overhead + dataSize;
   // Convenience create() overload: forwards to the 4-argument create(), passing maxReaders for
   // both the maxReaders and the maxEphemeralReaders arguments.
   // NOTE(review): listing lines 350-351 (the wordSize/maxReaders parameters) and the closing
   // brace are absent here.
   347 template <
typename T>
   348 std::unique_ptr<SharedDataStream<T>> SharedDataStream<T>::create(
   349     std::shared_ptr<Buffer> buffer,
   352     return create(buffer, wordSize, maxReaders, maxReaders);
   // Full create() overload: validates its arguments, constructs a SharedDataStream over `buffer`,
   // and initializes the buffer layout with (wordSize, maxReaders, maxEphemeralReaders).
   // NOTE(review): listing lines 358-359, 363-364, 366-367, 369, 373, 376-378 and 381-386 are absent
   // here, so the body of each failure branch and the final return are not visible.
   355 template <
typename T>
   356 std::unique_ptr<SharedDataStream<T>> SharedDataStream<T>::create(
   357     std::shared_ptr<Buffer> buffer,
   360     size_t maxEphemeralReaders) {
   // Size needed to hold a single word (nWords == 1) with the requested wordSize and maxReaders.
   361     size_t expectedSize = calculateBufferSize(1, wordSize, maxReaders);
   // Validation chain, in order: size computation yielded 0, null buffer, buffer smaller than
   // expectedSize, and more ephemeral readers requested than total readers.
   362     if (0 == expectedSize) {
   365     } 
else if (
nullptr == buffer) {
   368     } 
else if (expectedSize > buffer->size()) {
   370                                .d(
"reason", 
"bufferSizeTooSmall")
   371                                .d(
"bufferSize", buffer->size())
   372                                .d(
"expectedSize", expectedSize));
   374     } 
else if (maxEphemeralReaders > maxReaders) {
   375         logger::acsdkError(logger::LogEntry(
TAG, 
"createFailed").d(
"reason", 
"maxEphemeralReaders > maxReaders"));
   // Uses `new` directly to invoke the SharedDataStream constructor.
   379     std::unique_ptr<SharedDataStream<T>> sds(
new SharedDataStream<T>(buffer));
   // init() returning false is treated as a failure (handling not visible here).
   380     if (!sds->m_bufferLayout->init(wordSize, maxReaders, maxEphemeralReaders)) {
   // Constructs a SharedDataStream over `buffer` and calls BufferLayout::attach() rather than
   // init(); attach() returning false is treated as a failure.
   // NOTE(review): listing lines 391-396 (failure handling and the return) are absent here.
   // NOTE(review): no null check on `buffer` is visible in the lines shown - confirm.
   387 template <
typename T>
   388 std::unique_ptr<SharedDataStream<T>> SharedDataStream<T>::open(std::shared_ptr<Buffer> buffer) {
   389     std::unique_ptr<SharedDataStream<T>> sds(
new SharedDataStream<T>(buffer));
   390     if (!sds->m_bufferLayout->attach()) {
   // Returns the maxReaders field read from the buffer layout's header.
   // NOTE(review): the closing brace (listing line 401) is absent here.
   398 template <
typename T>
   399 size_t SharedDataStream<T>::getMaxReaders()
 const {
   400     return m_bufferLayout->getHeader()->maxReaders;
   // Returns whatever BufferLayout::getDataSize() reports.
   // NOTE(review): the signature (listing line 404) is absent here; presumably this is
   // SharedDataStream<T>::getDataSize() const, which the class declares - confirm.
   403 template <
typename T>
   405     return m_bufferLayout->getDataSize();
   // Returns the wordSize field read from the buffer layout's header.
   // NOTE(review): the closing brace (listing line 411) is absent here.
   408 template <
typename T>
   409 size_t SharedDataStream<T>::getWordSize()
 const {
   410     return m_bufferLayout->getHeader()->wordSize;
   // Creates a Writer bound to this stream's buffer layout, using the given policy.
   // NOTE(review): listing lines 415, 420, 423-424 and 426-427 are absent here, so the policy
   // parameter line, the start of the log call, and the exact branch structure are not visible.
   413 template <
typename T>
   414 std::unique_ptr<typename SharedDataStream<T>::Writer> SharedDataStream<T>::createWriter(
   416     bool forceReplacement) {
   417     auto header = m_bufferLayout->getHeader();
   // Holds writerEnableMutex for the rest of the function while isWriterEnabled is inspected.
   418     std::lock_guard<Mutex> lock(header->writerEnableMutex);
   // An already-enabled writer is only a problem when the caller did not ask to replace it.
   419     if (header->isWriterEnabled && !forceReplacement) {
   421                                .d(
"reason", 
"existingWriterAttached")
   422                                .d(
"forceReplacement", 
"false"));
   // Uses `new` directly to construct the Writer.
   425         return std::unique_ptr<Writer>(
new Writer(policy, m_bufferLayout));
   // Creates a Reader without a caller-supplied id: scans ids 0 .. maxEphemeralReaders-1 in order
   // and creates the reader on the first id that is not currently enabled.
   // NOTE(review): listing lines 431 and 437-441 are absent here, so the policy parameter line and
   // the path taken when every id is already enabled are not visible.
   429 template <
typename T>
   430 std::unique_ptr<typename SharedDataStream<T>::Reader> SharedDataStream<T>::createReader(
   432     bool startWithNewData) {
   // Lock readerEnableMutex before inspecting reader slots; the lock is handed to the helper below.
   433     std::unique_lock<Mutex> lock(m_bufferLayout->getHeader()->readerEnableMutex);
   434     for (
size_t id = 0; 
id < m_bufferLayout->getHeader()->maxEphemeralReaders; ++id) {
   435         if (!m_bufferLayout->isReaderEnabled(
id)) {
   // forceReplacement is passed as false: the slot was just observed to be free.
   436             return createReaderLocked(
id, policy, startWithNewData, 
false, &lock);
   // Creates a Reader for a caller-specified id: locks readerEnableMutex and delegates to
   // createReaderLocked(), forwarding all arguments plus a pointer to the held lock.
   // NOTE(review): listing lines 445-446 (the id and policy parameters) and the closing brace are
   // absent here.
   443 template <
typename T>
   444 std::unique_ptr<typename SharedDataStream<T>::Reader> SharedDataStream<T>::createReader(
   447     bool startWithNewData,
   448     bool forceReplacement) {
   449     std::unique_lock<Mutex> lock(m_bufferLayout->getHeader()->readerEnableMutex);
   450     return createReaderLocked(
id, policy, startWithNewData, forceReplacement, &lock);
   // Constructor: initializes m_bufferLayout with a new BufferLayout made from `buffer`.
   // NOTE(review): anything after the opening brace (listing line 456 onward) is absent here.
   453 template <
typename T>
   454 SharedDataStream<T>::SharedDataStream(std::shared_ptr<Buffer> buffer) :
   455         m_bufferLayout{std::make_shared<BufferLayout>(buffer)} {
   // Creates a Reader for `id`; both createReader() overloads call this with readerEnableMutex
   // locked and pass a pointer to that lock.
   // Visible steps: reject an already-enabled id unless forceReplacement is set; construct the
   // Reader; if startWithNewData, update the oldest-unconsumed cursor; otherwise try up to
   // MAX_READER_CREATION_RETRIES times to seek the reader to a computed starting position.
   // NOTE(review): many listing lines are absent here (460, 466, 468, 470-474, 476-477, 479,
   // 481-483, 490-493, 495, 498, 500-502, 505, 507-509, 511-512, 514-519, 521-533), so the branch
   // bodies, how `lock` is used, and every return statement are not visible.
   458 template <
typename T>
   459 std::unique_ptr<typename SharedDataStream<T>::Reader> SharedDataStream<T>::createReaderLocked(
   461     typename Reader::Policy policy,
   462     bool startWithNewData,
   463     bool forceReplacement,
   464     std::unique_lock<Mutex>* lock) {
   // An enabled reader slot is only a problem when replacement was not requested.
   465     if (m_bufferLayout->isReaderEnabled(
id) && !forceReplacement) {
   467                                .d(
"reason", 
"readerAlreadyAttached")
   469                                .d(
"forceReplacement", 
"false"));
   // Uses `new` directly to construct the Reader.
   475         auto reader = std::unique_ptr<Reader>(
new Reader(policy, m_bufferLayout, 
id));
   478         if (startWithNewData) {
   480             m_bufferLayout->updateOldestUnconsumedCursor();
   484         auto headerPtr = m_bufferLayout->getHeader();
   // Each attempt re-reads the write cursors before computing the seek target.
   // NOTE(review): presumably the retry exists because the cursors can change between the loads
   // and the seek - confirm.
   485         for (
int i = 0; i < MAX_READER_CREATION_RETRIES; i++) {
   // Default target: getDataSize() words, interpreted relative to BEFORE_WRITER.
   486             auto offset = m_bufferLayout->getDataSize();
   487             auto writeStartCursor = headerPtr->writeStartCursor.load();
   488             auto reference = Reader::Reference::BEFORE_WRITER;
   // When the write start cursor is below getDataSize(), switch to an ABSOLUTE reference.
   // NOTE(review): the lines that adjust `offset` for this case (490-493) are absent here.
   489             if (writeStartCursor < offset) {
   494                 reference = Reader::Reference::ABSOLUTE;
   // Sanity check: the end cursor should not be behind the start cursor.
   496                 auto writeEndCursor = headerPtr->writeEndCursor.load();
   497                 if (writeEndCursor < writeStartCursor) {
   499                                            .d(
"reason", 
"writeCursorBeyondEndCursor?")
   // Exclude the span between the two write cursors from the offset; the preceding check ensures
   // the unsigned subtraction on listing line 510 cannot go below zero.
   503                 auto wordsBeingWritten = writeEndCursor - writeStartCursor;
   504                 if (offset < wordsBeingWritten) {
   506                                           .d(
"reason", 
"detectedWriterOverflow")
   510                 offset -= wordsBeingWritten;
   // A seek that returns true ends the retry loop's work (success handling not visible).
   513             if (reader->seek(offset, reference)) {
   // Log entry built for the case where every retry failed.
   520             logger::LogEntry(
TAG, 
"createReaderLockedFailed").d(
"reason", 
"seekRetriesExhausted").d(
"readerId", 
id));
   534 #endif  // ALEXA_CLIENT_SDK_AVSCOMMON_UTILS_INCLUDE_AVSCOMMON_UTILS_SDS_SHAREDDATASTREAM_H_ void acsdkWarn(const LogEntry &entry)
 
::std::string string
Definition: gtest-port.h:1097
 
WriterPolicy Policy
Specifies the policy to use for writing to the stream. 
Definition: Writer.h:49
 
#define TAG
String to identify log entries originating from this file. 
Definition: TestableMessageObserver.cpp:27
 
static size_t calculateDataOffset(size_t wordSize, size_t maxReaders)
Definition: BufferLayout.h:634
 
ReaderPolicy Policy
Specifies the policy to use for reading from the stream. 
Definition: Reader.h:48
 
Index
Index used for setting access. 
Definition: StateReportGeneratorTest.cpp:41
 
Whether or not curl logs should be emitted. 
Definition: AVSConnectionManager.h:36
 
void acsdkError(const LogEntry &entry)