Parallel Programming - Threads
Parallel Programming
Threads
Thread
A thread is an autonomous unit of execution within a process.
Conceptually, threads allow computational steps to execute concurrently within one process.
Process vs. Thread
Process | Thread |
---|---|
Creating and managing a process is a heavyweight operation. | Creating and managing a thread is a more lightweight operation. |
Each process has its own memory space. | Threads use the memory of the process they belong to. |
Inter-process communication is slow because processes have separate address spaces. | Inter-thread communication can be faster than inter-process communication because threads of the same process share that process's memory. |
Context switching between processes is more expensive. | Context switching between threads of the same process is less expensive. |
Processes don't share memory with other processes. | Threads share memory with other threads of the same process. |
Resource Sharing
Threads of one process share that process's resources, such as heap memory and open file handles, while each thread has its own call stack.
Programming Threads in C#
We use objects of type Thread (namespace System.Threading) for concurrent programming within a process.
The number of threads a process can create is limited, although the limit is quite high.
A first program

    using System;
    using System.Threading;

    class Threads
    {
        static void Main(string[] args)
        {
            // run Worker on a new thread; Start() returns immediately
            var thread = new Thread(Worker);
            thread.Start();

            Console.WriteLine("Program started");
        }

        static void Worker()
        {
            while (true)
            {
                Thread.Sleep(1000); // wait for one second
                Console.WriteLine("hello from worker");
            }
        }
    }

Output:

    Program started
    hello from worker
    hello from worker
    ...

Concurrent execution
When a thread is started with Thread.Start(), it begins executing the specified method.
The program flow in the main thread continues in the meantime.
This is why the previous example prints "Program started" before the other thread, which also sleeps for one second first, outputs its first message.
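To make it visible that the started method really runs on a different thread, the following minimal sketch records the managed thread ID of the caller and of the worker. The class name StartDemo and the method Run are hypothetical names chosen for this illustration.

```csharp
using System;
using System.Threading;

public static class StartDemo
{
    // Hypothetical helper: returns the managed thread IDs of the calling
    // thread and of a worker thread started from it.
    public static (int mainId, int workerId) Run()
    {
        int mainId = Environment.CurrentManagedThreadId;
        int workerId = -1;

        // The lambda runs on the new thread and stores that thread's ID.
        var worker = new Thread(() => workerId = Environment.CurrentManagedThreadId);

        worker.Start(); // returns immediately, the caller keeps running
        worker.Join();  // wait until the worker has stored its ID

        return (mainId, workerId);
    }

    public static void Main()
    {
        var (mainId, workerId) = Run();
        Console.WriteLine($"main thread: {mainId}, worker thread: {workerId}");
    }
}
```

The two IDs differ because both threads are alive at the same time; the concrete values depend on the runtime.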
Concurrent? I thought you said parallel?
Why do we say that threads are executed concurrently when what we really want is to execute them in parallel?
The CLR, together with the operating system's scheduler, manages the execution of threads and tries to distribute the work across the available CPU cores.
So with threads we are guaranteed at least concurrent execution (multiple threads taking turns on one CPU core). Where possible, the threads are distributed across the available cores and run in parallel.
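How much real parallelism is possible depends on the number of logical cores of the machine. As a small sketch, .NET exposes that number through Environment.ProcessorCount. The class name CoreInfo is a hypothetical name used only here.

```csharp
using System;

public static class CoreInfo
{
    // Number of logical processors available to the current process.
    public static int LogicalCores() => Environment.ProcessorCount;

    public static void Main()
    {
        // With more runnable threads than cores, some threads have to take turns.
        Console.WriteLine($"Logical cores: {LogicalCores()}");
    }
}
```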
Thread synchronisation
Threads are often used to split up computational work so that it can run on multiple CPU cores.
After the individual computations have finished, we often need to synchronise the threads again, for example to aggregate the individual results. Thread.Join() blocks the calling thread until the joined thread has finished.

    // create threads (DoWork, batch1 and batch2 are placeholders)
    var t1 = new Thread(() => DoWork(batch1));
    var t2 = new Thread(() => DoWork(batch2));

    // start computation
    t1.Start();
    t2.Start();

    // wait for t1 and t2 to finish
    t1.Join();
    t2.Join();

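A complete version of this split-and-join pattern could look like the sketch below. It sums an array in two halves and aggregates the partial results after both threads have been joined. The class SplitAndJoin and the method ParallelSum are hypothetical names chosen for this example.

```csharp
using System;
using System.Linq;
using System.Threading;

public static class SplitAndJoin
{
    // Hypothetical helper: sums the array using two threads, one per half.
    public static long ParallelSum(int[] data)
    {
        int mid = data.Length / 2;
        long left = 0, right = 0; // each thread writes only its own variable

        var t1 = new Thread(() => { for (int i = 0; i < mid; i++) left += data[i]; });
        var t2 = new Thread(() => { for (int i = mid; i < data.Length; i++) right += data[i]; });

        // start computation
        t1.Start();
        t2.Start();

        // wait for both halves before reading the partial results
        t1.Join();
        t2.Join();

        return left + right;
    }

    public static void Main()
    {
        var data = Enumerable.Range(1, 100).ToArray();
        Console.WriteLine(ParallelSum(data)); // prints 5050
    }
}
```

Because each thread only writes to its own variable and the results are read after Join(), no further locking is needed in this sketch.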
Accessing shared resources

    // requires: using System; using System.Collections.Generic;
    //           using System.Linq; using System.Threading;
    private static int Counter = 0;

    public static void Main()
    {
        List<Thread> threads = new();
        Enumerable.Range(0, 10)
            .ToList()
            .ForEach(i =>
            {
                var t = new Thread(() =>
                {
                    // unsynchronised read-modify-write on shared state
                    var currentCount = Counter;
                    Thread.Sleep(1);
                    Counter = currentCount + 1;
                });
                threads.Add(t);
                t.Start();
            });

        threads.ForEach(t => t.Join());
        Console.WriteLine(Counter);
    }

The result might be anything between 1 and 10, because several threads can read the same value of Counter before any of them writes its result back. This is called a race condition.
Race condition
A race condition occurs when the outcome of a program depends on the relative timing of threads that access shared data without synchronisation.
Dealing with race conditions

    private static int Counter = 0;
    private static readonly object lockObj = new();

    public static void Main()
    {
        List<Thread> threads = new();
        Enumerable.Range(0, 10).ToList().ForEach(i =>
        {
            var t = new Thread(() =>
            {
                // only one thread at a time may execute this block
                lock (lockObj)
                {
                    var currentCount = Counter;
                    Thread.Sleep(1);
                    Counter = currentCount + 1;
                }
            });
            threads.Add(t);
            t.Start();
        });

        threads.ForEach(t => t.Join());
        Console.WriteLine(Counter);
    }

Dealing with race conditions (2)

    private static int Counter = 0;

    public static void Main()
    {
        List<Thread> threads = new();
        Enumerable.Range(0, 10)
            .ToList()
            .ForEach(i =>
            {
                var t = new Thread(() =>
                {
                    // atomic increment, no explicit lock required
                    Interlocked.Increment(ref Counter);
                });
                threads.Add(t);
                t.Start();
            });

        threads.ForEach(t => t.Join());
        Console.WriteLine(Counter);
    }

Signaling between threads
We can use signals to synchronize threads. These signals don't carry any data, so we don't send data between threads. We only send a signal so that another thread knows when to wait and when to continue.
A common primitive for this kind of signaling is the semaphore.
Semaphore
A semaphore is a data structure that allows the synchronisation of threads.
- A semaphore manages a number of permits.
- When a thread acquires the semaphore, the number of permits is reduced by one.
- If no permits are left, the thread has to wait until another thread releases a permit.
Semaphore example

    // creating a semaphore with no permits available initially
    var sem = new SemaphoreSlim(initialCount: 0);

    // By calling Wait(), the number of permits is reduced.
    // If no permits are left, the thread execution is blocked
    // until another thread calls Release().
    sem.Wait();

    // By calling Release()
    // we give back one permit to the semaphore.
    sem.Release();

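The permit counting can be observed with a small experiment. The sketch below starts several threads that all pass through a SemaphoreSlim and records how many of them were inside the guarded region at the same time. PermitDemo, MaxConcurrent and the 20 ms sleep are hypothetical choices made for this illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

public static class PermitDemo
{
    // Hypothetical helper: returns the highest number of threads that were
    // inside the guarded region at the same time.
    public static int MaxConcurrent(int permits, int threadCount)
    {
        var sem = new SemaphoreSlim(permits);
        var gate = new object();
        int inside = 0, maxInside = 0;
        var threads = new List<Thread>();

        for (int i = 0; i < threadCount; i++)
        {
            var thread = new Thread(() =>
            {
                sem.Wait(); // take one permit, or block if none is left
                try
                {
                    lock (gate)
                    {
                        inside++;
                        maxInside = Math.Max(maxInside, inside);
                    }
                    Thread.Sleep(20); // simulate some work
                    lock (gate) { inside--; }
                }
                finally
                {
                    sem.Release(); // give the permit back
                }
            });
            threads.Add(thread);
            thread.Start();
        }

        threads.ForEach(th => th.Join());
        return maxInside;
    }

    public static void Main()
    {
        // With 2 permits, at most 2 of the 6 threads are inside at once.
        Console.WriteLine(MaxConcurrent(2, 6));
    }
}
```

The returned value never exceeds the number of permits, whatever the scheduling looks like.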
Semaphore example (2)

    public class Crane
    {
        public static readonly SemaphoreSlim CraneGuard = new SemaphoreSlim(0);

        public static void Run()
        {
            while (true)
            {
                Move("Storage", "MachineA");
                MachineA.MachineAGuard.Release(); // signal machine A to start
                CraneGuard.Wait();                // wait until machine A is done
                Move("MachineA", "MachineB");
                MachineB.MachineBGuard.Release(); // signal machine B to start
                CraneGuard.Wait();                // wait until machine B is done
                Move("MachineB", "Storage");
            }
        }
    }

Semaphore example (3)

    public class MachineA
    {
        public static readonly SemaphoreSlim MachineAGuard = new SemaphoreSlim(0);

        public static void Run()
        {
            while (true)
            {
                MachineAGuard.Wait();       // wait for the crane's signal
                Process();
                Crane.CraneGuard.Release(); // signal the crane to continue
            }
        }
    }

Semaphore example (4)

    public class MachineB
    {
        public static readonly SemaphoreSlim MachineBGuard = new SemaphoreSlim(0);

        public static void Run()
        {
            while (true)
            {
                MachineBGuard.Wait();       // wait for the crane's signal
                Process();
                Crane.CraneGuard.Release(); // signal the crane to continue
            }
        }
    }

Message exchange between threads
Semaphores are pure signaling devices. Signals are exchanged between threads, but they carry no further information, so a semaphore signal cannot pass any data along.
Message exchange (2)
One way to exchange data nevertheless is to use shared memory, that is, memory that several threads use.
Semaphores can still be used for signaling, but the data is managed in a separate object. When accessing this object, take particular care that the access is thread-safe, for example by guarding it with lock statements.
Message exchange (3)

    private static SemaphoreSlim sem = new(0);
    private static readonly object lockObj = new();
    private static string message = string.Empty;

    public static void Main()
    {
        new Thread(WorkerA).Start();
        new Thread(WorkerB).Start();
    }

    static void WorkerA()
    {
        while (true)
        {
            // write the shared message under the lock
            lock (lockObj)
            {
                message = "hello";
            }

            sem.Release(); // signal that a message is available
            Thread.Sleep(1000);
        }
    }

    static void WorkerB()
    {
        while (true)
        {
            sem.Wait(); // wait for the signal
            lock (lockObj)
            {
                Console.WriteLine(message);
            }
        }
    }

Message exchange - Collections
For exchanging messages between threads, .NET also offers several classes designed specifically for this purpose (namespace System.Collections.Concurrent):
- BlockingCollection<T>
- ConcurrentDictionary<TKey, TValue>
- ConcurrentQueue<T>
- ConcurrentStack<T>
- ConcurrentBag<T>
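As an illustration of one of these classes, the following sketch lets several threads count into a shared ConcurrentDictionary without any explicit lock. The class ConcurrentCollectionsDemo, the method CountHits and the key "page" are hypothetical and exist only for this example.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;

public static class ConcurrentCollectionsDemo
{
    // Hypothetical helper: every thread increments the same dictionary entry.
    public static int CountHits(int threadCount, int hitsPerThread)
    {
        var hits = new ConcurrentDictionary<string, int>();
        var threads = new List<Thread>();

        for (int i = 0; i < threadCount; i++)
        {
            var thread = new Thread(() =>
            {
                for (int n = 0; n < hitsPerThread; n++)
                {
                    // inserts 1 if the key is missing, otherwise applies the update
                    hits.AddOrUpdate("page", 1, (_, old) => old + 1);
                }
            });
            threads.Add(thread);
            thread.Start();
        }

        threads.ForEach(th => th.Join());
        return hits["page"];
    }

    public static void Main()
    {
        Console.WriteLine(CountHits(4, 1000)); // prints 4000
    }
}
```

No increment is lost here, in contrast to the unsynchronised counter shown earlier, because AddOrUpdate retries the update when another thread has changed the value in the meantime.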
Message exchange - Blocking Collection
A blocking collection can act as a buffer. Messages are added to the collection and processed later by other threads. The number of messages in the collection can build up and shrink again over time.
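The buffering behaviour can be sketched with one producer and one consumer thread. The example assumes a small bounded capacity of 2 so that the producer occasionally has to wait. BufferDemo and ProduceAndConsume are hypothetical names.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

public static class BufferDemo
{
    // Hypothetical helper: sends the numbers 1..itemCount through a bounded
    // buffer and returns the sum computed by the consumer thread.
    public static int ProduceAndConsume(int itemCount)
    {
        var buffer = new BlockingCollection<int>(2); // holds at most 2 items
        int sum = 0;

        var producer = new Thread(() =>
        {
            for (int i = 1; i <= itemCount; i++)
                buffer.Add(i);        // blocks while the buffer is full
            buffer.CompleteAdding();  // tell consumers that no more items follow
        });

        var consumer = new Thread(() =>
        {
            // blocks while the buffer is empty, ends after CompleteAdding()
            foreach (var item in buffer.GetConsumingEnumerable())
                sum += item;
        });

        producer.Start();
        consumer.Start();
        producer.Join();
        consumer.Join();

        buffer.Dispose();
        return sum;
    }

    public static void Main()
    {
        Console.WriteLine(ProduceAndConsume(5)); // prints 15
    }
}
```

Calling CompleteAdding() is what lets the consumer loop terminate. Without it, the consumer would keep waiting for further items.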
Producer-Consumer Pattern

    // requires: using System; using System.Collections.Concurrent;
    public class Crane
    {
        public static readonly BlockingCollection<int> StorageQueue =
            new BlockingCollection<int>(100);

        public void Run()
        {
            while (true)
            {
                // Take() blocks until an item is available
                var item = StorageQueue.Take();

                Console.WriteLine($"storage queue take: {item}");
                MachineA.MachineAQueue.Add(item);
            }
        }
    }

    public class MachineA
    {
        public static readonly BlockingCollection<int> MachineAQueue =
            new BlockingCollection<int>(100);

        public void Run()
        {
            while (true)
            {
                var item = MachineAQueue.Take();
                Console.WriteLine($"Machine A processed: {item}");
                MachineB.MachineBQueue.Add(item);
            }
        }
    }

    public class MachineB
    {
        public static readonly BlockingCollection<int> MachineBQueue =
            new BlockingCollection<int>(100);

        public void Run()
        {
            while (true)
            {
                var item = MachineBQueue.Take();
                Console.WriteLine($"Machine B processed: {item}");
            }
        }
    }
