16 #ifndef ALEXA_CLIENT_SDK_AVSCOMMON_UTILS_INCLUDE_AVSCOMMON_UTILS_SDS_SHAREDDATASTREAM_H_ 17 #define ALEXA_CLIENT_SDK_AVSCOMMON_UTILS_INCLUDE_AVSCOMMON_UTILS_SDS_SHAREDDATASTREAM_H_ 94 class SharedDataStream {
/// Unsigned 64-bit index type; getDataSize() returns a value of this type.
105 using Index = uint64_t;
/// Each alias below names the nested type of the same name supplied by the template
/// parameter T, so T decides the concrete atomic, buffer, mutex and condition-variable types.
108 using AtomicIndex =
typename T::AtomicIndex;
111 using AtomicBool =
typename T::AtomicBool;
114 using Buffer =
typename T::Buffer;
117 using Mutex =
typename T::Mutex;
120 using ConditionVariable =
typename T::ConditionVariable;
/// Computes a buffer size from a word count, a word size and a reader count.
/// The definition in this header returns an overhead term plus `nWords * wordSize`.
/// @param nWords Number of words of data.
/// @param wordSize Size of one word (defaults to 1).
/// @param maxReaders Number of readers (defaults to 1).
/// NOTE(review): how the overhead is derived, and what is returned when @c nWords or
/// @c wordSize is 0, is not visible in this copy of the file — confirm against the full header.
140 static size_t calculateBufferSize(
size_t nWords,
size_t wordSize = 1,
size_t maxReaders = 1);
/// Factory overload whose definition forwards to the longer create() overload, passing
/// @c maxReaders for both the reader limit and the ephemeral-reader limit.
/// @param buffer Buffer the stream is built on.
/// @param maxReaders Reader limit (defaults to 1).
/// NOTE(review): the definition also forwards a `wordSize` argument that does not appear in
/// this declaration as shown here — the parameter line appears to be missing; confirm.
156 static std::unique_ptr<SharedDataStream> create(
157 std::shared_ptr<Buffer> buffer,
159 size_t maxReaders = 1);
/// Factory overload taking a separate limit for ephemeral readers.
/// The definition checks the buffer (non-null, at least calculateBufferSize(1, wordSize,
/// maxReaders) in size), checks @c maxEphemeralReaders against @c maxReaders, then calls
/// init(wordSize, maxReaders, maxEphemeralReaders) on the new stream's buffer layout.
/// @param buffer Buffer the stream is built on.
/// @param maxEphemeralReaders Limit on ephemeral readers; the definition's check compares it
///     with @c maxReaders using `>`.
/// NOTE(review): the `wordSize` and `maxReaders` parameters used by the definition are not
/// visible in this declaration as shown here — confirm against the full header.
169 static std::unique_ptr<SharedDataStream> create(
170 std::shared_ptr<Buffer> buffer,
173 size_t maxEphemeralReaders);
/// Factory whose definition wraps @c buffer in a SharedDataStream and calls attach() on its
/// buffer layout (as opposed to the init() call made by create()).
/// @param buffer Buffer to wrap.
/// NOTE(review): the result returned when attach() fails is not visible in this copy.
185 static std::unique_ptr<SharedDataStream> open(std::shared_ptr<Buffer> buffer);
/// @return The `maxReaders` field of the buffer layout's header.
193 size_t getMaxReaders()
const;
/// @return The data size, as an Index.
/// NOTE(review): the matching definition's signature line is missing from this copy; the
/// body that appears to belong to it returns m_bufferLayout->getDataSize().
201 Index getDataSize()
const;
/// @return The `wordSize` field of the buffer layout's header.
209 size_t getWordSize()
const;
/// Creates a Writer for this stream.
/// @param policy Writer policy handed to the Writer constructor.
/// @param forceReplacement When false (the default) and the header's `isWriterEnabled` flag
///     is set, the definition takes its "existingWriterAttached" error path.
/// @return A Writer built from @c policy and the stream's buffer layout.
/// NOTE(review): the value returned on the error path is not visible in this copy.
226 std::unique_ptr<Writer> createWriter(
typename Writer::Policy policy,
bool forceReplacement =
false);
/// Creates a Reader without the caller choosing an id: the definition scans ids
/// 0 .. maxEphemeralReaders-1 and uses the first one that is not currently enabled.
/// @param policy Reader policy handed on to createReaderLocked().
/// @param startWithNewData Handed on to createReaderLocked() unchanged (defaults to false).
/// NOTE(review): the result when every scanned id is already enabled is not visible here.
239 std::unique_ptr<Reader> createReader(
typename Reader::Policy policy,
bool startWithNewData =
false);
/// Creates a Reader for a caller-chosen id; the definition locks the header's
/// readerEnableMutex and delegates to createReaderLocked().
/// @param startWithNewData Handed on to createReaderLocked() unchanged (defaults to false).
/// @param forceReplacement Handed on to createReaderLocked() unchanged (defaults to false).
/// NOTE(review): the definition also uses `id` and `policy`, whose parameter lines are not
/// visible in this declaration as shown here — confirm against the full header.
267 std::unique_ptr<Reader> createReader(
270 bool startWithNewData =
false,
271 bool forceReplacement =
false);
/// Constructor; its definition only wraps @c buffer in a BufferLayout stored in
/// m_bufferLayout. The create() and open() factories are the visible callers.
279 SharedDataStream(std::shared_ptr<typename T::Buffer> buffer);
/// Shared implementation behind both createReader() overloads, which call it while holding
/// a unique_lock on the header's readerEnableMutex and pass that lock by pointer.
/// @param startWithNewData When true the definition calls updateOldestUnconsumedCursor() on
///     the buffer layout; the definition also contains a bounded seek-retry loop.
/// @param forceReplacement When false and the id is already enabled, the definition takes
///     its "readerAlreadyAttached" error path.
/// @param lock The caller's lock on readerEnableMutex.
/// NOTE(review): the `id` and `policy` parameter lines are not visible in this declaration
/// as shown here, and how @c lock is used inside the definition is not shown either.
306 std::unique_ptr<Reader> createReaderLocked(
309 bool startWithNewData,
310 bool forceReplacement,
311 std::unique_lock<Mutex>* lock);
/// Upper bound on iterations of the seek-retry loop in createReaderLocked(); defined as 3
/// further down in this header.
321 static const int MAX_READER_CREATION_RETRIES;
/// Layout object wrapping the stream's buffer; created in the constructor and shared with
/// every Writer and Reader this stream constructs.
324 std::shared_ptr<BufferLayout> m_bufferLayout;
// Out-of-class definition of the static retry limit: createReaderLocked() attempts its
// seek loop at most this many times.
327 template <
typename T>
328 const int SharedDataStream<T>::MAX_READER_CREATION_RETRIES = 3;
330 template <
typename T>
// Returns an overhead term plus the space needed for nWords words of wordSize each.
// NOTE(review): this copy is missing the first `if` of the validation chain, the bodies of
// both validation branches, and the line that defines `overhead`; confirm them against the
// full header.
333 template <
typename T>
334 size_t SharedDataStream<T>::calculateBufferSize(
size_t nWords,
size_t wordSize,
size_t maxReaders) {
338 }
else if (0 == wordSize) {
// Payload size: word count times the size of one word.
// NOTE(review): no overflow check on this size_t product is visible in the lines shown.
343 size_t dataSize = nWords * wordSize;
// Total size is the overhead plus the payload.
344 return overhead + dataSize;
// Convenience overload: forwards to the four-argument create(), passing maxReaders a second
// time as the maxEphemeralReaders argument.
// NOTE(review): the `wordSize` and `maxReaders` parameter lines are missing from this copy.
347 template <
typename T>
348 std::unique_ptr<SharedDataStream<T>> SharedDataStream<T>::create(
349 std::shared_ptr<Buffer> buffer,
352 return create(buffer, wordSize, maxReaders, maxReaders);
// Validating factory: checks its arguments in an if / else-if chain, then constructs the
// stream and initialises its buffer layout.
// NOTE(review): the `wordSize` / `maxReaders` parameter lines, the branch bodies (log calls
// and returns) and the final return are partly missing from this copy; only the conditions
// and some log fields are visible.
355 template <
typename T>
356 std::unique_ptr<SharedDataStream<T>> SharedDataStream<T>::create(
357 std::shared_ptr<Buffer> buffer,
360 size_t maxEphemeralReaders) {
// Size needed for a stream holding a single word with these settings.
361 size_t expectedSize = calculateBufferSize(1, wordSize, maxReaders);
// An expected size of 0 is handled as its own case.
362 if (0 == expectedSize) {
365 }
else if (
nullptr == buffer) {
368 }
// The supplied buffer must be at least expectedSize; the log records both sizes.
else if (expectedSize > buffer->size()) {
370 .d(
"reason",
"bufferSizeTooSmall")
371 .d(
"bufferSize", buffer->size())
372 .d(
"expectedSize", expectedSize));
374 }
// The ephemeral-reader limit may not exceed the overall reader limit.
else if (maxEphemeralReaders > maxReaders) {
375 logger::acsdkError(logger::LogEntry(
TAG,
"createFailed").d(
"reason",
"maxEphemeralReaders > maxReaders"));
// Construct the stream over the buffer, then initialise the layout; init() returning false
// is handled in a branch whose body is not shown.
379 std::unique_ptr<SharedDataStream<T>> sds(
new SharedDataStream<T>(buffer));
380 if (!sds->m_bufferLayout->init(wordSize, maxReaders, maxEphemeralReaders)) {
// Wraps `buffer` in a new SharedDataStream and calls attach() on its buffer layout.
// NOTE(review): the failure branch body and the final return are missing from this copy.
387 template <
typename T>
388 std::unique_ptr<SharedDataStream<T>> SharedDataStream<T>::open(std::shared_ptr<Buffer> buffer) {
389 std::unique_ptr<SharedDataStream<T>> sds(
new SharedDataStream<T>(buffer));
// attach() returning false is the failure case.
390 if (!sds->m_bufferLayout->attach()) {
// Returns the maxReaders value stored in the buffer layout's header.
398 template <
typename T>
399 size_t SharedDataStream<T>::getMaxReaders()
const {
400 return m_bufferLayout->getHeader()->maxReaders;
// Returns the data size reported by the buffer layout.
// NOTE(review): the signature line is missing from this copy; this is presumably the
// definition of getDataSize() declared in the class — confirm against the full header.
403 template <
typename T>
405 return m_bufferLayout->getDataSize();
// Returns the wordSize value stored in the buffer layout's header.
408 template <
typename T>
409 size_t SharedDataStream<T>::getWordSize()
const {
410 return m_bufferLayout->getHeader()->wordSize;
// Creates a Writer while holding the header's writerEnableMutex.
// NOTE(review): the `policy` parameter line, the start of the error log call and the error
// return are missing from this copy.
413 template <
typename T>
414 std::unique_ptr<typename SharedDataStream<T>::Writer> SharedDataStream<T>::createWriter(
416 bool forceReplacement) {
417 auto header = m_bufferLayout->getHeader();
// The lock is held for the rest of the function (lock_guard scope).
418 std::lock_guard<Mutex> lock(header->writerEnableMutex);
// A writer is already enabled and the caller did not ask to replace it: error path.
419 if (header->isWriterEnabled && !forceReplacement) {
421 .d(
"reason",
"existingWriterAttached")
422 .d(
"forceReplacement",
"false"));
// Build the Writer on the shared buffer layout.
425 return std::unique_ptr<Writer>(
new Writer(policy, m_bufferLayout));
// Creates a Reader on the first reader id that is not currently enabled, searching ids
// 0 .. maxEphemeralReaders-1 while holding the header's readerEnableMutex.
// NOTE(review): the `policy` parameter line and the code after the loop (the case where
// every id is enabled) are missing from this copy.
429 template <
typename T>
430 std::unique_ptr<typename SharedDataStream<T>::Reader> SharedDataStream<T>::createReader(
432 bool startWithNewData) {
433 std::unique_lock<Mutex> lock(m_bufferLayout->getHeader()->readerEnableMutex);
434 for (
size_t id = 0;
id < m_bufferLayout->getHeader()->maxEphemeralReaders; ++id) {
435 if (!m_bufferLayout->isReaderEnabled(
id)) {
// Free id found: delegate with forceReplacement fixed to false and hand over the lock.
436 return createReaderLocked(
id, policy, startWithNewData,
false, &lock);
// Creates a Reader for a caller-chosen id: takes the header's readerEnableMutex and
// delegates all remaining work to createReaderLocked().
// NOTE(review): the `id` and `policy` parameter lines are missing from this copy.
443 template <
typename T>
444 std::unique_ptr<typename SharedDataStream<T>::Reader> SharedDataStream<T>::createReader(
447 bool startWithNewData,
448 bool forceReplacement) {
449 std::unique_lock<Mutex> lock(m_bufferLayout->getHeader()->readerEnableMutex);
450 return createReaderLocked(
id, policy, startWithNewData, forceReplacement, &lock);
// Constructor: wraps the supplied buffer in a BufferLayout held by shared_ptr. No init() or
// attach() call is made here; the create() and open() factories do that afterwards.
453 template <
typename T>
454 SharedDataStream<T>::SharedDataStream(std::shared_ptr<Buffer> buffer) :
455 m_bufferLayout{std::make_shared<BufferLayout>(buffer)} {
// Builds a Reader for `id`; both createReader() overloads call this with a lock on the
// header's readerEnableMutex passed in via `lock`.
// NOTE(review): this copy is missing the `id` parameter line, the else/continue/return
// statements, the closing braces and the start of several log calls, so the exact branch
// structure cannot be confirmed from here. The comments below describe only visible lines.
458 template <
typename T>
459 std::unique_ptr<typename SharedDataStream<T>::Reader> SharedDataStream<T>::createReaderLocked(
461 typename Reader::Policy policy,
462 bool startWithNewData,
463 bool forceReplacement,
464 std::unique_lock<Mutex>* lock) {
// The id is already enabled and replacement was not requested: error path.
465 if (m_bufferLayout->isReaderEnabled(
id) && !forceReplacement) {
467 .d(
"reason",
"readerAlreadyAttached")
469 .d(
"forceReplacement",
"false"));
// Construct the Reader on the shared buffer layout.
475 auto reader = std::unique_ptr<Reader>(
new Reader(policy, m_bufferLayout,
id));
478 if (startWithNewData) {
480 m_bufferLayout->updateOldestUnconsumedCursor();
// Retry loop, bounded by MAX_READER_CREATION_RETRIES, that picks a seek position from the
// writer's cursors.
484 auto headerPtr = m_bufferLayout->getHeader();
485 for (
int i = 0; i < MAX_READER_CREATION_RETRIES; i++) {
// Each attempt starts from the full data size, measured back from the writer.
486 auto offset = m_bufferLayout->getDataSize();
487 auto writeStartCursor = headerPtr->writeStartCursor.load();
488 auto reference = Reader::Reference::BEFORE_WRITER;
// When the write start cursor is still below the data size, the reference switches to an
// absolute position.
489 if (writeStartCursor < offset) {
494 reference = Reader::Reference::ABSOLUTE;
// The two cursors are loaded separately, so the pair is checked for consistency:
// an end cursor behind the start cursor is reported as an error.
496 auto writeEndCursor = headerPtr->writeEndCursor.load();
497 if (writeEndCursor < writeStartCursor) {
499 .d(
"reason",
"writeCursorBeyondEndCursor?")
// Words between the two cursors are subtracted from the offset; a count larger than the
// offset is reported as a writer overflow.
503 auto wordsBeingWritten = writeEndCursor - writeStartCursor;
504 if (offset < wordsBeingWritten) {
506 .d(
"reason",
"detectedWriterOverflow")
510 offset -= wordsBeingWritten;
// Try to position the reader at the computed offset.
513 if (reader->seek(offset, reference)) {
// Logged with the reader id when the seek retries are exhausted.
520 logger::LogEntry(
TAG,
"createReaderLockedFailed").d(
"reason",
"seekRetriesExhausted").d(
"readerId",
id));
534 #endif // ALEXA_CLIENT_SDK_AVSCOMMON_UTILS_INCLUDE_AVSCOMMON_UTILS_SDS_SHAREDDATASTREAM_H_ void acsdkWarn(const LogEntry &entry)
::std::string string
Definition: gtest-port.h:1097
WriterPolicy Policy
Specifies the policy to use for writing to the stream.
Definition: Writer.h:49
#define TAG
String to identify log entries originating from this file.
Definition: TestableMessageObserver.cpp:27
static size_t calculateDataOffset(size_t wordSize, size_t maxReaders)
Definition: BufferLayout.h:634
ReaderPolicy Policy
Specifies the policy to use for reading from the stream.
Definition: Reader.h:48
Index
Index used for setting access.
Definition: StateReportGeneratorTest.cpp:41
Whether or not curl logs should be emitted.
Definition: AVSConnectionManager.h:36
void acsdkError(const LogEntry &entry)