16 #ifndef ALEXA_CLIENT_SDK_AVSCOMMON_UTILS_INCLUDE_AVSCOMMON_UTILS_SDS_BUFFERLAYOUT_H_ 17 #define ALEXA_CLIENT_SDK_AVSCOMMON_UTILS_INCLUDE_AVSCOMMON_UTILS_SDS_BUFFERLAYOUT_H_ 43 static const uint32_t MAGIC_NUMBER = 0x53445348;
46 static const uint32_t VERSION = 2;
171 Header* getHeader()
const;
180 AtomicBool* getReaderEnabledArray()
const;
189 AtomicIndex* getReaderCursorArray()
const;
209 AtomicIndex* getReaderCloseIndexArray()
const;
217 Index getDataSize()
const;
230 uint8_t* getData(
Index at = 0)
const;
245 bool init(
size_t wordSize,
size_t maxReaders,
size_t maxEphemeralReaders);
273 bool isReaderEnabled(
size_t id)
const;
281 void enableReaderLocked(
size_t id);
289 void disableReaderLocked(
size_t id);
308 static size_t calculateDataOffset(
size_t wordSize,
size_t maxReaders);
311 void updateOldestUnconsumedCursor();
335 void updateOldestUnconsumedCursorLocked();
345 static uint32_t stableHash(
const char*
string);
354 static size_t alignSizeTo(
size_t size,
size_t align);
362 static size_t calculateReaderEnabledArrayOffset();
371 static size_t calculateReaderCursorArrayOffset(
size_t maxReaders);
380 static size_t calculateReaderCloseIndexArrayOffset(
size_t maxReaders);
389 void calculateAndCacheConstants(
size_t wordSize,
size_t maxReaders);
401 bool isAttached()
const;
404 std::shared_ptr<Buffer> m_buffer;
407 AtomicBool* m_readerEnabledArray;
410 AtomicIndex* m_readerCursorArray;
413 AtomicIndex* m_readerCloseIndexArray;
422 template <
typename T>
423 const std::string SharedDataStream<T>::BufferLayout::TAG =
"SdsBufferLayout";
425 template <
typename T>
428 m_readerEnabledArray{
nullptr},
429 m_readerCursorArray{
nullptr},
430 m_readerCloseIndexArray{
nullptr},
435 template <
typename T>
440 template <
typename T>
442 return reinterpret_cast<Header*
>(m_buffer->data());
445 template <
typename T>
447 return m_readerEnabledArray;
450 template <
typename T>
452 return m_readerCursorArray;
455 template <
typename T>
457 return m_readerCloseIndexArray;
460 template <
typename T>
465 template <
typename T>
470 template <
typename FieldType,
typename ClassType>
471 auto inline max_field_limit(FieldType(ClassType::*)) -> decltype(std::numeric_limits<FieldType>::max()) {
472 return std::numeric_limits<FieldType>::max();
475 template <
typename T>
480 .d(
"reason",
"wordSizeTooLarge")
481 .d(
"wordSize", wordSize)
487 .d(
"reason",
"maxReadersTooLarge")
488 .d(
"maxReaders", maxReaders)
494 .d(
"reason",
"maxEphermalReadersTooLarge")
495 .d(
"maxEphemeralReaders", maxEphemeralReaders)
501 calculateAndCacheConstants(wordSize, maxReaders);
508 for (
id = 0;
id < maxReaders; ++id) {
509 new (m_readerEnabledArray + id) AtomicBool;
510 new (m_readerCursorArray + id) AtomicIndex;
511 new (m_readerCloseIndexArray + id) AtomicIndex;
517 header->traitsNameHash = stableHash(T::traitsName);
518 header->wordSize =
static_cast<uint16_t
>(wordSize);
519 header->maxReaders =
static_cast<uint8_t
>(maxReaders);
520 header->maxEphemeralReaders =
static_cast<uint8_t
>(maxEphemeralReaders);
521 header->isWriterEnabled =
false;
522 header->hasWriterBeenClosed =
false;
523 header->writeStartCursor = 0;
524 header->writeEndCursor = 0;
525 header->oldestUnconsumedCursor = 0;
526 header->referenceCount = 1;
529 for (
id = 0;
id < maxReaders; ++id) {
530 m_readerEnabledArray[id] =
false;
531 m_readerCursorArray[id] = 0;
532 m_readerCloseIndexArray[id] = 0;
538 template <
typename T>
544 .d(
"reason",
"magicNumberMismatch")
545 .d(
"magicNumber", header->magic)
549 if (header->version !=
VERSION) {
551 .d(
"reason",
"incompatibleVersion")
552 .d(
"version", header->version)
553 .
d(
"expectedVersion", std::to_string(
VERSION)));
556 if (header->traitsNameHash != stableHash(T::traitsName)) {
558 .d(
"reason",
"traitsNameHashMismatch")
559 .d(
"hash", header->traitsNameHash)
560 .
d(
"expectedHash", stableHash(T::traitsName)));
565 std::lock_guard<Mutex> lock(header->attachMutex);
566 if (0 == header->referenceCount) {
570 if (std::numeric_limits<decltype(header->referenceCount)>::max() == header->referenceCount) {
572 .d(
"reason",
"bufferMaxUsersExceeded")
573 .d(
"numUsers", header->referenceCount)
574 .
d(
"maxNumUsers", std::numeric_limits<decltype(header->referenceCount)>::max()));
577 ++header->referenceCount;
580 calculateAndCacheConstants(header->wordSize, header->maxReaders);
585 template <
typename T>
593 std::lock_guard<Mutex> lock(header->attachMutex);
594 --header->referenceCount;
595 if (header->referenceCount > 0) {
601 for (
size_t id = 0;
id < header->maxReaders; ++id) {
602 m_readerCloseIndexArray[id].~AtomicIndex();
603 m_readerCursorArray[id].~AtomicIndex();
604 m_readerEnabledArray[id].~AtomicBool();
611 template <
typename T>
613 return m_readerEnabledArray[id];
616 template <
typename T>
618 m_readerEnabledArray[id] =
true;
621 template <
typename T>
623 m_readerEnabledArray[id] =
false;
626 template <
typename T>
633 template <
typename T>
635 return alignSizeTo(calculateReaderCloseIndexArrayOffset(maxReaders) + (maxReaders *
sizeof(AtomicIndex)), wordSize);
638 template <
typename T>
641 std::lock_guard<Mutex> backwardSeekLock(
getHeader()->backwardSeekMutex);
645 template <
typename T>
658 Index oldest = std::numeric_limits<Index>::max();
659 for (
size_t id = 0;
id < header->maxReaders; ++id) {
674 if (std::numeric_limits<Index>::max() == oldest) {
675 oldest = header->writeStartCursor;
684 if (oldest > header->oldestUnconsumedCursor) {
685 header->oldestUnconsumedCursor = oldest;
689 header->spaceAvailableConditionVariable.notify_all();
693 template <
typename T>
694 uint32_t SharedDataStream<T>::BufferLayout::stableHash(
const char*
string) {
699 hashed ^= *
string << ((pos %
sizeof(uint32_t)) * 8);
706 template <
typename T>
707 size_t SharedDataStream<T>::BufferLayout::alignSizeTo(
size_t size,
size_t align) {
709 return (((size - 1) / align) + 1) * align;
715 template <
typename T>
716 size_t SharedDataStream<T>::BufferLayout::calculateReaderEnabledArrayOffset() {
717 return alignSizeTo(
sizeof(
Header),
alignof(AtomicBool));
720 template <
typename T>
721 size_t SharedDataStream<T>::BufferLayout::calculateReaderCursorArrayOffset(
size_t maxReaders) {
722 return alignSizeTo(calculateReaderEnabledArrayOffset() + (maxReaders *
sizeof(AtomicBool)),
alignof(AtomicIndex));
725 template <
typename T>
726 size_t SharedDataStream<T>::BufferLayout::calculateReaderCloseIndexArrayOffset(
size_t maxReaders) {
727 return calculateReaderCursorArrayOffset(maxReaders) + (maxReaders *
sizeof(AtomicIndex));
730 template <
typename T>
731 void SharedDataStream<T>::BufferLayout::calculateAndCacheConstants(
size_t wordSize,
size_t maxReaders) {
732 auto buffer =
reinterpret_cast<uint8_t*
>(m_buffer->data());
733 m_readerEnabledArray =
reinterpret_cast<AtomicBool*
>(buffer + calculateReaderEnabledArrayOffset());
734 m_readerCursorArray =
reinterpret_cast<AtomicIndex*
>(buffer + calculateReaderCursorArrayOffset(maxReaders));
735 m_readerCloseIndexArray =
reinterpret_cast<AtomicIndex*
>(buffer + calculateReaderCloseIndexArrayOffset(maxReaders));
740 template <
typename T>
741 bool SharedDataStream<T>::BufferLayout::isAttached()
const {
742 return m_data !=
nullptr;
750 #endif // ALEXA_CLIENT_SDK_AVSCOMMON_UTILS_INCLUDE_AVSCOMMON_UTILS_SDS_BUFFERLAYOUT_H_
void enableReaderLocked(size_t id)
Definition: BufferLayout.h:617
Definition: BufferLayout.h:40
void disableReaderLocked(size_t id)
Definition: BufferLayout.h:622
bool isReaderEnabled(size_t id) const
Definition: BufferLayout.h:612
::std::string string
Definition: gtest-port.h:1097
AtomicBool * getReaderEnabledArray() const
Definition: BufferLayout.h:446
#define TAG
String to identify log entries originating from this file.
Definition: TestableMessageObserver.cpp:27
static const uint32_t VERSION
Version of this header layout.
Definition: BufferLayout.h:46
static size_t calculateDataOffset(size_t wordSize, size_t maxReaders)
Definition: BufferLayout.h:634
Header * getHeader() const
Definition: BufferLayout.h:441
void detach()
Definition: BufferLayout.h:586
static const uint32_t MAGIC_NUMBER
Magic number used to identify a valid Header in memory.
Definition: BufferLayout.h:43
uint8_t * getData(Index at=0) const
Definition: BufferLayout.h:466
bool attach()
Definition: BufferLayout.h:539
Index
Index used for setting access.
Definition: StateReportGeneratorTest.cpp:41
Whether or not curl logs should be emitted.
Definition: AVSConnectionManager.h:36
bool init(size_t wordSize, size_t maxReaders, size_t maxEphemeralReaders)
Definition: BufferLayout.h:476
void updateOldestUnconsumedCursor()
This function calls updateOldestUnconsumedCursorLocked() while holding Header::backwardSeekMutex.
Definition: BufferLayout.h:639
void acsdkError(const LogEntry &entry)
AtomicIndex * getReaderCursorArray() const
Definition: BufferLayout.h:451
LogEntry & d(const std::string &key, const ValueType &value)
Definition: LogEntry.h:231
Index wordsUntilWrap(Index after) const
Definition: BufferLayout.h:627
auto max_field_limit(FieldType(ClassType::*)) -> decltype(std::numeric_limits< FieldType >::max())
Definition: BufferLayout.h:471
Index getDataSize() const
Definition: BufferLayout.h:461
BufferLayout(std::shared_ptr< Buffer > buffer)
Definition: BufferLayout.h:426
void updateOldestUnconsumedCursorLocked()
Definition: BufferLayout.h:646
AtomicIndex * getReaderCloseIndexArray() const
Definition: BufferLayout.h:456
~BufferLayout()
The destructor ensures the BufferLayout is detach()es from the Buffer.
Definition: BufferLayout.h:436
LogEntry is used to compile the log entry text to log via Logger.
Definition: LogEntry.h:33