#include <stdio.h>
#include <process.h>
#include <windows.h>
#include <vector>
#include <queue>
#define TIME_STEP 1000 // Шаг повтора, мс
#define TIME_OFFSET 500 // Шаг смещения начала работы, мс
HANDLE stop_thread = NULL;
typedef struct
{
char ip[20];
int time_offset;
HANDLE ready;
} arg_t;
typedef struct
{
char ip[20];
int time;
int cpu;
} result_t;
std::queue<result_t> result;
HANDLE result_change = NULL; // для защиты от изменения result несколькими потоками одновременно
unsigned __stdcall thread( void* pArguments )
{
// Инициализация потока
srand(GetTickCount());
Sleep(rand() % 100); // эмуляция работы
arg_t arg;
memcpy(&arg, pArguments, sizeof(arg));
printf("%d: %s init thread\n", GetTickCount(), arg.ip);
SetEvent(arg.ready);
// Выполнение расчетов
DWORD sleep_time = TIME_STEP - (GetTickCount() + arg.time_offset) % TIME_STEP;
printf("%d: %s sleep %d ms\n", GetTickCount(), arg.ip, sleep_time);
while(WaitForSingleObject(stop_thread, sleep_time) != WAIT_OBJECT_0) {
result_t r;
r.time = GetTickCount();
memcpy(r.ip, arg.ip, sizeof(r.ip));
// Запрос данных
printf("%d: %s start\n", GetTickCount(), arg.ip);
Sleep(rand() % 1000); // эмуляция работы
r.cpu = rand() % 100;
// Запись результата в очередь
WaitForSingleObject(result_change, INFINITE);
result.push(r);
SetEvent(result_change);
sleep_time = TIME_STEP - (GetTickCount() + arg.time_offset) % TIME_STEP;
printf("%d: %s stop\n", GetTickCount(), arg.ip);
}
// Завершение потока
Sleep(rand() % 100); // эмуляция работы
printf("%d: %s end thread\n", GetTickCount(), arg.ip);
_endthreadex(0);
return 0;
}
int main( void )
{
stop_thread = CreateEvent(NULL, true, false, NULL);
result_change = CreateEvent(NULL, false, true, NULL);
int time_offset = 0;
// Создание потоков с ожиданием их инициализации
std::vector<HANDLE> threads;
for(int i = 0; i < 3; i++)
{
arg_t arg; // параметры запуска потока
sprintf_s(arg.ip, sizeof(arg.ip), "192.168.0.%d", i);
arg.ready = CreateEvent(NULL, false, false, NULL);
arg.time_offset = time_offset;
time_offset = (time_offset + TIME_OFFSET) % TIME_STEP;
HANDLE th = (HANDLE) _beginthreadex(NULL, 0, &thread, (void *) &arg, 0, NULL);
if(th == NULL) {
printf("Error create thread %d\n", i);
CloseHandle(th);
} else if(WaitForSingleObject(arg.ready, 1000) == WAIT_TIMEOUT) { // Ждем инициализацию потока 1000 мс.
printf("Error init thread %d\n", i);
CloseHandle(th);
} else {
threads.push_back(th);
}
CloseHandle(arg.ready);
}
// Ожидаем результаты и пишем в базу 5 сек.
DWORD stop_time = GetTickCount() + 5000;
while(stop_time > GetTickCount()) {
Sleep(TIME_STEP);
while(true) {
WaitForSingleObject(result_change, INFINITE);
result_t* r = NULL;
if(!result.empty()) {
r = &result.front();
}
SetEvent(result_change); // чтобы не держать очередь пока идет отправка запроса
if(r == NULL) {
break;
}
// вставка в базу
printf("%d: insert into MyTable (IP, time, cpu) values ('%s', %d, %d)\n", GetTickCount(), r->ip, r->time, r->cpu);
// удаление из очереди
WaitForSingleObject(result_change, INFINITE);
result.pop();
SetEvent(result_change);
}
}
// Завершение работы
printf("%d: stop all thread\n", GetTickCount());
SetEvent(stop_thread);
WaitForMultipleObjects(threads.size(), &threads[0], true, INFINITE);
CloseHandle(stop_thread);
CloseHandle(result_change);
for(int i = 0; i < threads.size(); i++) {
CloseHandle(threads[i]);
}
threads.clear();
printf("%d: finish\n", GetTickCount());
system("pause");
}
|