16 #ifndef ALEXA_CLIENT_SDK_AVSCOMMON_UTILS_INCLUDE_AVSCOMMON_UTILS_SDS_READER_H_ 17 #define ALEXA_CLIENT_SDK_AVSCOMMON_UTILS_INCLUDE_AVSCOMMON_UTILS_SDS_READER_H_ 93 Reader(
Policy policy, std::shared_ptr<BufferLayout> bufferLayout,
size_t id);
142 size_t maxWordsCanRead,
143 std::chrono::milliseconds timeout = std::chrono::milliseconds(0));
163 ssize_t read(
void* buf,
size_t nWords, std::chrono::milliseconds timeout = std::chrono::milliseconds(0));
178 bool seek(
Index offset,
Reference reference = Reference::ABSOLUTE);
206 void close(
Index offset = 0,
Reference reference = Reference::AFTER_READER);
215 size_t getId()
const;
223 size_t getWordSize()
const;
242 std::shared_ptr<BufferLayout> m_bufferLayout;
251 AtomicIndex* m_readerCursor;
254 AtomicIndex* m_readerCloseIndex;
/// Out-of-class definition of the static @c TAG member of @c SharedDataStream<T>::Reader, initialized to
/// "SdsReader".
/// NOTE(review): presumably the tag used to identify log entries emitted by Reader - confirm against the
/// logging calls, which are not fully visible in this listing.
257 template <
typename T>
258 const std::string SharedDataStream<T>::Reader::TAG =
"SdsReader";
260 template <
typename T>
263 m_bufferLayout{bufferLayout},
265 m_readerCursor{&m_bufferLayout->getReaderCursorArray()[m_id]},
266 m_readerCloseIndex{&m_bufferLayout->getReaderCloseIndexArray()[m_id]} {
272 *m_readerCursor = m_bufferLayout->getHeader()->writeStartCursor.load();
275 *m_readerCloseIndex = std::numeric_limits<Index>::max();
277 m_bufferLayout->enableReaderLocked(m_id);
280 template <
typename T>
284 seek(0, Reference::BEFORE_WRITER);
286 std::lock_guard<Mutex> lock(m_bufferLayout->getHeader()->readerEnableMutex);
287 m_bufferLayout->disableReaderLocked(m_id);
288 m_bufferLayout->updateOldestUnconsumedCursor();
291 template <
typename T>
293 if (
nullptr == buf) {
298 auto buf8 =
static_cast<uint8_t*
>(buf);
301 [&buf8, wordSize](
void* readBuf,
size_t wordsToRead) {
302 memcpy(buf8, readBuf, wordsToRead * wordSize);
303 buf8 += wordsToRead * wordSize;
310 template <
typename T>
312 if (
nullptr ==
function) {
323 auto readerCloseIndex = m_readerCloseIndex->load();
324 if (*m_readerCursor >= readerCloseIndex) {
329 auto header = m_bufferLayout->getHeader();
330 if ((header->writeEndCursor >= *m_readerCursor) &&
331 (header->writeEndCursor - *m_readerCursor) > m_bufferLayout->getDataSize()) {
335 std::unique_lock<Mutex> lock(header->dataAvailableMutex, std::defer_lock);
341 size_t wordsAvailable =
tell(Reference::BEFORE_WRITER);
342 if (0 == wordsAvailable) {
343 if (header->writeEndCursor > 0 && !header->isWriterEnabled) {
349 auto predicate = [
this, header] {
350 return header->hasWriterBeenClosed ||
tell(Reference::BEFORE_WRITER) > 0;
353 if (std::chrono::milliseconds::zero() == timeout) {
354 header->dataAvailableConditionVariable.wait(lock, predicate);
355 }
else if (!header->dataAvailableConditionVariable.wait_for(lock, timeout, predicate)) {
359 wordsAvailable =
tell(Reference::BEFORE_WRITER);
362 if (0 == wordsAvailable) {
370 if (nWords > wordsAvailable) {
371 nWords = wordsAvailable;
375 if ((*m_readerCursor + nWords) > readerCloseIndex) {
376 nWords = readerCloseIndex - *m_readerCursor;
380 size_t beforeWrap = m_bufferLayout->wordsUntilWrap(*m_readerCursor);
381 if (beforeWrap > nWords) {
384 size_t afterWrap = nWords - beforeWrap;
387 if (!
function(m_bufferLayout->getData(*m_readerCursor), beforeWrap)) {
392 if (afterWrap > 0 && !
function(m_bufferLayout->getData(*m_readerCursor + beforeWrap), afterWrap)) {
398 *m_readerCursor += nWords;
401 bool overrun = ((header->writeEndCursor - *m_readerCursor) > m_bufferLayout->getDataSize());
404 m_bufferLayout->updateOldestUnconsumedCursor();
414 template <
typename T>
416 auto header = m_bufferLayout->getHeader();
417 auto writeStartCursor = &header->writeStartCursor;
418 auto writeEndCursor = &header->writeEndCursor;
419 Index absolute = std::numeric_limits<Index>::max();
422 case Reference::AFTER_READER:
423 absolute = *m_readerCursor + offset;
425 case Reference::BEFORE_READER:
426 if (offset > *m_readerCursor) {
428 .d(
"reason",
"seekBeforeStreamStart")
429 .d(
"reference",
"BEFORE_READER")
430 .d(
"seekOffset", offset)
431 .d(
"readerCursor", m_readerCursor->load()));
434 absolute = *m_readerCursor - offset;
436 case Reference::BEFORE_WRITER:
437 if (offset > *writeStartCursor) {
439 .d(
"reason",
"seekBeforeStreamStart")
440 .d(
"reference",
"BEFORE_WRITER")
441 .d(
"seekOffset", offset)
442 .d(
"writeStartCursor", writeStartCursor->load()));
445 absolute = *writeStartCursor - offset;
447 case Reference::ABSOLUTE:
452 if (absolute > *m_readerCloseIndex) {
454 .d(
"reason",
"seekBeyondCloseIndex")
455 .d(
"position", absolute)
456 .d(
"readerCloseIndex", m_readerCloseIndex->load()));
462 bool backward = absolute < *m_readerCursor;
463 std::unique_lock<Mutex> lock(header->backwardSeekMutex, std::defer_lock);
472 if (*writeEndCursor >= absolute && *writeEndCursor - absolute > m_bufferLayout->getDataSize()) {
477 *m_readerCursor = absolute;
480 header->oldestUnconsumedCursor = 0;
481 m_bufferLayout->updateOldestUnconsumedCursorLocked();
484 m_bufferLayout->updateOldestUnconsumedCursor();
490 template <
typename T>
492 auto writeStartCursor = &m_bufferLayout->getHeader()->writeStartCursor;
494 case Reference::AFTER_READER:
496 case Reference::BEFORE_READER:
498 case Reference::BEFORE_WRITER:
499 return (*writeStartCursor >= *m_readerCursor) ? *writeStartCursor - *m_readerCursor : 0;
500 case Reference::ABSOLUTE:
501 return *m_readerCursor;
504 return std::numeric_limits<Index>::max();
507 template <
typename T>
509 auto writeStartCursor = &m_bufferLayout->getHeader()->writeStartCursor;
511 bool validReference =
false;
513 case Reference::AFTER_READER:
514 absolute = *m_readerCursor + offset;
515 validReference =
true;
517 case Reference::BEFORE_READER:
518 absolute = *m_readerCursor;
519 validReference =
true;
521 case Reference::BEFORE_WRITER:
522 if (*writeStartCursor < offset) {
524 .d(
"reason",
"invalidIndex")
525 .d(
"reference",
"BEFORE_WRITER")
527 .d(
"writeStartCursor", writeStartCursor->load()));
529 absolute = *writeStartCursor - offset;
531 validReference =
true;
533 case Reference::ABSOLUTE:
535 validReference =
true;
538 if (!validReference) {
542 *m_readerCloseIndex = absolute;
545 template <
typename T>
550 template <
typename T>
552 return m_bufferLayout->getHeader()->wordSize;
555 template <
typename T>
570 return "(unknown error " + to_string(error) +
")";
578 #endif // ALEXA_CLIENT_SDK_AVSCOMMON_UTILS_INCLUDE_AVSCOMMON_UTILS_SDS_READER_H_
Returned when policy is Policy::BLOCKING and no data becomes available before the specified timeout...
Definition: Reader.h:79
Reader(Policy policy, std::shared_ptr< BufferLayout > bufferLayout, size_t id)
Definition: Reader.h:261
Reference
Specifies a reference to measure seek()/tell()/close() offsets against.
Definition: Reader.h:51
Returned when a read() parameter is invalid.
Definition: Reader.h:81
Returned when policy is Policy::NONBLOCKING and no data is available.
Definition: Reader.h:77
Returned when the data requested has been overwritten and is invalid.
Definition: Reader.h:75
::std::string string
Definition: gtest-port.h:1097
static std::string errorToString(Error error)
Definition: Reader.h:556
std::function< bool(void *buf, size_t nWords)> OutputFunction
Definition: Reader.h:117
size_t getId() const
Definition: Reader.h:546
#define TAG
String to identify log entries originating from this file.
Definition: TestableMessageObserver.cpp:27
ssize_t read(OutputFunction function, size_t maxWordsCanRead, std::chrono::milliseconds timeout=std::chrono::milliseconds(0))
Definition: Reader.h:311
void close(Index offset=0, Reference reference=Reference::AFTER_READER)
Definition: Reader.h:508
Index
Index used for setting access.
Definition: StateReportGeneratorTest.cpp:41
Whether or not curl logs should be emitted.
Definition: AVSConnectionManager.h:36
ReaderPolicy
Specifies the policy to use for reading from the stream.
Definition: ReaderPolicy.h:25
Index tell(Reference reference=Reference::ABSOLUTE) const
Definition: Reader.h:491
~Reader()
This destructor detaches the Reader from a BufferLayout.
Definition: Reader.h:281
void acsdkError(const LogEntry &entry)
size_t getWordSize() const
Definition: Reader.h:551
bool seek(Index offset, Reference reference=Reference::ABSOLUTE)
Definition: Reader.h:415
LogEntry is used to compile the log entry text to log via Logger.
Definition: LogEntry.h:33