SourceXtractorPlusPlus  0.16
Please provide a description of the project.
Prefetcher.cpp
Go to the documentation of this file.
1 
18 #include <ElementsKernel/Logging.h>
21 
23 
24 
25 namespace SourceXtractor {
26 
/**
 * Scope guard that inverts a lock for the lifetime of the guard: the wrapped
 * lock is released on construction and re-acquired on destruction.
 *
 * @tparam Lock
 *  Any type exposing unlock() and lock(), e.g. std::unique_lock<std::mutex>.
 *  unlock() is called unconditionally, so the lock must be held when the guard
 *  is created.
 */
template <typename Lock>
struct ReverseLock {
  /// Releases `lock`. Only a reference is kept, so `lock` must outlive the guard.
  explicit ReverseLock(Lock& lock) : m_lock(lock) {
    m_lock.unlock();
  }

  /// Re-acquires the lock that the constructor released.
  ~ReverseLock() {
    m_lock.lock();
  }

  // Non-copyable: a copy would call lock() a second time when destroyed.
  ReverseLock(const ReverseLock&) = delete;
  ReverseLock& operator=(const ReverseLock&) = delete;

private:
  Lock& m_lock;
};
43 
44 Prefetcher::Prefetcher(const std::shared_ptr<Euclid::ThreadPool>& thread_pool, unsigned max_queue_size)
45  : m_thread_pool(thread_pool), m_stop(false), m_semaphore(max_queue_size) {
46  m_output_thread = Euclid::make_unique<std::thread>(&Prefetcher::outputLoop, this);
47 }
48 
51  wait();
52 }
53 
56 
57  intptr_t source_addr = reinterpret_cast<intptr_t>(message.get());
58  {
60  m_received.emplace_back(EventType::SOURCE, source_addr);
61  }
62 
63  // Pre-fetch in separate threads
64  m_thread_pool->submit([this, source_addr, message]() {
65  for (auto& prop : m_prefetch_set) {
66  message->getProperty(prop);
67  }
68  {
70  m_finished_sources.emplace(source_addr, message);
71  }
73  });
74 }
75 
76 void Prefetcher::requestProperty(const PropertyId& property_id) {
77  m_prefetch_set.emplace(property_id);
78  logger.debug() << "Requesting prefetch of " << property_id.getString();
79 }
80 
82  logger.debug() << "Starting prefetcher output loop";
83 
84  while (m_thread_pool->activeThreads() > 0) {
86 
87  // Wait for something new
89 
90  // Process the output queue
91  // That is, release sources once the front of the received queue has been processed
92  while (!m_received.empty()) {
93  auto next = m_received.front();
94  // If the front is a ProcessSourceEvent, everything received before is done,
95  // so pass downstream
96  if (next.m_event_type == EventType::PROCESS_SOURCE) {
97  auto event = m_event_queue.front();
98  m_event_queue.pop_front();
99  logger.debug() << "ProcessSourceEvent released";
100  {
101  ReverseLock<decltype(output_lock)> release_lock(output_lock);
103  }
104  m_received.pop_front();
105  continue;
106  }
107  // Find if the matching source is done
108  auto processed = m_finished_sources.find(next.m_source_addr);
109  // If not, we can't keep going, so exit here
110  if (processed == m_finished_sources.end()) {
111  logger.debug() << "Next source " << next.m_source_addr << " not done yet";
112  break;
113  }
114  // If it is, send it downstream
115  logger.debug() << "Source " << next.m_source_addr << " sent downstream";
116  {
117  ReverseLock<decltype(output_lock)> release_lock(output_lock);
119  }
120  m_finished_sources.erase(processed);
121  m_received.pop_front();
123  }
124 
125  if (m_stop && m_received.empty()) {
126  break;
127  }
128  }
129  logger.debug() << "Stopping prefetcher output loop";
130 }
131 
133  {
136  m_event_queue.emplace_back(message);
137  }
139  logger.debug() << "ProcessSourceEvent received";
140 }
141 
143  m_stop = true;
145 }
146 
147 } // end of namespace SourceXtractor
static Elements::Logging logger
Definition: Prefetcher.cpp:22
static Logging getLogger(const std::string &name="")
void submit(Task task)
size_t activeThreads() const
Implements the Observer pattern. Notifications will be made using a message of type T.
Definition: Observable.h:51
void notifyObservers(const T &message) const
Definition: Observable.h:71
void requestProperty(const PropertyId &property_id)
Definition: Prefetcher.cpp:76
std::deque< ProcessSourcesEvent > m_event_queue
Queue of received ProcessSourcesEvent messages, kept in arrival order.
Definition: Prefetcher.h:116
std::shared_ptr< Euclid::ThreadPool > m_thread_pool
Pointer to the pool of worker threads.
Definition: Prefetcher.h:106
Prefetcher(const std::shared_ptr< Euclid::ThreadPool > &thread_pool, unsigned max_queue_size)
Definition: Prefetcher.cpp:44
std::condition_variable m_new_output
Notifies there is a new source done processing.
Definition: Prefetcher.h:112
std::deque< EventType > m_received
Queue of the types of received events, used to pass events downstream in the order in which they were received.
Definition: Prefetcher.h:118
void handleMessage(const std::shared_ptr< SourceInterface > &message) override
Definition: Prefetcher.cpp:54
Euclid::Semaphore m_semaphore
Keep the queue under control.
Definition: Prefetcher.h:126
std::set< PropertyId > m_prefetch_set
Properties to prefetch.
Definition: Prefetcher.h:108
std::unique_ptr< std::thread > m_output_thread
Orchestration thread.
Definition: Prefetcher.h:110
std::atomic_bool m_stop
Termination condition for the output loop.
Definition: Prefetcher.h:123
std::map< intptr_t, std::shared_ptr< SourceInterface > > m_finished_sources
Finished sources.
Definition: Prefetcher.h:114
Identifier used to set and retrieve properties.
Definition: PropertyId.h:40
std::string getString() const
Definition: PropertyId.cpp:36
T get(T... args)
T join(T... args)
T joinable(T... args)
T lock(T... args)
static auto logger
Definition: WCS.cpp:44
T next(T... args)
Event received by SourceGrouping to request the processing of some of the Sources stored.