Скорость выполнения множества потоков в многоядерном процессоре.

vadipok
Дата: 22.10.2014 15:37:44
Добрый день!

Сейчас МНОГОПОТОЧНАЯ программка выполняется в одноядерной виртуальной машине. Скорость устраивает.
Но хотим выделить нормальную, многоядерную машину(4 или 8), в связи с тем, что там находится и СУБД.
В СУБД пока данных мало, это и пугает.
Visual c++ 2010 XE.
В частности куски программки есть тут.

Вопрос. Следует ли ожидать увеличения скорости выполнения программы?(скорее всего да, но все таки)
И все ли ядра будут использоваться в одном запуске многопоточной программы? (вот тут действительно не понятно)
MasterZiv
Дата: 22.10.2014 15:46:09
vadipok,
скажем так - увеличение скорости работы программы возможно.
Dima T
Дата: 22.10.2014 16:07:33
vadipok
Вопрос. Следует ли ожидать увеличения скорости выполнения программы?(скорее всего да, но все таки)

Да, если программе не хватает именно процессорного времени. У тебя возможно затык в сети, т.к. все 300 потоков одновременно начинают ее пользовать.
vadipok
И все ли ядра будут использоваться в одном запуске многопоточной программы? (вот тут действительно не понятно)

300 твоих потоков распределятся на все ядра.

Я бы для начала посмотрел как сейчас загружен проц на виртуалке. Если постоянно на 100%, то должно помочь. Если всплесками - то поразбираться

PS Надеюсь ты переписал как я в конце советовал, чтобы объект не создавался каждый раз.

PPS Забыл тогда дописать, я бы еще посоветовал развести пробуждение потоков по времени через какой-то промежуток, например 1 сек., тебе же главное периодичность раз в минуту и не критично если первый IP будет опрашиваться всегда в 00 сек, второй в 01 сек и т.д., тогда и одного процессора в виртуалке может хватить.
vadipok
Дата: 23.10.2014 13:35:08
Добрый день!
Спасибо!
Dima T
Да, если программе не хватает именно процессорного времени. У тебя возможно затык в сети, т.к. все 300 потоков одновременно начинают ее пользовать.

300 твоих потоков распределятся на все ядра.

Потоков уже 500.


Dima T
Я бы для начала посмотрел как сейчас загружен проц на виртуалке. Если постоянно на 100%, то должно помочь. Если всплесками - то поразбираться

C++ программка забивает полностью, потом отпускает, если помните там стоял Sleep(60000); этим и объясняется.


Dima T
PS Надеюсь ты переписал как я в конце советовал, чтобы объект не создавался каждый раз.

Переписал, но скорость не устроила.
Может что-то не так делаю?
+
	while (rs->next())
	{
		t1  = rs->getInt (2);
		t2  = rs->getInt (3);
		t3  = rs->getInt (4);
		t4  = rs->getInt (5);
		ss << t1 << "." << t2 << "." << t3 << "." << t4;
		t = ss.str();

		// параметры запуска потока
		arg_t arg; 
		sprintf_s(arg.ip, sizeof(arg.ip), strdup(t.c_str()));
		//const char babac = (char) rs->getInt (1);
		sprintf_s(arg.employeeID, sizeof(arg.employeeID), "434");
		arg.work = CreateEvent(NULL, false, false, NULL);
		arg.free = CreateEvent(NULL, false, false, NULL);
		HANDLE th = (HANDLE) _beginthreadex(NULL, 0, &getData, (void *) &arg, 0, NULL);
		if(th == NULL) {
			cout << "Error create thread:" << strdup(t.c_str()) << endl;
			CloseHandle(th);
			CloseHandle(arg.work);
			CloseHandle(arg.free);
		} else {
			threads.push_back(th);
			if(WaitForSingleObject(arg.free, 1000) == WAIT_TIMEOUT) { // Ждем инициализацию потока 1000 мс.
				cout << "Error init thread:" << strdup(t.c_str()) << endl;
				CloseHandle(arg.free);
				CloseHandle(arg.work);
			} else {
				th_work.push_back(arg.work);
				th_free.push_back(arg.free);
			}
		}

		//clear
		ss.str(std::string());
		ss.clear();
		t.clear();
	}

	CPU_MEMORY cpu_memory;
	int c;
	// Выполнение полезной работы потоками 
	for(int n = 0; n < 3; n++) { // запускаем все 3 раза
		//Sleep(1000 - GetTickCount() % 1000); // ждем начало секунды
		printf("%d: START\n", GetTickCount());
		for(int i = 0; i < th_work.size(); i++) {
			SetEvent(th_work[i]);
		}
		int res = WaitForMultipleObjects(th_free.size(), &th_free[0], true, INFINITE);
		printf("%d: END\n", GetTickCount());

		for (c=0;c<cpu_memory_vec.size();c++) {
			cpu_memory = cpu_memory_vec[c];
			cout << cpu_memory.employeeID << endl;
			cout << cpu_memory.percentCPU << endl;
			cout << cpu_memory.percentMemory << endl;
			cout << cpu_memory.freeMemory << endl;
		}
		cpu_memory_vec.clear();
	}

typedef struct
{
	char ip[1000];
	char employeeID[1000];
	HANDLE work;
	HANDLE free;
} arg_t;

Это ключевые изменения вашего примера. Если честно, то так и не удалось записать в char employeeID[1000]; значение employeeID.
На время тестов просто записал 434.


Dima T
PPS Забыл тогда дописать, я бы еще посоветовал развести пробуждение потоков по времени через какой-то промежуток, например 1 сек., тебе же главное периодичность раз в минуту и не критично если первый IP будет опрашиваться всегда в 00 сек, второй в 01 сек и т.д., тогда и одного процессора в виртуалке может хватить.

Так нельзя сделать, запись в СУБД же происходит. А вектор для INSERT один для всех потоков.
Dima T
Дата: 23.10.2014 15:16:26
vadipok
Dima T
PPS Забыл тогда дописать, я бы еще посоветовал развести пробуждение потоков по времени через какой-то промежуток, например 1 сек., тебе же главное периодичность раз в минуту и не критично если первый IP будет опрашиваться всегда в 00 сек, второй в 01 сек и т.д., тогда и одного процессора в виртуалке может хватить.

Так нельзя сделать, запись в СУБД же происходит. А вектор для INSERT один для всех потоков.

Я так понимаю "вектор для INSERT" это хранилище результатов твоих замеров для всех потоков. Узкое место. Что мешает заменить на вектор на очередь и создать отдельный поток для записи в БД? Т.е. потоки пишут свои замеры в очередь, отдельный поток для обмена с БД периодически проверяет что очередь не пуста и если что-то есть, то пишет это в БД.

Если получится избавится от этого вектора, то цикл снаружи можно будет не запускать, перенести ожидание во внутрь потока. Попозже пример сделать постараюсь.
MasterZiv
Дата: 23.10.2014 15:45:06
vadipok
Добрый день!
Спасибо!
Dima T
Да, если программе не хватает именно процессорного времени. У тебя возможно затык в сети, т.к. все 300 потоков одновременно начинают ее пользовать.

300 твоих потоков распределятся на все ядра.

Потоков уже 500.


А зачем тебе столько, сынок ?
vadipok
Дата: 23.10.2014 16:35:34
MasterZiv,

Даже не знаю как объяснить.
В начале начальник планировал сканировать за раз 50 машин, я тоже так думал.
Забили, ок, все работает.
Забили больше, опять работает. ))
Дальше больше, и забили всех.

Довольно быстро отрабатывает один круг, поэтому так и оставили.
NekZ
Дата: 23.10.2014 16:42:23
vadipok,

А ты про Thread Pool слышал?
Когда кол-во активных потоков больше количества ядер, то в таком случае имеет место снижение производительности за счёт переключения контекста между потоками на каждом ядре.
vadipok
Дата: 23.10.2014 16:50:42
NekZ,

В нашем случае не так, доказали методом проб.
Этот закон наверно работает при сложных расчетах.
А у нас тупо извлечение информации по сети.
Причем время отклика информации далеко не одинаково.
Для некоторых машин мгновенно, а некоторые приходится ждать и по 20-30 секунд.
И все это время запущенный процесс не рассчитывает, а тупо ждет.(это уже я так думаю)


vadipok
Добрый день!
Сейчас МНОГОПОТОЧНАЯ программка выполняется в одноядерной виртуальной машине. Скорость устраивает.

Обратите на это внимание, я не жалуюсь на скорость.
Dima T
Дата: 23.10.2014 17:47:58
+ Изучай
#include <stdio.h>
#include <process.h>
#include <windows.h>
#include <vector>
#include <queue>

#define TIME_STEP  1000 // Шаг повтора, мс
#define TIME_OFFSET 500 // Шаг смещения начала работы, мс

HANDLE stop_thread = NULL;

typedef struct
{
	char ip[20];
	int time_offset;
	HANDLE ready;
} arg_t;

typedef struct
{
	char ip[20];
	int time;
	int cpu;
} result_t;

std::queue<result_t> result;
HANDLE result_change = NULL; // для защиты от изменения result несколькими потоками одновременно

unsigned __stdcall thread( void* pArguments )
{
	// Инициализация потока
	srand(GetTickCount());
	Sleep(rand() % 100); // эмуляция работы
	arg_t arg;
	memcpy(&arg, pArguments, sizeof(arg));
	printf("%d: %s init thread\n", GetTickCount(), arg.ip);
	SetEvent(arg.ready);
	// Выполнение расчетов
	DWORD sleep_time = TIME_STEP - (GetTickCount() + arg.time_offset) % TIME_STEP;
	printf("%d: %s sleep %d ms\n", GetTickCount(), arg.ip, sleep_time);
	while(WaitForSingleObject(stop_thread, sleep_time) != WAIT_OBJECT_0) {
		result_t r; 
		r.time = GetTickCount();
		memcpy(r.ip, arg.ip, sizeof(r.ip));
		// Запрос данных
		printf("%d: %s start\n", GetTickCount(), arg.ip);
		Sleep(rand() % 1000); // эмуляция работы
		r.cpu = rand() % 100;
		// Запись результата в очередь
		WaitForSingleObject(result_change, INFINITE);
		result.push(r);
		SetEvent(result_change);

		sleep_time = TIME_STEP - (GetTickCount() + arg.time_offset) % TIME_STEP;
		printf("%d: %s stop\n", GetTickCount(), arg.ip);
	}
	// Завершение потока
	Sleep(rand() % 100); // эмуляция работы
	printf("%d: %s end thread\n", GetTickCount(), arg.ip);
	_endthreadex(0);
    return 0;
}

int main( void )
{
	stop_thread = CreateEvent(NULL, true, false, NULL);
	result_change = CreateEvent(NULL, false, true, NULL);
	int time_offset = 0;
	// Создание потоков с ожиданием их инициализации
	std::vector<HANDLE> threads;
	for(int i = 0; i < 3; i++)
	{
		arg_t arg; // параметры запуска потока
		sprintf_s(arg.ip, sizeof(arg.ip), "192.168.0.%d", i);
		arg.ready = CreateEvent(NULL, false, false, NULL);
		arg.time_offset = time_offset;
		time_offset = (time_offset + TIME_OFFSET) % TIME_STEP;

		HANDLE th = (HANDLE) _beginthreadex(NULL, 0, &thread, (void *) &arg, 0, NULL);
		if(th == NULL) {
			printf("Error create thread %d\n", i);
			CloseHandle(th);
		} else if(WaitForSingleObject(arg.ready, 1000) == WAIT_TIMEOUT) { // Ждем инициализацию потока 1000 мс.
			printf("Error init thread %d\n", i);
			CloseHandle(th);
		} else {
			threads.push_back(th);
		}
		CloseHandle(arg.ready);
	}
	// Ожидаем результаты и пишем в базу 5 сек.
	DWORD stop_time = GetTickCount() + 5000;
	while(stop_time > GetTickCount()) {
		Sleep(TIME_STEP);
		while(true) {
			WaitForSingleObject(result_change, INFINITE);
			result_t* r = NULL;
			if(!result.empty()) {
				r = &result.front();
			}
			SetEvent(result_change); // чтобы не держать очередь пока идет отправка запроса
			if(r == NULL) {
				break;
			}
			// вставка в базу
			printf("%d: insert into MyTable (IP, time, cpu) values ('%s', %d, %d)\n", GetTickCount(), r->ip, r->time, r->cpu);
			// удаление из очереди
			WaitForSingleObject(result_change, INFINITE);
			result.pop();
			SetEvent(result_change);
		}
	}
	// Завершение работы
	printf("%d: stop all thread\n", GetTickCount());
	SetEvent(stop_thread);
	WaitForMultipleObjects(threads.size(), &threads[0], true, INFINITE);
	CloseHandle(stop_thread);
	CloseHandle(result_change);
	for(int i = 0; i < threads.size(); i++) {
		CloseHandle(threads[i]);
	}
	threads.clear();
	printf("%d: finish\n", GetTickCount());
	system("pause");
}

Заметь что каждый поток повторяет цикл через равный промежуток времени (TIME_STEP). Если минута то TIME_STEP 60000
Для равномерности распределения нагрузки поиграй параметром TIME_OFFSET.