AlexaClientSDK  3.0.0
A cross-platform, modular SDK for interacting with the Alexa Voice Service
BufferLayout.h
Go to the documentation of this file.
1 /*
2  * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License").
5  * You may not use this file except in compliance with the License.
6  * A copy of the License is located at
7  *
8  * http://aws.amazon.com/apache2.0/
9  *
10  * or in the "license" file accompanying this file. This file is distributed
11  * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12  * express or implied. See the License for the specific language governing
13  * permissions and limitations under the License.
14  */
15 
16 #ifndef ALEXA_CLIENT_SDK_AVSCOMMON_UTILS_INCLUDE_AVSCOMMON_UTILS_SDS_BUFFERLAYOUT_H_
17 #define ALEXA_CLIENT_SDK_AVSCOMMON_UTILS_INCLUDE_AVSCOMMON_UTILS_SDS_BUFFERLAYOUT_H_
18 
19 #include <cstdint>
20 #include <cstddef>
21 #include <mutex>
22 #include <string>
23 #include <vector>
24 
26 #include "SharedDataStream.h"
27 
28 namespace alexaClientSDK {
29 namespace avsCommon {
30 namespace utils {
31 namespace sds {
32 
39 template <typename T>
40 class SharedDataStream<T>::BufferLayout {
41 public:
43  static const uint32_t MAGIC_NUMBER = 0x53445348;
44 
46  static const uint32_t VERSION = 2;
47 
54  BufferLayout(std::shared_ptr<Buffer> buffer);
55 
57  ~BufferLayout();
58 
65  struct Header {
70  uint32_t magic;
71 
76  uint8_t version;
77 
83  uint32_t traitsNameHash;
84 
89  uint16_t wordSize;
90 
96  uint8_t maxReaders;
97 
102 
105 
108 
114 
122 
124  AtomicBool isWriterEnabled;
125 
128 
134 
136  AtomicIndex writeStartCursor;
137 
142  AtomicIndex writeEndCursor;
143 
149 
151  uint32_t referenceCount;
152 
157  Mutex attachMutex;
158 
164  };
165 
171  Header* getHeader() const;
172 
180  AtomicBool* getReaderEnabledArray() const;
181 
189  AtomicIndex* getReaderCursorArray() const;
190 
209  AtomicIndex* getReaderCloseIndexArray() const;
210 
217  Index getDataSize() const;
218 
230  uint8_t* getData(Index at = 0) const;
231 
245  bool init(size_t wordSize, size_t maxReaders, size_t maxEphemeralReaders);
246 
255  bool attach();
256 
261  void detach();
262 
273  bool isReaderEnabled(size_t id) const;
274 
281  void enableReaderLocked(size_t id);
282 
289  void disableReaderLocked(size_t id);
290 
298  Index wordsUntilWrap(Index after) const;
299 
308  static size_t calculateDataOffset(size_t wordSize, size_t maxReaders);
309 
311  void updateOldestUnconsumedCursor();
312 
335  void updateOldestUnconsumedCursorLocked();
336 
337 private:
345  static uint32_t stableHash(const char* string);
346 
354  static size_t alignSizeTo(size_t size, size_t align);
355 
362  static size_t calculateReaderEnabledArrayOffset();
363 
371  static size_t calculateReaderCursorArrayOffset(size_t maxReaders);
372 
380  static size_t calculateReaderCloseIndexArrayOffset(size_t maxReaders);
381 
389  void calculateAndCacheConstants(size_t wordSize, size_t maxReaders);
390 
394  static const std::string TAG;
395 
401  bool isAttached() const;
402 
404  std::shared_ptr<Buffer> m_buffer;
405 
407  AtomicBool* m_readerEnabledArray;
408 
410  AtomicIndex* m_readerCursorArray;
411 
413  AtomicIndex* m_readerCloseIndexArray;
414 
416  Index m_dataSize;
417 
419  uint8_t* m_data;
420 };
421 
422 template <typename T>
423 const std::string SharedDataStream<T>::BufferLayout::TAG = "SdsBufferLayout";
424 
425 template <typename T>
426 SharedDataStream<T>::BufferLayout::BufferLayout(std::shared_ptr<Buffer> buffer) :
427  m_buffer{buffer},
428  m_readerEnabledArray{nullptr},
429  m_readerCursorArray{nullptr},
430  m_readerCloseIndexArray{nullptr},
431  m_dataSize{0},
432  m_data{nullptr} {
433 }
434 
435 template <typename T>
437  detach();
438 }
439 
440 template <typename T>
442  return reinterpret_cast<Header*>(m_buffer->data());
443 }
444 
445 template <typename T>
446 typename SharedDataStream<T>::AtomicBool* SharedDataStream<T>::BufferLayout::getReaderEnabledArray() const {
447  return m_readerEnabledArray;
448 }
449 
450 template <typename T>
451 typename SharedDataStream<T>::AtomicIndex* SharedDataStream<T>::BufferLayout::getReaderCursorArray() const {
452  return m_readerCursorArray;
453 }
454 
455 template <typename T>
456 typename SharedDataStream<T>::AtomicIndex* SharedDataStream<T>::BufferLayout::getReaderCloseIndexArray() const {
457  return m_readerCloseIndexArray;
458 }
459 
460 template <typename T>
462  return m_dataSize;
463 }
464 
465 template <typename T>
467  return m_data + (at % getDataSize()) * getHeader()->wordSize;
468 }
469 
470 template <typename FieldType, typename ClassType>
471 auto inline max_field_limit(FieldType(ClassType::*)) -> decltype(std::numeric_limits<FieldType>::max()) {
472  return std::numeric_limits<FieldType>::max();
473 }
474 
475 template <typename T>
476 bool SharedDataStream<T>::BufferLayout::init(size_t wordSize, size_t maxReaders, size_t maxEphemeralReaders) {
477  // Make sure parameters are not too large to store.
478  if (wordSize > max_field_limit(&Header::wordSize)) {
479  logger::acsdkError(logger::LogEntry(TAG, "initFailed")
480  .d("reason", "wordSizeTooLarge")
481  .d("wordSize", wordSize)
482  .d("wordSizeLimit", max_field_limit(&Header::wordSize)));
483  return false;
484  }
485  if (maxReaders > max_field_limit(&Header::maxReaders)) {
486  logger::acsdkError(logger::LogEntry(TAG, "initFailed")
487  .d("reason", "maxReadersTooLarge")
488  .d("maxReaders", maxReaders)
489  .d("maxReadersLimit", max_field_limit(&Header::maxReaders)));
490  return false;
491  }
492  if (maxEphemeralReaders > max_field_limit(&Header::maxEphemeralReaders)) {
493  logger::acsdkError(logger::LogEntry(TAG, "initFailed")
494  .d("reason", "maxEphermalReadersTooLarge")
495  .d("maxEphemeralReaders", maxEphemeralReaders)
496  .d("maxEphemeralReaders", max_field_limit(&Header::maxEphemeralReaders)));
497  return false;
498  }
499 
500  // Pre-calculate some pointers and sizes that are frequently accessed.
501  calculateAndCacheConstants(wordSize, maxReaders);
502 
503  // Default construction of the Header.
504  auto header = new (getHeader()) Header;
505 
506  // Default construction of the reader arrays.
507  size_t id;
508  for (id = 0; id < maxReaders; ++id) {
509  new (m_readerEnabledArray + id) AtomicBool;
510  new (m_readerCursorArray + id) AtomicIndex;
511  new (m_readerCloseIndexArray + id) AtomicIndex;
512  }
513 
514  // Header field initialization.
515  header->magic = MAGIC_NUMBER;
516  header->version = VERSION;
517  header->traitsNameHash = stableHash(T::traitsName);
518  header->wordSize = static_cast<uint16_t>(wordSize);
519  header->maxReaders = static_cast<uint8_t>(maxReaders);
520  header->maxEphemeralReaders = static_cast<uint8_t>(maxEphemeralReaders);
521  header->isWriterEnabled = false;
522  header->hasWriterBeenClosed = false;
523  header->writeStartCursor = 0;
524  header->writeEndCursor = 0;
525  header->oldestUnconsumedCursor = 0;
526  header->referenceCount = 1;
527 
528  // Reader arrays initialization.
529  for (id = 0; id < maxReaders; ++id) {
530  m_readerEnabledArray[id] = false;
531  m_readerCursorArray[id] = 0;
532  m_readerCloseIndexArray[id] = 0;
533  }
534 
535  return true;
536 }
537 
538 template <typename T>
540  // Verify compatibility.
541  auto header = getHeader();
542  if (header->magic != MAGIC_NUMBER) {
543  logger::acsdkError(logger::LogEntry(TAG, "attachFailed")
544  .d("reason", "magicNumberMismatch")
545  .d("magicNumber", header->magic)
546  .d("expectedMagicNumber", std::to_string(MAGIC_NUMBER)));
547  return false;
548  }
549  if (header->version != VERSION) {
550  logger::acsdkError(logger::LogEntry(TAG, "attachFailed")
551  .d("reason", "incompatibleVersion")
552  .d("version", header->version)
553  .d("expectedVersion", std::to_string(VERSION)));
554  return false;
555  }
556  if (header->traitsNameHash != stableHash(T::traitsName)) {
557  logger::acsdkError(logger::LogEntry(TAG, "attachFailed")
558  .d("reason", "traitsNameHashMismatch")
559  .d("hash", header->traitsNameHash)
560  .d("expectedHash", stableHash(T::traitsName)));
561  return false;
562  }
563 
564  // Attach.
565  std::lock_guard<Mutex> lock(header->attachMutex);
566  if (0 == header->referenceCount) {
567  logger::acsdkError(logger::LogEntry(TAG, "attachFailed").d("reason", "zeroUsers"));
568  return false;
569  }
570  if (std::numeric_limits<decltype(header->referenceCount)>::max() == header->referenceCount) {
571  logger::acsdkError(logger::LogEntry(TAG, "attachFailed")
572  .d("reason", "bufferMaxUsersExceeded")
573  .d("numUsers", header->referenceCount)
574  .d("maxNumUsers", std::numeric_limits<decltype(header->referenceCount)>::max()));
575  return false;
576  }
577  ++header->referenceCount;
578 
579  // Pre-calculate some pointers and sizes that are frequently accessed.
580  calculateAndCacheConstants(header->wordSize, header->maxReaders);
581 
582  return true;
583 }
584 
585 template <typename T>
587  if (!isAttached()) {
588  return;
589  }
590 
591  auto header = getHeader();
592  {
593  std::lock_guard<Mutex> lock(header->attachMutex);
594  --header->referenceCount;
595  if (header->referenceCount > 0) {
596  return;
597  }
598  }
599 
600  // Destruction of reader arrays.
601  for (size_t id = 0; id < header->maxReaders; ++id) {
602  m_readerCloseIndexArray[id].~AtomicIndex();
603  m_readerCursorArray[id].~AtomicIndex();
604  m_readerEnabledArray[id].~AtomicBool();
605  }
606 
607  // Destruction of the Header.
608  header->~Header();
609 }
610 
611 template <typename T>
613  return m_readerEnabledArray[id];
614 }
615 
616 template <typename T>
618  m_readerEnabledArray[id] = true;
619 }
620 
621 template <typename T>
623  m_readerEnabledArray[id] = false;
624 }
625 
626 template <typename T>
628  // The type of Index is uint64_t, size_t is 32 bits in a 32bits system.
629  // Passing an Index value to alignSizeTo may cause integer overflow.
630  return getDataSize() - (after % getDataSize());
631 }
632 
633 template <typename T>
634 size_t SharedDataStream<T>::BufferLayout::calculateDataOffset(size_t wordSize, size_t maxReaders) {
635  return alignSizeTo(calculateReaderCloseIndexArrayOffset(maxReaders) + (maxReaders * sizeof(AtomicIndex)), wordSize);
636 }
637 
638 template <typename T>
640  // Note: as an optimization, we could skip this function if Writer policy is nonblockable (ACSDK-251).
641  std::lock_guard<Mutex> backwardSeekLock(getHeader()->backwardSeekMutex);
643 }
644 
645 template <typename T>
647  auto header = getHeader();
648 
649  // Note: as an optimization, we could skip this function if Writer policy is nonblockable (ACSDK-251).
650 
651  // The only barrier to a blocking writer overrunning a reader is oldestUnconsumedCursor, so we have to be careful
652  // not to ever move it ahead of any readers. The loop below searches through the readers to find the oldest point,
653  // without moving oldestUnconsumedCursor. Note that readers can continue to read while we are looping; it means
654  // oldest may not be completely accurate, but it will always be older than the readers because they are reading
655  // away from it. Also note that backwards seeks (which would break the invariant) are prevented with a mutex which
656  // is held while this function is called. Also note that all read cursors may be in the future, so we start with
657  // an unlimited barrier and work back from there.
658  Index oldest = std::numeric_limits<Index>::max();
659  for (size_t id = 0; id < header->maxReaders; ++id) {
660  // Note that this code is calling isReaderEnabled() without holding readerEnableMutex. On the surface, this
661  // appears to be a race condition because a reader may be disabled and/or re-enabled before the subsequent code
662  // reads the cursor, but it turns out to be safe because:
663  // - if a reader is enabled, its cursor is valid
664  // - if a reader becomes disabled, its cursor moves to writeCursor (which will never be the oldest)
665  // - if a reader becomes re-enabled, its cursor defaults to writeCursor (which will never be the oldest)
666  // - if a reader is created that wants to be at an older index, it gets there by doing a backward seek (which
667  // is locked when this function is called)
668  if (isReaderEnabled(id) && getReaderCursorArray()[id] < oldest) {
669  oldest = getReaderCursorArray()[id];
670  }
671  }
672 
673  // If no barrier was found, block at the write cursor so that we retain data until a reader comes along to read it.
674  if (std::numeric_limits<Index>::max() == oldest) {
675  oldest = header->writeStartCursor;
676  }
677 
678  // Now that we've measured the oldest cursor, we can safely update oldestUnconsumedCursor with no risk of an
679  // overrun of any readers.
680 
681  // To clarify the logic here, the code above reviewed all of the enabled readers to see where the oldest cursor is
682  // at. This value is captured in the 'oldest' variable. Now we want to move up our writer barrier
683  // ('oldestUnconsumedCursor') if it is older than it needs to be.
684  if (oldest > header->oldestUnconsumedCursor) {
685  header->oldestUnconsumedCursor = oldest;
686 
687  // Notify the writer(s).
688  // Note: as an optimization, we could skip this if there are no blocking writers (ACSDK-251).
689  header->spaceAvailableConditionVariable.notify_all();
690  }
691 }
692 
693 template <typename T>
694 uint32_t SharedDataStream<T>::BufferLayout::stableHash(const char* string) {
695  // Simple, stable hash which XORs all bytes of string into the hash value.
696  uint32_t hashed = 0;
697  size_t pos = 0;
698  while (*string) {
699  hashed ^= *string << ((pos % sizeof(uint32_t)) * 8);
700  ++string;
701  ++pos;
702  }
703  return hashed;
704 }
705 
706 template <typename T>
707 size_t SharedDataStream<T>::BufferLayout::alignSizeTo(size_t size, size_t align) {
708  if (size) {
709  return (((size - 1) / align) + 1) * align;
710  } else {
711  return 0;
712  }
713 }
714 
715 template <typename T>
716 size_t SharedDataStream<T>::BufferLayout::calculateReaderEnabledArrayOffset() {
717  return alignSizeTo(sizeof(Header), alignof(AtomicBool));
718 }
719 
720 template <typename T>
721 size_t SharedDataStream<T>::BufferLayout::calculateReaderCursorArrayOffset(size_t maxReaders) {
722  return alignSizeTo(calculateReaderEnabledArrayOffset() + (maxReaders * sizeof(AtomicBool)), alignof(AtomicIndex));
723 }
724 
725 template <typename T>
726 size_t SharedDataStream<T>::BufferLayout::calculateReaderCloseIndexArrayOffset(size_t maxReaders) {
727  return calculateReaderCursorArrayOffset(maxReaders) + (maxReaders * sizeof(AtomicIndex));
728 }
729 
730 template <typename T>
731 void SharedDataStream<T>::BufferLayout::calculateAndCacheConstants(size_t wordSize, size_t maxReaders) {
732  auto buffer = reinterpret_cast<uint8_t*>(m_buffer->data());
733  m_readerEnabledArray = reinterpret_cast<AtomicBool*>(buffer + calculateReaderEnabledArrayOffset());
734  m_readerCursorArray = reinterpret_cast<AtomicIndex*>(buffer + calculateReaderCursorArrayOffset(maxReaders));
735  m_readerCloseIndexArray = reinterpret_cast<AtomicIndex*>(buffer + calculateReaderCloseIndexArrayOffset(maxReaders));
736  m_dataSize = (m_buffer->size() - calculateDataOffset(wordSize, maxReaders)) / wordSize;
737  m_data = buffer + calculateDataOffset(wordSize, maxReaders);
738 }
739 
740 template <typename T>
741 bool SharedDataStream<T>::BufferLayout::isAttached() const {
742  return m_data != nullptr;
743 }
744 
745 } // namespace sds
746 } // namespace utils
747 } // namespace avsCommon
748 } // namespace alexaClientSDK
749 
750 #endif // ALEXA_CLIENT_SDK_AVSCOMMON_UTILS_INCLUDE_AVSCOMMON_UTILS_SDS_BUFFERLAYOUT_H_
AtomicBool isWriterEnabled
This field indicates whether there is an enabled (not closed) Writer.
Definition: BufferLayout.h:124
void enableReaderLocked(size_t id)
Definition: BufferLayout.h:617
void disableReaderLocked(size_t id)
Definition: BufferLayout.h:622
bool isReaderEnabled(size_t id) const
Definition: BufferLayout.h:612
::std::string string
Definition: gtest-port.h:1097
AtomicBool * getReaderEnabledArray() const
Definition: BufferLayout.h:446
#define TAG
String to identify log entries originating from this file.
Definition: TestableMessageObserver.cpp:27
static const uint32_t VERSION
Version of this header layout.
Definition: BufferLayout.h:46
AtomicIndex writeStartCursor
This field contains the next location to write to.
Definition: BufferLayout.h:136
static size_t calculateDataOffset(size_t wordSize, size_t maxReaders)
Definition: BufferLayout.h:634
Header * getHeader() const
Definition: BufferLayout.h:441
ConditionVariable dataAvailableConditionVariable
This field contains the condition variable used to notify Readers that data is available.
Definition: BufferLayout.h:104
static const uint32_t MAGIC_NUMBER
Magic number used to identify a valid Header in memory.
Definition: BufferLayout.h:43
uint8_t * getData(Index at=0) const
Definition: BufferLayout.h:466
Index
Index used for setting access.
Definition: StateReportGeneratorTest.cpp:41
Whether or not curl logs should be emitted.
Definition: AVSConnectionManager.h:36
bool init(size_t wordSize, size_t maxReaders, size_t maxEphemeralReaders)
Definition: BufferLayout.h:476
void updateOldestUnconsumedCursor()
This function calls updateOldestUnconsumedCursorLocked() while holding Header::backwardSeekMutex.
Definition: BufferLayout.h:639
void acsdkError(const LogEntry &entry)
AtomicIndex * getReaderCursorArray() const
Definition: BufferLayout.h:451
LogEntry & d(const std::string &key, const ValueType &value)
Definition: LogEntry.h:231
Index wordsUntilWrap(Index after) const
Definition: BufferLayout.h:627
auto max_field_limit(FieldType(ClassType::*)) -> decltype(std::numeric_limits< FieldType >::max())
Definition: BufferLayout.h:471
Index getDataSize() const
Definition: BufferLayout.h:461
ConditionVariable spaceAvailableConditionVariable
Definition: BufferLayout.h:113
BufferLayout(std::shared_ptr< Buffer > buffer)
Definition: BufferLayout.h:426
AtomicBool hasWriterBeenClosed
This field indicates that a Writer had at one point been enabled and then closed. ...
Definition: BufferLayout.h:127
void updateOldestUnconsumedCursorLocked()
Definition: BufferLayout.h:646
uint32_t referenceCount
This field tracks the number of BufferLayout instances currently attached to a Buffer.
Definition: BufferLayout.h:151
Mutex dataAvailableMutex
This field contains the mutex used by dataAvailableConditionVariable.
Definition: BufferLayout.h:107
AtomicIndex * getReaderCloseIndexArray() const
Definition: BufferLayout.h:456
~BufferLayout()
The destructor ensures the BufferLayout is detach()es from the Buffer.
Definition: BufferLayout.h:436
LogEntry is used to compile the log entry text to log via Logger.
Definition: LogEntry.h:33

AlexaClientSDK 3.0.0 - Copyright 2016-2022 Amazon.com, Inc. or its affiliates. All Rights Reserved. Licensed under the Apache License, Version 2.0