thread pool и борьба с race condition
От: RedApe Беларусь  
Дата: 01.02.15 07:16
Оценка:
Есть пул потоков thread_pool и есть классы задач some_task, которые проводят обработку запуская работу в потоках.
Типа так, очень упрощенно:

// структура, которая хранит данные для параллельной обработки
struct process  
{
    task* self;
    some_data config;
    some_data1 source;
    some_data2 result;
};

// открытый наружу класс, при помощи которого конфигурируется задача
// запускается и получается результат
class some_task
{
public:
    ~some_task()
    {
        thread_pool::detach_task(this);
    }

    void run()
    {
        some_data cfg;

        // указатель на some_task при работе потока не 
        // используется, и нужен только чтобы идентифицировать задание
        process p {this, cfg, ...}
        thread_pool::start( p );
    }

    some_result get_result()
    {
        return thread_pool::wait_and_take_result(this);
    }
}

// хранит список задач и запускает потоки
class thread_pool; 

void thread_pool::detach_task(task* self)
{
    boost::mutex::scoped_lock lock(mutex_); 
    tasks_.remove(self);
    ...
}

some_result thread_pool::wait_and_take_result(task* self)
{
    boost::mutex::scoped_lock lock(mutex_); 

    some_result* r = 0;
    while(1) 
    {
        r = result_ready(self);
        if (!r) some_pool_has_finished.wait(lock);
        else break;
    }

    return *r;    
}

int main()
{
   shared_ptr<task> t( new task );
   t->run();
   some_result = t->get_result();
}


Я стремился сделать так, чтобы параллельные потоки были отвязаны от класса task, т.е. чтобы при нескольких запусках task::run, выполнение не ломалось, а просто результат устаревал. А через get_result я получал бы самый актуальный результат. Проблема теперь в том, что я хочу пробрасывать сообщения из работчего потока в основной, например для обновления окна, типа так

thread_pool::start( ... )
{
    // основную работу делаем без блокировки мьютекса
    ...

    // а когда задание выполнено:

    boost::mutex::scoped_lock lock(mutex_); 
    // убеждаемся что задание не удалено
    if (tasks_.contains(self))
    {
#if _PREFER_DEADOCK

        // если делать так, то любой вызов метода thread_pool
        // приведет к дедлоку, например я не могу сделать
        // в обработчике delete this
        self->on_process_finished();

#else // _PREFER_RACE_CONDITION

        // а если так, то имеет место race_condition
        lock.unlock();

        // возможно в этот момент задание и будет удалено
        self->on_process_finished(); 
#endif
    }
}


В общем существующий код значительно сложнее и худо-бедно работает, но меня не покидает ощущение, мой код не корректен. Я пытаюсь его улучшить и нахожу такие вот концептуальные косяки. Как вообще такие вещи делаются? Может быть можно где-то архитектуру подсмотреть или почитать о примерах организации такой вот параллельной работы?
--
RedApe
 
Подождите ...
Wait...
Пока на собственное сообщение не было ответов, его можно удалить.