Синхронизация потоков через именованные мьютексы

Compositum
Дата: 06.07.2014 20:35:45
Доброго времени суток.

В .NET синхронизировать процессы можно при помощи именованных мьютексов и семафоров. Экспериментирую по обозначенной теме.

Например, пусть два процесса выполняют запись в один и тот же текстовый файл: первый процесс пишет некоторое количество раз слово "dog", а второй - слово "cat". При этом нужно организовать работу процессов так, чтобы слова чередовались. Т.е. на выходе должен получиться файл со следующим содержимым:
Ожидаемый текстовый файл
dog
cat
dog
cat
dog
cat
dog
cat
...
dog
cat

Моё решение выглядит следующим образом:
Первое приложение (Launcher.exe) создаёт именованный мьютекс и блокирует его. Затем создаются два идентичных процесса (ConsoleApplication2.exe), синхронизация которых должна происходить через обозначенный мьютекс. Когда процессы созданы, с мьютекса снимается блокировка и процессы запускаются...

Однако по факту начало текстового файла начинается не так, как мне бы того хотелось. Например, это может выглядеть так:
Фактический текстовый файл
dog
dog
dog
dog
cat
dog
cat
dog
cat
dog
cat
dog
cat
dog
cat
...

Т.е. согласно результату, в моём коде синхронизация происходит не сразу, а спустя некоторое время. Хотелось бы понять, где я напортачил в коде.

Код Launcher.exe:
// Program.cs
// It builds the Launcher.exe
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace Launcher {

  class Program {
    static void Main(string[] args) {

      Console.Title = "Launcher";
      String fileFullName = @"c:\_threads_laboratory\data.txt";

      using (Mutex mutex = new Mutex(true, "my_mutex")) {

        if (File.Exists(fileFullName))
          File.Delete(fileFullName);

        Process proc_1 = new Process();
        String exeName = @".\ConsoleApplication2.exe";
        ProcessStartInfo info_1 = new ProcessStartInfo(exeName, "proc_#1 dog");
        proc_1.StartInfo = info_1;
        proc_1.Start();
        Console.WriteLine("proc_#1 started by launcher...");

        Process proc_2 = new Process();
        ProcessStartInfo info_2 = new ProcessStartInfo(exeName, "proc_#2 cat");
        proc_2.StartInfo = info_2;
        proc_2.Start();
        Console.WriteLine("proc #2 started by launcher...");

        mutex.ReleaseMutex();

        proc_1.WaitForExit();
        proc_2.WaitForExit();
      }

      Console.WriteLine("Result in the \"{0}\" file.", fileFullName);
      Console.WriteLine("Press any key for exit...");
      Console.ReadKey();
    }
  }
}


Код ConsoleApplication2.exe:
// Program_2.cs
// It builds the ConsoleApplication2.exe
using System;
using System.Diagnostics;
using System.IO;
using System.Threading;

namespace ConsoleApplication2 {

  class Program_2 {

    static void Main(string[] args) {
      if (2 != args.Length)
        return;

      Console.Title = args[0];

      Mutex mutex = new Mutex(false, "my_mutex");

      String dir = @"c:\_threads_laboratory";
      String file = "data.txt";
      String fullName = Path.Combine(dir, file);
      if (!Directory.Exists(dir))
        Directory.CreateDirectory(dir);
      String text = args[1];
      Int32 counter = 100;

      using (FileStream fs = File.Open(fullName, FileMode.Append,
        FileAccess.Write, FileShare.ReadWrite)) {
        using (StreamWriter sw = new StreamWriter(fs)) {
          while (counter-- > 0) {
            mutex.WaitOne();
            fs.Position = fs.Length;
            sw.WriteLine(text);
            Console.Write("*");
            sw.Flush();
            fs.Flush(true);
            mutex.ReleaseMutex();
            Thread.Sleep(0);
          }
          sw.Close();
        }
        fs.Close();
      }
    }
  }
}

Буду признателен за конструктивные замечания по теме.

Спасибо.
cdtyjv
Дата: 07.07.2014 00:40:28
Compositum,
Давайте отталкиваться от теории. В мире синхронизации есть классическая структура - монитор Хоара, погуглите. Он состоит из двух фич:
1) Mutual exclusion - когда один поток занял его, другой не может его занять.
2) Conditional variables - когда один поток занял монитор, но внутри критической секции понял, что какое-то условие не выполнено, он может временно отпустить монитор, что бы другой поток его занял, и, возможно, перевел систему в такое состояние, что бы условие первого потока оказалось выполненным, и он смог продолжить работу. Когда второй поток это делает, то перед тем, как отпустить монитор, он сигнализирует другим потокам, что те могут попробовать заново проверить те условия, на которых они обломались.
Это самые основы.

В чем изъян вашего решения? В том, что вы используете только mutual exclusion, а вам нужна еще и conditional variable. То есть, ваш алгоритм должен выглядеть так:
1) Поток 1 занимает мьютекс.
2) Поток 1 проверяет, что было записано в файл последний раз.
2.1) Если ничего, или же последним писал поток 2 - то записать свою строку.
2.2) Если же последний раз писал этот же поток, то отпустить мьютекс, и ждать, пока второй поток что-то не запишет.
Только в такой реализации у вас реально будут чередоваться строки.

В вашей же реализации этого условия нет. Поэтому первый поток (процесс) стартует первым, и начинает записывать свою строку много раз. Потом через некоторое время стартует второй поток, и теперь строки начинают чередоваться. Но чередуются они только потому, что захват мьютекса ОС достаточно дорогая операция, а потому как только первый поток отпускает его, второй поток почти всегда успевает захватить его первым. Если вы проделаете тот же трюк, например, с объектом Monitor, или с локальным мьютексом в рамках одного процесса, то вы увидите, что ваше решение не работает не только в начале, но не работает вовсе.

Отправная точка вам дана, дальше разбирайтесь сами. Если будет снова непонятно - спрашивайте.
Compositum
Дата: 07.07.2014 09:15:40
cdtyjv
2) Поток 1 проверяет, что было записано в файл последний раз.
2.1) Если ничего, или же последним писал поток 2 - то записать свою строку.
2.2) Если же последний раз писал этот же поток, то отпустить мьютекс, и ждать, пока второй поток что-то не запишет.

Я думал об этом варианте решения в процессе написания кода, но полагал, что мьютекса в данном случае будет достаточно. Мой текущий выбор был обусловлен следующими соображениями:

1. Launcher.exe запускает оба процесса, и они оба моментально "замирают в ожидании", дойдя до строчки кода
mutex.WaitOne();

2. После того, как Launcher.exe освобождает мьютекс
mutex.ReleaseMutex();

Его тут же захватывает любой из ожидающих процессов и выполняет один виток в цикле. В коде этого цикла, сразу после освобождения мьютекса, присутствует строка
Thread.Sleep(0);

Её задача - не дать потоку данного процесса сразу пойти на очередной виток (это возможно, т.к. мьютекс освобождён), но вместо этого выполнить переключение исполняемых потоков (в данном случае на поток, выполняемый в др. процессе), при условии что их несколько.

Мои предположения о причине проблемы в текущем коде были следующими:

1. Возможно один из процессов не успевает дойти до точки
mutex.WaitOne();

в то время как в коде Launcher.exe мьютекс уже освобождён. Для решения этого я пробовал в Launcher.exe, перед освобождением мьютекса ставить такой код:
Thread.Sleep(100);

чтобы оба процесса успели дойти до "нужной кондиции". Однако это не помогло.

2. Возможно метод
Thread.Sleep(0);
ориентирован на переключение потоков в рамках одного процесса.

P.S. Спасибо за ответ, добавлю в код conditional variable.
Compositum
Дата: 07.07.2014 09:19:01
Сейчас в голову пришёл третий вариант:

3. Возможно переключение происходит не на второй процесс, как я ожидал, а на Launcher.exe, который замер в ожидании завершения работы процессов. В виду этого тут же происходит очередное переключение, но не на второй процесс, а всё на тот же, первый (т.е. обратно к тому, кто "пнул мячик").
D129
Дата: 07.07.2014 11:04:40
Compositum
Сейчас в голову пришёл третий вариант:

3. Возможно переключение происходит не на второй процесс, как я ожидал, а на Launcher.exe, который замер в ожидании завершения работы процессов. В виду этого тут же происходит очередное переключение, но не на второй процесс, а всё на тот же, первый (т.е. обратно к тому, кто "пнул мячик").


Переключением процессов управляет виндовс, и произвольным образом.
Вы на это повлиять не можете.
Кроме того, поведение будет другим, если вы запустите ваши аппликации на многопроцессорной системе.
Arm79
Дата: 07.07.2014 11:13:16
Compositum
пусть два процесса выполняют запись в один и тот же текстовый файл

Это теоретическая или практическая задача? Если практическая, то это не самый удачный вариант. Запись в файл - медленный процесс, и было бы правильнее сделать так, чтобы в файл писал только один поток. Конечно, это не абсолютное утверждение, и в популярных библиотеках логирования есть поддержка записи кучи потоков/процессов в один файл, но это непопулярное решение.

Compositum
первый процесс пишет некоторое количество раз слово "dog", а второй - слово "cat". При этом нужно организовать работу процессов так, чтобы слова чередовались

Далее, сама по себе идея обеспечения очередности следования потоков противоречит самой идее параллельного программирования

Compositum
Первое приложение (Launcher.exe) создаёт именованный мьютекс и блокирует его

Зачем вообще в первом приложении создавать мьютекс? Что, вторые приложения между собой сами не могут разрулить ситуацию?
Compositum
Дата: 07.07.2014 11:33:20
Arm79
Это теоретическая или практическая задача?

Это просто "хелло ворлд" для себя, чтобы пощупать на примере.
Arm79
Зачем вообще в первом приложении создавать мьютекс? Что, вторые приложения между собой сами не могут разрулить ситуацию?

Путём введения третьего приложения я планировал подвести оба процесса к "стартовой черте на беговой дорожке", чтобы они находились в ожидании в одной и той же точке кода.
Arm79
Дата: 07.07.2014 11:39:05
Ну если только для примера, вот код с упрощениями (не два процесса, а два потока, не файл, а консоль)

using System;
using System.Threading;

namespace ConsoleApplication4
{
    class Program
    {
        private static readonly Thread _thread1 = new Thread(ThreadHandler) {IsBackground = true};
        private static readonly Thread _thread2 = new Thread(ThreadHandler) {IsBackground = true};

        private static void ThreadHandler(object state)
        {
            Mutex mtx = Mutex.OpenExisting("SQL.RU");

            while (true)
            {
                mtx.WaitOne();
                try
                {
                    var rnd = new Random();
                    int count = rnd.Next(1, 11);

                    for (int i = 0; i < count; i++)
                    {
                        Console.WriteLine((string)state);
                        Thread.Sleep(100);
                    }
                }
                finally
                {
                    mtx.ReleaseMutex();
                }

                Thread.Sleep(1000);
            }
        }

        static void Main(string[] args)
        {
            var mtx = new Mutex(false, "SQL.RU");
            try
            {
                _thread1.Start("dog");
                _thread2.Start("cat");

                Console.ReadLine();
            }
            finally
            {
                mtx.Dispose();
            }
        }
    }
}
Arm79
Дата: 07.07.2014 11:40:50
Compositum
При этом нужно организовать работу процессов так, чтобы слова чередовались

Блин, этого не увидел... Сейчас...
Konst_One
Дата: 07.07.2014 11:41:48
Compositum
Путём введения третьего приложения я планировал подвести оба процесса к "стартовой черте на беговой дорожке", чтобы они находились в ожидании в одной и той же точке кода.



MSMQ может тогда уж лучше юзать?