Clean concurrency utilities

This commit is contained in:
Baptiste Wicht 2016-09-17 14:16:56 +02:00
parent a515261ed4
commit 6eb0a44a74
12 changed files with 139 additions and 93 deletions

View File

@ -5,8 +5,8 @@
// http://www.opensource.org/licenses/MIT) // http://www.opensource.org/licenses/MIT)
//======================================================================= //=======================================================================
#ifndef SLEEP_QUEUE_H #ifndef CONDITION_VARIABLE_H
#define SLEEP_QUEUE_H #define CONDITION_VARIABLE_H
#include <circular_buffer.hpp> #include <circular_buffer.hpp>
#include <lock_guard.hpp> #include <lock_guard.hpp>
@ -18,12 +18,7 @@
/*! /*!
* \brief A simple sleep queue * \brief A simple sleep queue
*/ */
struct sleep_queue { struct condition_variable {
private:
mutable spinlock lock;
circular_buffer<scheduler::pid_t, 16> queue;
public:
/*! /*!
* \brief Test if the sleep queue is empty * \brief Test if the sleep queue is empty
*/ */
@ -58,6 +53,10 @@ public:
* \return true if the thread was woken up, false if the timeout is passed * \return true if the thread was woken up, false if the timeout is passed
*/ */
bool sleep(size_t ms); bool sleep(size_t ms);
private:
mutable spinlock lock; ///< The spin lock used for protecting the queue
circular_buffer<scheduler::pid_t, 16> queue; ///< The queue of waiting threads
}; };
#endif #endif

View File

@ -12,32 +12,51 @@
#include "arch.hpp" #include "arch.hpp"
/*!
* \brief An interrupt lock. This lock disable preemption on acquire.
*/
struct int_lock { struct int_lock {
private: /*!
size_t rflags; * \brief Acquire the lock. This will disable preemption.
*/
public:
void acquire(){ void acquire(){
arch::disable_hwint(rflags); arch::disable_hwint(rflags);
} }
/*!
* \brief Release the lock. This will enable preemption.
*/
void release(){ void release(){
arch::enable_hwint(rflags); arch::enable_hwint(rflags);
} }
private:
size_t rflags; ///< The CPU flags
}; };
/*!
* \brief A direct interrupt lock (RAII).
*
* This is the equivalent of a std::lock_guard<int_lock> but does not need to
* store a lock.
*/
struct direct_int_lock { struct direct_int_lock {
private: /*!
int_lock lock; * \brief Construct a new direct_int_lock and acquire the lock.
*/
public:
direct_int_lock(){ direct_int_lock(){
lock.acquire(); lock.acquire();
} }
/*!
* \brief Destruct a direct_int_lock and release the lock.
*/
~direct_int_lock(){ ~direct_int_lock(){
lock.release(); lock.release();
} }
private:
int_lock lock; ///< The interrupt lock
}; };
#endif #endif

View File

@ -16,72 +16,66 @@
#include "scheduler.hpp" #include "scheduler.hpp"
#include "logging.hpp" #include "logging.hpp"
template<bool Debug = false> /*!
* \brief A mutex implementation.
*
* Once the lock is acquired, the critical section is only accessible by the
* thread who acquired the mutex.
*/
struct mutex { struct mutex {
private: /*!
mutable spinlock lock; * \brief Initialize the mutex (either to 1 or 0)
volatile size_t value; * \param v The intial value of the mutex
circular_buffer<scheduler::pid_t, 16> queue; */
const char* name;
public:
void init(size_t v = 1){ void init(size_t v = 1){
value = v; if(v > 1){
value = 1;
if(Debug){ } else {
name = ""; value = v;
} }
} }
void set_name(const char* name){ /*!
this->name = name; * \brief Acquire the lock
} */
void acquire(){ void acquire(){
lock.acquire(); lock.acquire();
if(value > 0){ if(value > 0){
value = 0; value = 0;
if(Debug){
logging::logf(logging::log_level::TRACE, "%s(mutex): directly acquired (process %d)\n", name, scheduler::get_pid());
}
lock.release(); lock.release();
} else { } else {
auto pid = scheduler::get_pid(); auto pid = scheduler::get_pid();
queue.push(pid); queue.push(pid);
if(Debug){
logging::logf(logging::log_level::TRACE, "%s(mutex): wait %d\n", name, pid);
}
scheduler::block_process_light(pid); scheduler::block_process_light(pid);
lock.release(); lock.release();
scheduler::reschedule(); scheduler::reschedule();
} }
} }
/*!
* \brief Acquire the lock
*/
void release(){ void release(){
std::lock_guard<spinlock> l(lock); std::lock_guard<spinlock> l(lock);
if(queue.empty()){ if(queue.empty()){
value = 1; value = 1;
if(Debug){
logging::logf(logging::log_level::TRACE, "%s(mutex): direct release (process %d)\n", name, scheduler::get_pid());
}
} else { } else {
auto pid = queue.pop(); auto pid = queue.pop();
scheduler::unblock_process(pid); scheduler::unblock_process(pid);
if(Debug){
logging::logf(logging::log_level::TRACE, "%s(mutex): wake %d\n", name, pid);
}
//No need to increment value, the process won't //No need to increment value, the process won't
//decrement it //decrement it
} }
} }
private:
mutable spinlock lock; ///< The spin protecting the value
volatile size_t value; ///< The value of the mutex
circular_buffer<scheduler::pid_t, 16> queue; ///< The sleep queue
}; };
#endif #endif

View File

@ -14,17 +14,26 @@
#include "spinlock.hpp" #include "spinlock.hpp"
#include "scheduler.hpp" #include "scheduler.hpp"
/*!
* \brief A semaphore implementation.
*
* The critical section can be open to several processes.
*/
struct semaphore { struct semaphore {
private: /*!
mutable spinlock lock; * \brief Initialize the semaphore
volatile size_t value; * \param v The intial value of the semaphore
circular_buffer<scheduler::pid_t, 16> queue; */
public:
void init(size_t v){ void init(size_t v){
value = v; value = v;
} }
/*!
* \brief Acquire the lock.
*
* This will effectively decrease the current counter by 1 once the critical
* section is entered.
*/
void acquire(){ void acquire(){
lock.acquire(); lock.acquire();
@ -41,6 +50,12 @@ public:
} }
} }
/*!
* \brief Release the lock.
*
* This will effectively increase the current counter by 1 once the critical
* section is left.
*/
void release(){ void release(){
std::lock_guard<spinlock> l(lock); std::lock_guard<spinlock> l(lock);
@ -56,23 +71,34 @@ public:
} }
} }
void release(size_t v){ /*!
* \brief Release the lock several times.
*
* This will effectively increase the current counter by n once the critical
* section is left.
*/
void release(size_t n){
std::lock_guard<spinlock> l(lock); std::lock_guard<spinlock> l(lock);
if(queue.empty()){ if(queue.empty()){
value += v; value += n;
} else { } else {
while(v && !queue.empty()){ while(n && !queue.empty()){
auto pid = queue.pop(); auto pid = queue.pop();
scheduler::unblock_process(pid); scheduler::unblock_process(pid);
--v; --n;
} }
if(v){ if(n){
value += v; value += n;
} }
} }
} }
private:
mutable spinlock lock; ///< The spin lock protecting the counter
volatile size_t value; ///< The value of the counter
circular_buffer<scheduler::pid_t, 16> queue; ///< The sleep queue
}; };
#endif #endif

View File

@ -8,21 +8,33 @@
#ifndef SPINLOCK_H #ifndef SPINLOCK_H
#define SPINLOCK_H #define SPINLOCK_H
/*!
* \brief Implementation of a spinlock
*
* A spinlock simply waits in a loop until the lock is available.
*/
struct spinlock { struct spinlock {
private: /*!
volatile size_t lock = 0; * \brief Acquire the lock.
*
public: * This will wait indefinitely.
*/
void acquire(){ void acquire(){
while(!__sync_bool_compare_and_swap(&lock, 0, 1)); while(!__sync_bool_compare_and_swap(&lock, 0, 1));
__sync_synchronize(); __sync_synchronize();
//TODO The last synchronize is probably not necessary //TODO The last synchronize is probably not necessary
} }
/*!
* \brief Release the lock
*/
void release(){ void release(){
__sync_synchronize(); __sync_synchronize();
lock = 0; lock = 0;
} }
private:
volatile size_t lock = 0; ///< The value of the lock
}; };
#endif #endif

View File

@ -37,7 +37,7 @@ struct interface_descriptor {
network::ip::address ip_address; ///< The interface IP address network::ip::address ip_address; ///< The interface IP address
network::ip::address gateway; ///< The interface IP gateway network::ip::address gateway; ///< The interface IP gateway
mutable mutex<> tx_lock; //To synchronize the queue mutable mutex tx_lock; //To synchronize the queue
mutable semaphore tx_sem; mutable semaphore tx_sem;
mutable semaphore rx_sem; mutable semaphore rx_sem;
@ -50,7 +50,7 @@ struct interface_descriptor {
void (*hw_send)(interface_descriptor&, ethernet::packet& p); void (*hw_send)(interface_descriptor&, ethernet::packet& p);
void send(ethernet::packet& p){ void send(ethernet::packet& p){
std::lock_guard<mutex<>> l(tx_lock); std::lock_guard<mutex> l(tx_lock);
tx_queue.push(p); tx_queue.push(p);
tx_sem.release(); tx_sem.release();
} }

View File

@ -15,7 +15,7 @@
#include "tlib/net_constants.hpp" #include "tlib/net_constants.hpp"
#include "conc/sleep_queue.hpp" #include "conc/condition_variable.hpp"
#include "net/ethernet_packet.hpp" #include "net/ethernet_packet.hpp"
@ -41,7 +41,7 @@ struct socket {
std::vector<network::ethernet::packet> packets; std::vector<network::ethernet::packet> packets;
circular_buffer<network::ethernet::packet, 32> listen_packets; circular_buffer<network::ethernet::packet, 32> listen_packets;
sleep_queue listen_queue; condition_variable listen_queue;
socket(){} socket(){}
socket(size_t id, socket_domain domain, socket_type type, socket_protocol protocol, size_t next_fd, bool listen) socket(size_t id, socket_domain domain, socket_type type, socket_protocol protocol, size_t next_fd, bool listen)

View File

@ -13,7 +13,7 @@
#include <tlib/keycode.hpp> #include <tlib/keycode.hpp>
#include "conc/sleep_queue.hpp" #include "conc/condition_variable.hpp"
namespace stdio { namespace stdio {
@ -35,7 +35,7 @@ struct virtual_terminal {
circular_buffer<char, 2 * INPUT_BUFFER_SIZE> canonical_buffer; circular_buffer<char, 2 * INPUT_BUFFER_SIZE> canonical_buffer;
circular_buffer<size_t, 3 * INPUT_BUFFER_SIZE> raw_buffer; circular_buffer<size_t, 3 * INPUT_BUFFER_SIZE> raw_buffer;
sleep_queue input_queue; condition_variable input_queue;
void print(char c); void print(char c);

View File

@ -5,25 +5,25 @@
// http://www.opensource.org/licenses/MIT) // http://www.opensource.org/licenses/MIT)
//======================================================================= //=======================================================================
#include "conc/sleep_queue.hpp" #include "conc/condition_variable.hpp"
#include "scheduler.hpp" #include "scheduler.hpp"
#include "logging.hpp" #include "logging.hpp"
#include "assert.hpp" #include "assert.hpp"
bool sleep_queue::empty() const { bool condition_variable::empty() const {
std::lock_guard<spinlock> l(lock); std::lock_guard<spinlock> l(lock);
return queue.empty(); return queue.empty();
} }
scheduler::pid_t sleep_queue::top_process() const { scheduler::pid_t condition_variable::top_process() const {
std::lock_guard<spinlock> l(lock); std::lock_guard<spinlock> l(lock);
return queue.top(); return queue.top();
} }
scheduler::pid_t sleep_queue::wake_up() { scheduler::pid_t condition_variable::wake_up() {
std::lock_guard<spinlock> l(lock); std::lock_guard<spinlock> l(lock);
while (!queue.empty()) { while (!queue.empty()) {
@ -34,7 +34,7 @@ scheduler::pid_t sleep_queue::wake_up() {
queue.pop(); queue.pop();
if (pid != scheduler::INVALID_PID) { if (pid != scheduler::INVALID_PID) {
logging::logf(logging::log_level::TRACE, "sleep_queue: wake %d\n", pid); logging::logf(logging::log_level::TRACE, "condition_variable: wake %d\n", pid);
// Indicate to the scheduler that this process will be able to run // Indicate to the scheduler that this process will be able to run
// We use a hint here because it is possible that the thread was // We use a hint here because it is possible that the thread was
@ -48,7 +48,7 @@ scheduler::pid_t sleep_queue::wake_up() {
return scheduler::INVALID_PID; return scheduler::INVALID_PID;
} }
void sleep_queue::wake_up_all() { void condition_variable::wake_up_all() {
std::lock_guard<spinlock> l(lock); std::lock_guard<spinlock> l(lock);
while (!queue.empty()) { while (!queue.empty()) {
@ -59,7 +59,7 @@ void sleep_queue::wake_up_all() {
queue.pop(); queue.pop();
if (pid != scheduler::INVALID_PID) { if (pid != scheduler::INVALID_PID) {
logging::logf(logging::log_level::TRACE, "sleep_queue: wake(all) %d\n", pid); logging::logf(logging::log_level::TRACE, "condition_variable: wake(all) %d\n", pid);
// Indicate to the scheduler that this process will be able to run // Indicate to the scheduler that this process will be able to run
// We use a hint here because it is possible that the thread was // We use a hint here because it is possible that the thread was
@ -69,18 +69,18 @@ void sleep_queue::wake_up_all() {
} }
} }
void sleep_queue::sleep() { void condition_variable::sleep() {
lock.acquire(); lock.acquire();
//Get the current process information //Get the current process information
auto pid = scheduler::get_pid(); auto pid = scheduler::get_pid();
logging::logf(logging::log_level::TRACE, "sleep_queue: wait %d\n", pid); logging::logf(logging::log_level::TRACE, "condition_variable: wait %d\n", pid);
//Enqueue the process in the sleep queue //Enqueue the process in the sleep queue
queue.push(pid); queue.push(pid);
thor_assert(!queue.full(), "The sleep_queue queue is full!"); thor_assert(!queue.full(), "The condition_variable queue is full!");
//This process will sleep //This process will sleep
scheduler::block_process_light(pid); scheduler::block_process_light(pid);
@ -90,7 +90,7 @@ void sleep_queue::sleep() {
scheduler::reschedule(); scheduler::reschedule();
} }
bool sleep_queue::sleep(size_t ms) { bool condition_variable::sleep(size_t ms) {
if (!ms) { if (!ms) {
return false; return false;
} }
@ -100,12 +100,12 @@ bool sleep_queue::sleep(size_t ms) {
//Get the current process information //Get the current process information
auto pid = scheduler::get_pid(); auto pid = scheduler::get_pid();
logging::logf(logging::log_level::TRACE, "sleep_queue: %u wait with timeout %u\n", pid, ms); logging::logf(logging::log_level::TRACE, "condition_variable: %u wait with timeout %u\n", pid, ms);
//Enqueue the process in the sleep queue //Enqueue the process in the sleep queue
queue.push(pid); queue.push(pid);
thor_assert(!queue.full(), "The sleep_queue queue is full!"); thor_assert(!queue.full(), "The condition_variable queue is full!");
//This process will sleep //This process will sleep
scheduler::block_process_timeout_light(pid, ms); scheduler::block_process_timeout_light(pid, ms);

View File

@ -28,10 +28,10 @@ static constexpr const size_t BLOCK_SIZE = 512;
ata::drive_descriptor* drives; ata::drive_descriptor* drives;
mutex<> ata_lock; mutex ata_lock;
mutex<> primary_lock; mutex primary_lock;
mutex<> secondary_lock; mutex secondary_lock;
block_cache cache; block_cache cache;
@ -350,10 +350,6 @@ void ata::detect_disks(){
primary_lock.init(0); primary_lock.init(0);
secondary_lock.init(0); secondary_lock.init(0);
ata_lock.set_name("ata_lock");
primary_lock.set_name("ata_primary_lock");
secondary_lock.set_name("ata_secondary_lock");
// Init the cache with 256 blocks // Init the cache with 256 blocks
cache.init(BLOCK_SIZE, 256); cache.init(BLOCK_SIZE, 256);

View File

@ -14,14 +14,14 @@
#include "net/arp_cache.hpp" #include "net/arp_cache.hpp"
#include "net/ip_layer.hpp" #include "net/ip_layer.hpp"
#include "conc/sleep_queue.hpp" #include "conc/condition_variable.hpp"
#include "logging.hpp" #include "logging.hpp"
#include "kernel_utils.hpp" #include "kernel_utils.hpp"
namespace { namespace {
sleep_queue wait_queue; condition_variable wait_queue;
} //end of anonymous namespace } //end of anonymous namespace

View File

@ -9,7 +9,7 @@
#include <atomic.hpp> #include <atomic.hpp>
#include <list.hpp> #include <list.hpp>
#include "conc/sleep_queue.hpp" #include "conc/condition_variable.hpp"
#include "net/tcp_layer.hpp" #include "net/tcp_layer.hpp"
#include "net/dns_layer.hpp" #include "net/dns_layer.hpp"
@ -41,7 +41,7 @@ struct tcp_connection {
size_t target_port; size_t target_port;
std::atomic<bool> listening; std::atomic<bool> listening;
sleep_queue queue; condition_variable queue;
circular_buffer<network::ethernet::packet, 8> packets; circular_buffer<network::ethernet::packet, 8> packets;
tcp_connection(size_t source_port, size_t target_port) tcp_connection(size_t source_port, size_t target_port)