00001
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039
00040
00041
00042
00043 #ifndef SCHEDULER_HPP
00044 #define SCHEDULER_HPP 1
00045
00046
00047 #include <pthread.h>
00048 #include <iostream>
00049 #include <vector>
00050 #include <deque>
00051
00052
00053
00054
00055
00056
/**
 * Multi-threaded problem scheduler built on POSIX threads.
 *
 * Producers queue problems with add_problem()/add_problems(). After run()
 * a scheduler thread starts one worker thread (Consumer) per solver; each
 * worker repeatedly takes a problem from the input queue and invokes its
 * solver as <tt>(*solver)( Prob *, Scheduler & )</tt>. Solved problems are
 * collected in an output queue (see get_solved_problems()). If a solver
 * throws an exception of type Err, the exception and the offending problem
 * are recorded (see get_errors()) and the whole run is stopped.
 *
 * All shared state is protected by a single mutex. Every condition
 * variable is waited on inside a predicate loop, and every state change
 * that can make a predicate true is signalled while the mutex is held.
 *
 * Ownership: the scheduler owns the Consumer objects it creates, but it
 * never deletes solvers or problems.
 *
 * @tparam Solv solver type, callable as described above
 * @tparam Prob problem type; only pointers to it are handled
 * @tparam Err  exception type reported by solvers; must be copyable
 *              because errors are stored in a std::vector<Err>
 */
template <class Solv, class Prob, class Err>
class Scheduler {

    /**
     * A worker thread bound to a single solver instance.
     */
    class Consumer {

        pthread_t _thread;       ///< worker thread handle, valid only while _started
        Solv *_solver;           ///< solver executed by this worker (not owned)
        Scheduler *_scheduler;   ///< scheduler that feeds this worker (not owned)
        bool _started;           ///< true while _thread refers to an unjoined thread

        /**
         * Worker loop: fetch problems until the scheduler hands out NULL.
         *
         * An exception of type Err is reported to the scheduler and ends
         * this worker; the failed problem is not added to the solved
         * queue. Exceptions of any other type are not handled here.
         */
        void *consumer_main( void ) {
            Prob *p;

            while( (p = _scheduler->get_next_problem()) != NULL ) {
                try {
                    (*_solver)( p, *_scheduler );
                } catch( Err &e ) {
                    _scheduler->on_error( e, p );
                    break;
                }
                _scheduler->put_solved_problem( p );
            }

            return( NULL );
        }

        // A Consumer wraps a thread handle and must not be copied.
        Consumer( const Consumer & );
        Consumer &operator=( const Consumer & );

    public:

        /** pthread entry point; @p data is the Consumer to run. */
        static void *consumer_entry( void *data ) {
            Consumer *consumer = static_cast<Consumer *>( data );
            return( consumer->consumer_main() );
        }

        /**
         * @param solver    solver invoked for every fetched problem
         * @param scheduler scheduler providing problems and taking results
         */
        Consumer( Solv *solver, Scheduler *scheduler )
            : _thread(), _solver(solver), _scheduler(scheduler), _started(false) {
        }

        ~Consumer() {
        }

        /**
         * Start the worker thread. If thread creation fails the worker
         * simply stays inactive and join() becomes a no-op.
         */
        void run( void ) {
            _started = ( pthread_create( &_thread, NULL, consumer_entry, this ) == 0 );
        }

        /** Wait for the worker thread to end (no-op if it was never started). */
        void join( void ) {
            if( !_started )
                return;
            pthread_join( _thread, NULL );
            _started = false;
        }

    };


    pthread_mutex_t _mutex;           ///< guards every member below
    pthread_cond_t _scheduler_cond;   ///< wakes the scheduler thread
    pthread_cond_t _producer_cond;    ///< wakes callers blocked in finish()
    pthread_cond_t _consumer_cond;    ///< wakes workers waiting for input

    size_t _problems_in_c;            ///< number of problems ever queued
    size_t _problems_out_c;           ///< number of problems solved
    size_t _problems_err_c;           ///< number of problems that raised Err
    std::deque<Prob*> _problems_in;   ///< pending problems
    std::deque<Prob*> _problems_out;  ///< solved problems not yet collected

    pthread_t _scheduler_thread;      ///< valid only while _scheduler_started
    std::vector<Consumer *> _consumers;

    bool _running;                    ///< scheduler thread is active
    bool _error;                      ///< a solver threw Err during this run
    bool _done;                       ///< workers must stop fetching problems
    bool _finish;                     ///< finish() was requested
    bool _scheduler_started;          ///< _scheduler_thread still has to be joined
    std::vector<Err> _err;            ///< recorded errors
    std::vector<Prob *> _prob;        ///< problems matching _err, index by index


    /**
     * Record an error raised by a solver and wake the scheduler thread so
     * that it can stop the run.
     *
     * @param e the caught exception (a copy is stored)
     * @param p the problem being solved when @p e was thrown
     */
    void on_error( Err &e, Prob *p ) {
        pthread_mutex_lock( &_mutex );
        _err.push_back( e );
        _prob.push_back( p );
        _problems_err_c++;
        _error = true;
        pthread_cond_broadcast( &_scheduler_cond );
        pthread_mutex_unlock( &_mutex );
    }


    /**
     * Hand the next pending problem to a worker, blocking while the input
     * queue is empty.
     *
     * @return the next problem, or NULL once the run is done or failed
     */
    Prob *get_next_problem( void ) {
        Prob *next = NULL;

        pthread_mutex_lock( &_mutex );
        for( ;; ) {
            // Stop conditions win over pending work, as workers must not
            // start new problems once the run has ended.
            if( _done || _error )
                break;
            if( !_problems_in.empty() ) {
                next = _problems_in.front();
                _problems_in.pop_front();
                break;
            }
            pthread_cond_wait( &_consumer_cond, &_mutex );
        }
        pthread_mutex_unlock( &_mutex );

        return( next );
    }


    /**
     * Store a solved problem and notify the scheduler thread, which may be
     * waiting for the last outstanding problem.
     */
    void put_solved_problem( Prob *p ) {
        pthread_mutex_lock( &_mutex );
        _problems_out_c++;
        _problems_out.push_back( p );
        pthread_cond_broadcast( &_scheduler_cond );
        pthread_mutex_unlock( &_mutex );
    }


    /**
     * Body of the scheduler thread: start the workers, wait until the run
     * is over, stop and join the workers, then release finish() callers.
     */
    void *scheduler_main( void ) {

        for( size_t a = 0; a < _consumers.size(); a++ )
            _consumers[a]->run();

        pthread_mutex_lock( &_mutex );

        // The run is over on error, or once finish() was requested and
        // every queued problem has been either solved or reported.
        while( !_done && !_error &&
               !(_finish && _problems_in_c == _problems_out_c + _problems_err_c) ) {
            pthread_cond_wait( &_scheduler_cond, &_mutex );
        }

        _done = true;
        pthread_cond_broadcast( &_consumer_cond );
        pthread_mutex_unlock( &_mutex );

        for( size_t a = 0; a < _consumers.size(); a++ )
            _consumers[a]->join();

        // Clear _running and wake finish() under the mutex so that the
        // wakeup cannot be lost. The mutex is not touched afterwards, so
        // this thread can be joined even by a caller holding the mutex.
        pthread_mutex_lock( &_mutex );
        _running = false;
        pthread_cond_broadcast( &_producer_cond );
        pthread_mutex_unlock( &_mutex );

        return( NULL );
    }


    /** pthread entry point; @p data is the Scheduler to run. */
    static void *scheduler_entry( void *data ) {
        Scheduler *scheduler = static_cast<Scheduler *>( data );
        return( scheduler->scheduler_main() );
    }


    // The scheduler owns mutexes, threads and Consumer objects; copying it
    // would lead to double deletion, so copying is disabled.
    Scheduler( const Scheduler & );
    Scheduler &operator=( const Scheduler & );


public:

    /**
     * Create a scheduler with one worker per solver.
     *
     * @param s solvers to use; the pointers are stored but not owned, and
     *          must stay valid for the lifetime of the scheduler
     */
    Scheduler( const std::vector<Solv *> &s )
        : _problems_in_c(0), _problems_out_c(0), _problems_err_c(0),
          _scheduler_thread(),
          _running(false), _error(false), _done(false), _finish(false),
          _scheduler_started(false) {

        pthread_mutex_init( &_mutex, NULL );
        pthread_cond_init( &_scheduler_cond, NULL );
        pthread_cond_init( &_consumer_cond, NULL );
        pthread_cond_init( &_producer_cond, NULL );

        for( size_t a = 0; a < s.size(); a++ )
            _consumers.push_back( new Consumer( s[a], this ) );
    }


    /**
     * Finish a run in progress (see finish()), join the scheduler thread
     * if one was ever started, and release all resources.
     */
    ~Scheduler() {
        finish();
        if( _scheduler_started )
            pthread_join( _scheduler_thread, NULL );

        pthread_mutex_destroy( &_mutex );
        pthread_cond_destroy( &_scheduler_cond );
        pthread_cond_destroy( &_consumer_cond );
        pthread_cond_destroy( &_producer_cond );

        for( size_t a = 0; a < _consumers.size(); a++ )
            delete _consumers[a];
    }


    /**
     * Move all solved problems collected so far into @p c.
     *
     * @param c container supporting push_back( Prob * )
     * @return number of problems appended to @p c
     */
    template <class Cont>
    size_t get_solved_problems( Cont &c ) {
        pthread_mutex_lock( &_mutex );
        size_t r = _problems_out.size();
        while( !_problems_out.empty() ) {
            c.push_back( _problems_out.front() );
            _problems_out.pop_front();
        }
        pthread_mutex_unlock( &_mutex );
        return( r );
    }


    /** @return true if a solver raised an error during the current run */
    bool is_error( void ) {
        pthread_mutex_lock( &_mutex );
        const bool r = _error;
        pthread_mutex_unlock( &_mutex );
        return( r );
    }


    /** @return true while the scheduler thread is active */
    bool is_running( void ) {
        pthread_mutex_lock( &_mutex );
        const bool r = _running;
        pthread_mutex_unlock( &_mutex );
        return( r );
    }


    /**
     * Move all recorded errors and their problems into @p e and @p p;
     * entries with the same index belong together.
     *
     * @param e container supporting push_back( Err )
     * @param p container supporting push_back( Prob * )
     * @return number of errors appended
     */
    template <class Cont1, class Cont2>
    size_t get_errors( Cont1 &e, Cont2 &p ) {
        pthread_mutex_lock( &_mutex );
        size_t r = _err.size();
        for( size_t a = 0; a < _err.size(); a++ ) {
            e.push_back( _err[a] );
            p.push_back( _prob[a] );
        }
        _err.clear();
        _prob.clear();
        pthread_mutex_unlock( &_mutex );
        return( r );
    }


    /**
     * Start the scheduler thread. Does nothing if a run is already in
     * progress. Error state of a previous run is cleared; problems still
     * pending in the input queue are kept.
     */
    void run( void ) {
        pthread_mutex_lock( &_mutex );
        if( _running ) {
            pthread_mutex_unlock( &_mutex );
            return;
        }

        // Reap the thread of a previous run. It cleared _running in its
        // last critical section, so joining it here cannot deadlock.
        if( _scheduler_started ) {
            pthread_join( _scheduler_thread, NULL );
            _scheduler_started = false;
        }

        _error = false;
        _done = false;
        _finish = false;
        _err.clear();
        _prob.clear();

        // Only report a running scheduler if the thread really exists,
        // otherwise finish() would wait forever.
        if( pthread_create( &_scheduler_thread, NULL, scheduler_entry, this ) == 0 ) {
            _scheduler_started = true;
            _running = true;
        }
        pthread_mutex_unlock( &_mutex );
    }


    /** Queue a single problem and wake one idle worker. */
    void add_problem( Prob *p ) {
        pthread_mutex_lock( &_mutex );
        _problems_in_c++;
        _problems_in.push_back( p );
        pthread_cond_signal( &_consumer_cond );
        pthread_mutex_unlock( &_mutex );
    }


    /** Queue several problems at once and wake all idle workers. */
    void add_problems( const std::vector<Prob *> &p ) {
        pthread_mutex_lock( &_mutex );
        _problems_in_c += p.size();
        _problems_in.insert( _problems_in.end(), p.begin(), p.end() );
        pthread_cond_broadcast( &_consumer_cond );
        pthread_mutex_unlock( &_mutex );
    }


    /**
     * Declare that no more problems will follow and block until the run
     * has ended, i.e. all workers have been joined.
     *
     * @return true if finish() had already been requested for this run,
     *         or if the run ended without error; false if the scheduler
     *         is not running or a solver reported an error
     */
    bool finish( void ) {
        pthread_mutex_lock( &_mutex );

        if( _finish ) {
            pthread_mutex_unlock( &_mutex );
            return( true );
        }
        if( !_running ) {
            pthread_mutex_unlock( &_mutex );
            return( false );
        }

        _finish = true;
        pthread_cond_broadcast( &_scheduler_cond );

        // Predicate loop: immune to spurious wakeups and to the scheduler
        // thread ending before this wait is entered.
        while( _running )
            pthread_cond_wait( &_producer_cond, &_mutex );

        const bool ok = !_error;
        pthread_mutex_unlock( &_mutex );

        return( ok );
    }


    friend class Consumer;
};
00476
00477
00478
00479 #endif
00480
00481
00482
00483
00484
00485
00486
00487
00488
00489
00490
00491
00492
00493
00494
00495
00496
00497
00498