16 #ifndef ALEXA_CLIENT_SDK_AVSCOMMON_UTILS_INCLUDE_AVSCOMMON_UTILS_SDS_WRITER_H_ 17 #define ALEXA_CLIENT_SDK_AVSCOMMON_UTILS_INCLUDE_AVSCOMMON_UTILS_SDS_WRITER_H_ 75 Writer(
Policy policy, std::shared_ptr<BufferLayout> bufferLayout);
97 ssize_t write(
const void* buf,
size_t nWords, std::chrono::milliseconds timeout = std::chrono::milliseconds(0));
118 size_t getWordSize()
const;
137 std::shared_ptr<BufferLayout> m_bufferLayout;
147 template <
typename T>
148 const std::string SharedDataStream<T>::Writer::TAG =
"SdsWriter";
150 template <
typename T>
153 m_bufferLayout{bufferLayout},
156 auto header = m_bufferLayout->getHeader();
157 header->isWriterEnabled =
true;
158 header->writeEndCursor = header->writeStartCursor.load();
161 template <
typename T>
166 template <
typename T>
168 if (
nullptr == buf) {
177 auto header = m_bufferLayout->getHeader();
178 if (!header->isWriterEnabled) {
183 auto wordsToCopy = nWords;
184 auto buf8 =
static_cast<const uint8_t*
>(buf);
185 std::unique_lock<Mutex> backwardSeekLock(header->backwardSeekMutex, std::defer_lock);
186 Index writeEnd = header->writeStartCursor + nWords;
191 if (nWords > m_bufferLayout->getDataSize()) {
192 wordsToCopy = nWords = m_bufferLayout->getDataSize();
193 writeEnd = header->writeStartCursor + nWords;
203 backwardSeekLock.lock();
204 if ((writeEnd >= header->oldestUnconsumedCursor) &&
205 ((writeEnd - header->oldestUnconsumedCursor) > m_bufferLayout->getDataSize())) {
213 auto predicate = [
this, header] {
214 return (header->writeStartCursor < header->oldestUnconsumedCursor) ||
215 (header->writeStartCursor - header->oldestUnconsumedCursor) < m_bufferLayout->getDataSize();
220 backwardSeekLock.lock();
223 if (std::chrono::milliseconds::zero() == timeout) {
224 header->spaceAvailableConditionVariable.wait(backwardSeekLock, predicate);
225 }
else if (!header->spaceAvailableConditionVariable.wait_for(backwardSeekLock, timeout, predicate)) {
230 auto spaceAvailable = m_bufferLayout->getDataSize();
231 if (header->writeStartCursor >= header->oldestUnconsumedCursor) {
232 auto wordsToOverrun =
233 m_bufferLayout->getDataSize() - (header->writeStartCursor - header->oldestUnconsumedCursor);
234 if (wordsToOverrun < spaceAvailable) {
235 spaceAvailable = wordsToOverrun;
240 if (spaceAvailable < nWords) {
241 wordsToCopy = nWords = spaceAvailable;
242 writeEnd = header->writeStartCursor + nWords;
248 header->writeEndCursor = writeEnd;
251 if (backwardSeekLock) {
252 backwardSeekLock.unlock();
258 if (wordsToCopy > m_bufferLayout->getDataSize()) {
259 wordsToCopy = m_bufferLayout->getDataSize();
265 size_t beforeWrap = m_bufferLayout->wordsUntilWrap(header->writeStartCursor);
266 if (beforeWrap > wordsToCopy) {
267 beforeWrap = wordsToCopy;
269 size_t afterWrap = wordsToCopy - beforeWrap;
272 memcpy(m_bufferLayout->getData(header->writeStartCursor), buf8, beforeWrap *
getWordSize());
275 m_bufferLayout->getData(header->writeStartCursor + beforeWrap),
286 std::unique_lock<Mutex> dataAvailableLock(header->dataAvailableMutex, std::defer_lock);
288 dataAvailableLock.lock();
290 header->writeStartCursor = header->writeEndCursor.load();
292 dataAvailableLock.unlock();
297 header->dataAvailableConditionVariable.notify_all();
302 template <
typename T>
304 return m_bufferLayout->getHeader()->writeStartCursor;
307 template <
typename T>
309 auto header = m_bufferLayout->getHeader();
310 std::lock_guard<Mutex> lock(header->writerEnableMutex);
314 if (header->isWriterEnabled) {
315 header->isWriterEnabled =
false;
317 std::unique_lock<Mutex> dataAvailableLock(header->dataAvailableMutex);
319 header->hasWriterBeenClosed =
true;
321 header->dataAvailableConditionVariable.notify_all();
326 template <
typename T>
328 return m_bufferLayout->getHeader()->wordSize;
331 template <
typename T>
344 return "(unknown error " + to_string(error) +
")";
352 #endif // ALEXA_CLIENT_SDK_AVSCOMMON_UTILS_INCLUDE_AVSCOMMON_UTILS_SDS_WRITER_H_
ssize_t write(const void *buf, size_t nWords, std::chrono::milliseconds timeout=std::chrono::milliseconds(0))
Definition: Writer.h:167
static std::string errorToString(Error error)
Definition: Writer.h:332
WriterPolicy
Specifies the policy to use for writing to the stream.
Definition: WriterPolicy.h:25
::std::string string
Definition: gtest-port.h:1097
#define TAG
String to identify log entries originating from this file.
Definition: TestableMessageObserver.cpp:27
Index tell() const
Definition: Writer.h:303
Returned when a write() parameter is invalid.
Definition: Writer.h:63
~Writer()
This destructor detaches the Writer from a BufferLayout.
Definition: Writer.h:162
Returned when policy is Policy::BLOCKING and no space becomes available before the specified timeout...
Definition: Writer.h:65
void close()
Definition: Writer.h:308
size_t getWordSize() const
Definition: Writer.h:327
Index
Index used for setting access.
Definition: StateReportGeneratorTest.cpp:41
Whether or not curl logs should be emitted.
Definition: AVSConnectionManager.h:36
void acsdkError(const LogEntry &entry)
Writer(Policy policy, std::shared_ptr< BufferLayout > bufferLayout)
Definition: Writer.h:151
Returned when close() has been previously called on the Writer.
Definition: Writer.h:59
Returned when policy is Policy::ALL_OR_NOTHING and the write() would overwrrite unconsumed data...
Definition: Writer.h:61
LogEntry is used to compile the log entry text to log via Logger.
Definition: LogEntry.h:33