Thursday 17 December 2009

Orchestrations, Serialization and WaitHandles

A common pattern I use in BizTalk development is to create an orchestration with an associated C# library. The orchestration does what it does best (i.e. orchestrate the business process) and the library is where most of the underlying business logic code lives. This also allows the business logic to be called from other non-orchestration based code if required.

In a recent scenario, the library code lent itself well to a multithreaded approach, consisting of a number of small, independent tasks which could be executed in parallel. I implemented this using a ThreadPool and used ManualResetEvent instances to enable the threads to signal when complete. The WaitHandle.WaitAll allowed the main thread to wait until all the tasks were complete before continuing. This worked fine and all my unit tests ran successfully.

Then I tried calling it from an orchestration and got the dreaded serialization error advising that ManualResetEvent is not serializable. This was occurring when I hit a persistence point in the orchestration causing my instance of the library to be serialized, in this case unsuccessfully.

One possible solution to this is the use of an atomic scope to prevent persistence, allowing non-serializable classes to be used. However, this is not recommended and in my case not really practical anyway given the structure of the orchestration. After considering this and other solutions, I finally settled on creating a custom notifier class that would allow the main thread to wait until all threads signalled they are complete. This would perform the same role as ManualResetEvent/WaitHandle but would be serializable.

A simple example of this follows below. The Notifier class has just two methods: SetOne which allows each individual worker thread to signal it is complete, and Wait which enables the main thread to wait until either all worker threads are finished or a timeout period expires. This uses the static Pulse and Wait methods of the System.Threading.Monitor class to manage the signalling and waiting, providing an efficient, but serializable way to handle this pattern without resorting to a poll/sleep loop. The Notifier instance is passed to each worker thread (in much the same way as a ManualResetEvent would) allowing each thread to signal completion.

[Serializable()]
public class Notifier
{
private bool m_blnAllWorkItemsCompleted = false;
private Int32 m_i32CompletedWorkItems = 0;
private Int32 m_i32WorkItemCount = 0;
private object m_locker = new object();

public Notifier(Int32 workItemCount)
{
m_i32WorkItemCount = workItemCount;
}

public void SetOne()
{
lock (m_locker)
{
// Increment the completed work item count
m_i32CompletedWorkItems++;

// Check if all work items are complete
if (m_i32CompletedWorkItems == m_i32WorkItemCount)
{
// All complete so send pulse notification to stop the wait
m_blnAllWorkItemsCompleted = true;
Monitor.Pulse(m_locker);
}
}
}

public bool Wait(Int32 timeout)
{
lock (m_locker)
{
// Wait until the pulse is received or the timeout period expires
Monitor.Wait(m_locker, timeout);

// Return an indication of whether the work was completed
return m_blnAllWorkItemsCompleted;
}
}
}


class Program
{
const Int32 THREAD_COUNT = 5;

static void Main(string[] args)
{
// Create a new Notifier
Notifier notifier = new Notifier(THREAD_COUNT);

for (int i = 0; i < THREAD_COUNT; i++)
{
// Create state to pass through to the new thread
// Includes the notifier so that the thread can signal back
ThreadState state = new ThreadState(i, notifier);

// Spawn a new thread to do a single split item
ThreadPool.QueueUserWorkItem(MyThreadProc, state);
}

// Wait for 5 seconds for all threads to complete
if (notifier.Wait(5000))
{
Console.WriteLine("All threads completed");
}
else
{
Console.WriteLine("Notifier timed out");
}

Console.ReadLine();
}

private static void MyThreadProc(object stateInfo)
{
// Get the state passed in from the main thread
ThreadState myState = (ThreadState)stateInfo;
Notifier notifier = myState.Notifier;
Int32 i32Id = myState.Id;

// Wait for a random period to simulate work
Thread.Sleep(RandomNumber(500, 2000));

// Notify that the work is complete
Console.WriteLine(String.Format("[{0}] thread complete.", i32Id));
notifier.SetOne();
}

private static int RandomNumber(int min, int max)
{
Random random = new Random();
return random.Next(min, max);
}
}

[Serializable()]
class ThreadState
{
public int Id { get; set; }
public Notifier Notifier { get; set; }

public ThreadState(int id, Notifier notifier)
{
this.Id = id;
this.Notifier = notifier;
}
}


I found this article very helpful during my research for this code. Hope you find this useful too.

No comments: