#ifdef _WIN32 #ifndef _WIN32_WINNT #define _WIN32_WINNT 0x0501 #endif #if _WIN32_WINNT < _WIN32_WINNT_WINXP #error "works on win xp of later" #endif #ifdef _MSC_VER #define _CRT_SECURE_NO_WARNINGS #endif #endif #define __STDC_FORMAT_MACROS #include #include #include #include #include #include #include #include #ifdef _MSC_VER #include #else #include #endif #ifdef _WIN32 #include #include #include typedef int socklen_t; #ifdef _MSC_VER #pragma comment (lib, "Ws2_32.lib") #else #pragma message("build with \"-lws2_32\" or \"-lwsock32\"") #endif #else #include #include #include #include #include #include #define closesocket close #endif typedef char bool; const bool false = 0; const bool true = 1; typedef char ascii; typedef int8_t int1; typedef int16_t int2; typedef int32_t int4; typedef int64_t int8; typedef int64_t dec2; typedef int64_t dec8; typedef int64_t time8; time8 utime() { #ifdef _WIN32 SYSTEMTIME system_time; FILETIME file_time; ULARGE_INTEGER integer, integer0; GetSystemTime(&system_time); SystemTimeToFileTime(&system_time, &file_time); integer.HighPart = file_time.dwHighDateTime; integer.LowPart = file_time.dwLowDateTime; system_time.wYear = 1970; system_time.wDay = 1; system_time.wMonth = 1; system_time.wHour = 0; system_time.wMinute = 0; system_time.wSecond = 0; system_time.wMilliseconds = 0; SystemTimeToFileTime(&system_time, &file_time); integer0.HighPart = file_time.dwHighDateTime; integer0.LowPart = file_time.dwLowDateTime; return (time8)((integer.QuadPart - integer0.QuadPart) / 1000); #else struct timeval tv; struct timezone tz; gettimeofday(&tv, &tz); return (time8)tv.tv_sec * 1000000 + (time8)tv.tv_usec; #endif } void local_time(int *year, int *month, int *day, int *hour, int *minute, int *second, int *millisecond, int *microsecond) { #ifdef _WIN32 SYSTEMTIME system_time; GetLocalTime(&system_time); if (year != NULL) *year = (int)system_time.wYear; if (month != NULL) *month = (int)system_time.wMonth; if (day != NULL) *day = (int)system_time.wDay; if (hour != NULL) *hour = (int)system_time.wHour; if (minute != NULL) *minute = (int)system_time.wMinute; if (second != NULL) *second = (int)system_time.wSecond; if (millisecond != NULL) *millisecond = system_time.wMilliseconds; if (microsecond != NULL) *microsecond = 0; #else time8 time8; time_t time4; struct tm *tm; time8 = utime(); time4 = (time_t)(time8 / 1000000); tm = localtime(&time4); if (year != NULL) *year = (int)tm->tm_year; if (month != NULL) *month = (int)tm->tm_mon + 1; if (day != NULL) *day = (int)tm->tm_mday; if (hour != NULL) *hour = (int)tm->tm_hour; if (minute != NULL) *minute = (int)tm->tm_min; if (second != NULL) *second = (int)tm->tm_sec; if (millisecond != NULL) *millisecond = (int)(time8 / 1000 % 1000); if (microsecond != NULL) *microsecond = (int)(time8 % 1000); #endif } void _vlog(FILE *file, const char level[], const char format[], va_list args) { int hour, minute, second, millisecond; local_time(NULL, NULL, NULL, &hour, &minute, &second, &millisecond, NULL); fprintf(file, "[%02d:%02d:%02d.%03d] [%s] ", hour, minute, second, millisecond, level); vfprintf(file, format, args); fputc('\n', file); } #ifdef __GNUC__ void _log(FILE *file, const char level[], const char format[], ...) __attribute__((format(printf, 3, 4))); #endif void _log(FILE *file, const char level[], const char format[], ...) { va_list arg_list; va_start(arg_list, format); _vlog(file, level, format, arg_list); va_end(arg_list); } #define debug(format, ...) _log(stdout, "debug", format, ##__VA_ARGS__) #define info(format, ...) _log(stdout, "info", format, ##__VA_ARGS__) #define warning(format, ...) _log(stderr, "warning", format, ##__VA_ARGS__) #define error(format, ...) _log(stderr, "error", format, ##__VA_ARGS__) void print_error(const char message[]) { #ifdef _WIN32 LPSTR buffer = NULL; if (FormatMessageA(FORMAT_MESSAGE_ALLOCATE_BUFFER | FORMAT_MESSAGE_FROM_SYSTEM, NULL, WSAGetLastError(), MAKELANGID(LANG_ENGLISH, SUBLANG_ENGLISH_US), (LPSTR)&buffer, 0, NULL) == 0) { error("%s: (%d) unknown error", message, WSAGetLastError()); return; } error("%s: (%d) %s", message, WSAGetLastError(), buffer); LocalFree(buffer); #else error("%s: (%d) %s", message, errno, strerror(errno)); #endif } int get_addr_info(const char host[], struct sockaddr_in *sock_addr) { const struct addrinfo hints = { 0, AF_INET, SOCK_DGRAM }; struct addrinfo *addr_info = 0, *ai; int gai_error; gai_error = getaddrinfo(host, 0, &hints, &addr_info); if (gai_error != 0) return error("Failed to resolve address \"%s\": (%d) %s", host, gai_error, gai_strerror(gai_error)), -1; for (ai = addr_info; ai != NULL; ai = ai->ai_next) { if (ai->ai_family != AF_INET) continue; memcpy(sock_addr, ai->ai_addr, ai->ai_addrlen); break; } freeaddrinfo(addr_info); if (ai == NULL) return -1; return 0; } int setup(struct sockaddr_in *sock_addr) { int sock; int flag; sock = socket(AF_INET, SOCK_DGRAM, 0); if (sock == -1) return print_error("socket failed"), -1; flag = 1; if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (char *)&flag, sizeof(flag)) == -1) return print_error("setsockopt failed"), -1; if (bind(sock, (struct sockaddr *)sock_addr, sizeof(*sock_addr)) == -1) return print_error("bind failed"), -1; info("Socket is bound to %s:%hu", inet_ntoa(sock_addr->sin_addr), ntohs(sock_addr->sin_port)); return sock; } int multicast(int sock, int command, struct in_addr *in_addr, struct in_addr *source_in_addr) { if (source_in_addr != NULL) { struct ip_mreq_source ip_req; memset(&ip_req, 0, sizeof(ip_req)); memcpy(&ip_req.imr_multiaddr, in_addr, sizeof(ip_req.imr_multiaddr)); memcpy(&ip_req.imr_sourceaddr, source_in_addr, sizeof(ip_req.imr_sourceaddr)); if (setsockopt(sock, IPPROTO_IP, command, (char *)&ip_req, sizeof(ip_req)) == -1) return print_error("setsockopt failed"), -1; } else { struct ip_mreq ip_req; memset(&ip_req, 0, sizeof(ip_req)); memcpy(&ip_req.imr_multiaddr, in_addr, sizeof(ip_req.imr_multiaddr)); if (setsockopt(sock, IPPROTO_IP, command, (char *)&ip_req, sizeof(ip_req)) == -1) return print_error("setsockopt failed"), -1; } return 0; } const unsigned short ORDER_BOOK_PORT = 6000; const char ORDER_BOOK_SOURCE_HOST[] = "194.247.145.35"; const char ORDER_BOOK_ONLINE_HOST_A[] = "239.16.0.41"; const char ORDER_BOOK_ONLINE_HOST_B[] = "239.16.0.141"; const char ORDER_BOOK_SNAPSHOT_HOST_A[] = "239.16.0.40"; const char ORDER_BOOK_SNAPSHOT_HOST_B[] = "239.16.0.140"; const char *ORDER_BOOK_HOST[] = { ORDER_BOOK_ONLINE_HOST_A, ORDER_BOOK_ONLINE_HOST_B, ORDER_BOOK_SNAPSHOT_HOST_A, ORDER_BOOK_SNAPSHOT_HOST_B, }; size_t ORDER_BOOK_N_HOSTS = sizeof(ORDER_BOOK_HOST) / sizeof(*ORDER_BOOK_HOST); const unsigned short ORDER_BOOK_SPB_PORT = 6000; const char ORDER_BOOK_SPB_SOURCE_HOST[] = "194.247.145.35"; const char ORDER_BOOK_SPB_ONLINE_HOST_A[] = "239.16.0.31"; const char ORDER_BOOK_SPB_ONLINE_HOST_B[] = "239.16.0.131"; const char ORDER_BOOK_SPB_SNAPSHOT_HOST_A[] = "239.16.0.30"; const char ORDER_BOOK_SPB_SNAPSHOT_HOST_B[] = "239.16.0.130"; const char *ORDER_BOOK_SPB_HOST[] = { ORDER_BOOK_SPB_ONLINE_HOST_A, ORDER_BOOK_SPB_ONLINE_HOST_B, ORDER_BOOK_SPB_SNAPSHOT_HOST_A, ORDER_BOOK_SPB_SNAPSHOT_HOST_B, }; size_t ORDER_BOOK_SPB_N_HOSTS = sizeof(ORDER_BOOK_SPB_HOST) / sizeof(*ORDER_BOOK_SPB_HOST); const unsigned short COMMONES_PORT = 6010; const char COMMONES_SOURCE_HOST[] = "194.247.145.35"; const char COMMONES_ONLINE_HOST_A[] = "239.16.0.61"; const char COMMONES_ONLINE_HOST_B[] = "239.16.0.161"; const char COMMONES_SNAPSHOT_HOST_A[] = "239.16.0.60"; const char COMMONES_SNAPSHOT_HOST_B[] = "239.16.0.160"; const char *COMMONES_HOST[] = { COMMONES_ONLINE_HOST_A, COMMONES_ONLINE_HOST_B, COMMONES_SNAPSHOT_HOST_A, COMMONES_SNAPSHOT_HOST_B, }; size_t COMMONES_N_HOSTS = sizeof(COMMONES_HOST) / sizeof(*COMMONES_HOST); const unsigned short BEST_PRICES_PORT = 6010; const char BEST_PRICES_SOURCE_HOST[] = "194.247.145.35"; const char BEST_PRICES_ONLINE_HOST_A[] = "239.16.0.71"; const char BEST_PRICES_ONLINE_HOST_B[] = "239.16.0.171"; const char BEST_PRICES_SNAPSHOT_HOST_A[] = "239.16.0.70"; const char BEST_PRICES_SNAPSHOT_HOST_B[] = "239.16.0.170"; const char *BEST_PRICES_HOST[] = { BEST_PRICES_ONLINE_HOST_A, BEST_PRICES_ONLINE_HOST_B, BEST_PRICES_SNAPSHOT_HOST_A, BEST_PRICES_SNAPSHOT_HOST_B, }; size_t BEST_PRICES_N_HOSTS = sizeof(BEST_PRICES_HOST) / sizeof(*BEST_PRICES_HOST); const unsigned short TRADES_PORT = 6020; const char TRADES_SOURCE_HOST[] = "194.247.145.35"; const char TRADES_ONLINE_HOST_A[] = "239.16.0.91"; const char TRADES_ONLINE_HOST_B[] = "239.16.0.191"; const char *TRADES_HOST[] = { TRADES_ONLINE_HOST_A, TRADES_ONLINE_HOST_B, }; size_t TRADES_N_HOSTS = sizeof(TRADES_HOST) / sizeof(*TRADES_HOST); #pragma pack(push, 1) struct frame { int2 size; // Длина сообщения без учета заголовка frame в байтах int2 msgid; // Тип сообщения int8 seq; // Порядковый номер сообщения }; struct instrument { int2 market_id; // Идентификатор пула ликвидности (значения см. в разделе 3.6) int4 instrument_id; // Идентификатор торгового инструмента }; struct md_header { time8 system_time; // Время формирования сообщения int2 source_id; // Источник сообщения (значения см. в разделе 3.5) }; struct BestPrice { dec8 price; int1 type; ascii pad0[1]; int4 amount; time8 time; }; struct best_prices_msg { struct frame frame; struct md_header md_header; struct instrument instrument; int2 BestPrice_offset; int2 BestPrice_count; struct BestPrice BestPrice[1]; // extendable }; const int2 BEST_PRICES_ONLINE = 7651; const int2 BEST_PRICES_SNAPSHOT = 7653; struct CommonEntry { int1 type; int1 pad0; union { int8 as_int8; dec8 as_dec8; dec2 as_dec2; } value; }; struct commones_msg { struct frame frame; struct md_header md_header; struct instrument instrument; int2 CommonEntry_offset; int2 CommonEntry_count; struct CommonEntry CommonEntry[1]; }; const int2 COMMONES_SNAPSHOT = 1115; const int2 COMMONES_ONLINE = 1113; struct PriceLevel { dec8 price; int1 type; int1 flag; int4 amount; time8 time; }; struct order_book_msg { struct frame frame; struct md_header md_header; struct instrument instrument; int2 PriceLevel_offset; int2 PriceLevel_count; struct PriceLevel PriceLevel[1]; // extendable }; const int2 ORDER_BOOK_ONLINE = 1111; const int2 ORDER_BOOK_SNAPSHOT = 1112; struct trades_msg { struct frame frame; struct md_header md_header; struct instrument instrument; int8 trade_id; int4 amount; dec8 price; time8 trade_time; int1 trade_type; int1 dir; }; const int2 TRADES_ONLINE = 15210; struct snapshot_started_msg { struct frame frame; struct md_header md_header; int8 update_seq; }; const int2 SNAPSHOT_STARTED = 12345; struct snapshot_finished_msg { struct frame frame; struct md_header md_header; int8 update_seq; }; const int2 SNAPSHOT_FINISHED = 12312; struct heart_beat_msg { struct frame frame; struct md_header md_header; int4 reserved; }; const int2 HEART_BEAT = 15236; struct empty_book_msg { struct frame frame; struct md_header md_header; struct instrument instrument; }; const int2 EMPTY_BOOK = 15300; #pragma pack(pop) // timer typedef uint16_t timer_id_t; struct timer_list { struct timer_list *next; timer_id_t timer_id; time8 time; void (*handle)(void *data); void *data; }; struct timer_list *timer_list_add(struct timer_list *timer_list, timer_id_t timer_id, time8 time, void (*handle)(void *data), void *data) { if (timer_list == NULL || time < timer_list->time) { struct timer_list *new_timer_list; new_timer_list = malloc(sizeof(*timer_list)); new_timer_list->next = timer_list; new_timer_list->timer_id = timer_id; new_timer_list->time = time; new_timer_list->handle = handle; new_timer_list->data = data; return new_timer_list; } else { timer_list->next = timer_list_add(timer_list->next, timer_id, time, handle, data); return timer_list; } } struct timer_list *timer_list_remove(struct timer_list *timer_list, timer_id_t timer_id) { if (timer_list == NULL) return NULL; else if (timer_list->timer_id == timer_id) { struct timer_list *timer_list_next = timer_list->next; free(timer_list); return timer_list_next; } else { timer_list->next = timer_list_remove(timer_list->next, timer_id); return timer_list; } } struct timer_list * volatile timer_list = NULL; timer_id_t new_timer() { static timer_id_t timer_id = 1; return timer_id++; } void set_timer(timer_id_t timer_id, time8 time, void (*handle)(void *data), void *data) { timer_list = timer_list_remove(timer_list, timer_id); timer_list = timer_list_add(timer_list, timer_id, time, handle, data); } void drop_timer(timer_id_t timer_id) { timer_list = timer_list_remove(timer_list, timer_id); } struct timeval *get_timeout(struct timeval *tv) { time8 time, timeout = 0; if (timer_list == NULL) return NULL; time = utime(); if (time < timer_list->time) timeout = timer_list->time - time; tv->tv_sec = (uint32_t)(timeout / 1000000); tv->tv_usec = (uint32_t)(timeout % 1000000); return tv; } void process_timers() { time8 time; time = utime(); while (timer_list != NULL && timer_list->time <= time) { void (*handle)(void *data); void *data; handle = timer_list->handle; data = timer_list->data; timer_list = timer_list_remove(timer_list, timer_list->timer_id); handle(data); } } // message list #pragma pack(push, 1) struct msg_list { struct msg_list *next; struct frame msg; // extendable }; #pragma pack(pop) bool msg_list_add(struct msg_list **msg_list, struct frame *msg) { while (true) { if (*msg_list == NULL || msg->seq < (*msg_list)->msg.seq) { struct msg_list *new_msg_list; new_msg_list = malloc(sizeof(*new_msg_list) + msg->size); new_msg_list->next = *msg_list; memcpy(&new_msg_list->msg, msg, sizeof(*msg) + msg->size); *msg_list = new_msg_list; return true; } else if (msg->seq == (*msg_list)->msg.seq) return false; // duplicate message, discarding msg_list = &(*msg_list)->next; } return false; } void msg_list_remove(struct msg_list **msg_list) { struct msg_list *rem_msg_list = *msg_list; *msg_list = (*msg_list)->next; free(rem_msg_list); } // sequence rectifier const int8 INVALID_SEQ = -1; struct sequence { int8 seq; struct msg_list *buffer; unsigned buffer_size, buffer_limit; void (*process)(struct frame *msg); }; bool sequence_store(struct sequence *sequence, struct frame *msg) { if (!msg_list_add(&sequence->buffer, msg)) return false; ++sequence->buffer_size; if (sequence->buffer_limit != -1 && sequence->buffer_size >= sequence->buffer_limit) return false; return true; } void sequence_process_msg(struct sequence *sequence, struct frame *msg) { sequence->seq = msg->seq + 1; if (msg->msgid == HEART_BEAT) info("Heartbeat!"); else sequence->process(msg); } void sequence_apply_buffer(struct sequence *sequence) { while (sequence->buffer != NULL && sequence->buffer->msg.seq <= sequence->seq) { if (sequence->buffer->msg.seq == sequence->seq) { sequence_process_msg(sequence, &sequence->buffer->msg); } msg_list_remove(&sequence->buffer); --sequence->buffer_size; } } bool sequence_apply(struct sequence *sequence, struct frame *msg) { sequence_process_msg(sequence, msg); sequence_apply_buffer(sequence); return true; } void sequence_init(struct sequence *sequence, void (*process)(struct frame *msg)) { sequence->seq = INVALID_SEQ; sequence->process = process; sequence->buffer = NULL; sequence->buffer_size = 0; sequence->buffer_limit = -1; } void sequence_destroy(struct sequence *sequence) { while (sequence->buffer != NULL) msg_list_remove(&sequence->buffer); } void sequence_reinit(struct sequence *sequence) { void (*process)(struct frame *msg); process = sequence->process; sequence_destroy(sequence); sequence_init(sequence, process); } bool sequence_process(struct sequence *sequence, struct frame *msg) { if (sequence->seq == INVALID_SEQ || msg->seq == sequence->seq) // that is what we are waiting for return sequence_apply(sequence, msg); else if (msg->seq > sequence->seq) // something goes wrong, store this packet for the future return sequence_store(sequence, msg); else // if (msg->seq < sequence->seq) // duplicate packet, skipping return true; } // stream const time8 ONLINE_TIMEOUT = 5000000; // 5 seconds struct flow { int sock; struct sockaddr_in sock_addr; bool active; }; struct stream { size_t n_flows; struct flow *flow; struct sockaddr_in source_sock_addr; struct sequence online, snapshot; enum { UNDEFINED, SNAPSHOT, ONLINE, } state; int2 online_msg_id; int2 snapshot_msg_id; timer_id_t timer_id; }; bool stream_init(struct stream *stream, void (*process)(struct frame *msg), int2 online_msg_id, int2 snapshot_msg_id, unsigned short port, const char source_host[], const char *host[], size_t n_hosts) { size_t i; stream->n_flows = n_hosts; stream->flow = malloc(sizeof(*stream->flow) * n_hosts); sequence_init(&stream->online, process); sequence_init(&stream->snapshot, process); if (snapshot_msg_id != 0) stream->state = UNDEFINED; else stream->state = ONLINE; stream->online_msg_id = online_msg_id; stream->snapshot_msg_id = snapshot_msg_id; stream->timer_id = new_timer(); if (get_addr_info(source_host, &stream->source_sock_addr) != 0) return false; for (i = 0; i < n_hosts; ++i) { struct sockaddr_in sock_addr; if (get_addr_info(host[i], &stream->flow[i].sock_addr) != 0) { while (i--) closesocket(stream->flow[i].sock); return false; } stream->flow[i].sock_addr.sin_port = htons(port); #ifdef _WIN32 // bind to 0.0.0.0: memset(&sock_addr, 0, sizeof(sock_addr)); sock_addr.sin_family = AF_INET; sock_addr.sin_port = htons(port); #else // bind to : memcpy(&sock_addr, &stream->flow[i].sock_addr, sizeof(sock_addr)); #endif stream->flow[i].sock = setup(&sock_addr); if (stream->flow[i].sock == -1) { while (i--) closesocket(stream->flow[i].sock); return false; } if (multicast(stream->flow[i].sock, IP_ADD_SOURCE_MEMBERSHIP, &stream->flow[i].sock_addr.sin_addr, &stream->source_sock_addr.sin_addr) == -1) { do closesocket(stream->flow[i].sock); while (i--); return false; } stream->flow[i].active = true; } return true; } void stream_destroy(struct stream *stream) { size_t i; sequence_destroy(&stream->online); sequence_destroy(&stream->snapshot); for (i = 0; i < stream->n_flows; ++i) { multicast(stream->flow[i].sock, IP_DROP_SOURCE_MEMBERSHIP, &stream->flow[i].sock_addr.sin_addr, &stream->source_sock_addr.sin_addr); closesocket(stream->flow[i].sock); } free(stream->flow); } void stream_online_timeout(struct stream *stream) { size_t i; info("online timed out"); if (stream->snapshot_msg_id != 0) { for (i = 0; i < stream->n_flows; ++i) { // enabling snapshot if (stream->flow[i].active) continue; multicast(stream->flow[i].sock, IP_ADD_SOURCE_MEMBERSHIP, &stream->flow[i].sock_addr.sin_addr, &stream->source_sock_addr.sin_addr); stream->flow[i].active = true; info("Enabling snapshot %s:%hu", inet_ntoa(stream->flow[i].sock_addr.sin_addr), ntohs(stream->flow[i].sock_addr.sin_port)); } stream->state = UNDEFINED; info("New stream state: UNDEFINED"); } else { info("Skipping message: seq=%"PRId64"", stream->online.seq); ++stream->online.seq; sequence_apply_buffer(&stream->online); if (stream->online.buffer_size != 0) set_timer(stream->timer_id, utime() + ONLINE_TIMEOUT, (void (*)(void *))stream_online_timeout, stream); } } bool stream_process(struct stream *stream, int flow_index) { static char buffer[65536]; // because of single thread struct frame *msg = (struct frame *)buffer; int msg_len; struct sockaddr_in sock_addr; socklen_t sock_len = sizeof(sock_addr); char address[INET_ADDRSTRLEN]; if ((msg_len = recvfrom(stream->flow[flow_index].sock, buffer, (int)sizeof(buffer), 0, (struct sockaddr *)&sock_addr, &sock_len)) == -1) { print_error("recv failed"); return false; } if (msg_len != (int)(sizeof(*msg) + msg->size)) { error("malformed data is received"); return false; } strncpy(address, inet_ntoa(stream->flow[flow_index].sock_addr.sin_addr), INET_ADDRSTRLEN); debug("Message from %s:%hu (%s:%hu): msgid=%hu; seq=%"PRId64"", address, ntohs(stream->flow[flow_index].sock_addr.sin_port), inet_ntoa(sock_addr.sin_addr), ntohs(sock_addr.sin_port), msg->msgid, msg->seq); switch (stream->state) { case UNDEFINED: if (msg->msgid == stream->online_msg_id || msg->msgid == HEART_BEAT || msg->msgid == EMPTY_BOOK) sequence_store(&stream->online, msg); else if (msg->msgid == SNAPSHOT_STARTED) { stream->snapshot.seq = msg->seq + 1; stream->state = SNAPSHOT; info("New stream state: SNAPSHOT"); } break; case SNAPSHOT: if (msg->msgid == stream->online_msg_id || msg->msgid == HEART_BEAT || msg->msgid == EMPTY_BOOK) sequence_store(&stream->online, msg); else if (msg->msgid == stream->snapshot_msg_id) sequence_process(&stream->snapshot, msg); else if (msg->msgid == SNAPSHOT_FINISHED) { if (msg->seq == stream->snapshot.seq) { stream->online.seq = ((struct snapshot_finished_msg *)msg)->update_seq + 1; info("Online seq: %"PRId64"", stream->online.seq); sequence_apply_buffer(&stream->online); if (stream->online.buffer_size != 0) set_timer(stream->timer_id, utime() + ONLINE_TIMEOUT, (void (*)(void *))stream_online_timeout, stream); stream->state = ONLINE; info("New stream state: ONLINE"); } else { stream->state = UNDEFINED; info("New stream state: UNDEFINED"); } sequence_reinit(&stream->snapshot); } break; case ONLINE: if (msg->msgid == stream->online_msg_id || msg->msgid == HEART_BEAT || msg->msgid == EMPTY_BOOK) { int8 seq = stream->online.seq; unsigned buffer_size = stream->online.buffer_size; sequence_process(&stream->online, msg); if (stream->online.buffer_size != 0 && (buffer_size == 0 || seq == msg->seq)) set_timer(stream->timer_id, utime() + ONLINE_TIMEOUT, (void (*)(void *))stream_online_timeout, stream); if (stream->online.buffer_size == 0 && buffer_size != 0) drop_timer(stream->timer_id); } else if ((msg->msgid == SNAPSHOT_STARTED || msg->msgid == SNAPSHOT_FINISHED || msg->msgid == stream->snapshot_msg_id) && stream->flow[flow_index].active) { // disabling snapshot multicast(stream->flow[flow_index].sock, IP_DROP_SOURCE_MEMBERSHIP, &stream->flow[flow_index].sock_addr.sin_addr, &stream->source_sock_addr.sin_addr); stream->flow[flow_index].active = false; info("Disabling snapshot %s:%hu", inet_ntoa(stream->flow[flow_index].sock_addr.sin_addr), ntohs(stream->flow[flow_index].sock_addr.sin_port)); } break; } return true; } void order_book_process(struct order_book_msg *msg) { info("Processing order book message (%"PRId64")", msg->frame.seq); } void order_book_spb_process(struct order_book_msg *msg) { info("Processing order book spb message (%"PRId64")", msg->frame.seq); } void commones_process(struct commones_msg *msg) { info("Processing commones message (%"PRId64")", msg->frame.seq); } void best_prices_process(struct best_prices_msg *msg) { info("Processing best prices message (%"PRId64")", msg->frame.seq); } void trades_process(struct trades_msg *msg) { info("Processing trades message (%"PRId64")", msg->frame.seq); } void signal_handler(int signum) { #ifdef _WIN32 if (WSACleanup() == -1) print_error("WSACleanup failed"); #endif } int main() { size_t i, j; #define n_streams 5u struct stream stream[n_streams]; #ifdef _WIN32 WSADATA wsa_data; if (WSAStartup(MAKEWORD(2, 2), &wsa_data) == -1) return print_error("WSAStartup failed"), -1; #endif signal(SIGINT, signal_handler); signal(SIGTERM, signal_handler); stream_init(&stream[0], (void (*)(struct frame *msg))order_book_process, ORDER_BOOK_ONLINE, ORDER_BOOK_SNAPSHOT, ORDER_BOOK_PORT, ORDER_BOOK_SOURCE_HOST, ORDER_BOOK_HOST, ORDER_BOOK_N_HOSTS); stream_init(&stream[1], (void (*)(struct frame *msg))order_book_spb_process, ORDER_BOOK_ONLINE, ORDER_BOOK_SNAPSHOT, ORDER_BOOK_SPB_PORT, ORDER_BOOK_SPB_SOURCE_HOST, ORDER_BOOK_SPB_HOST, ORDER_BOOK_SPB_N_HOSTS); stream_init(&stream[2], (void (*)(struct frame *msg))commones_process, COMMONES_ONLINE, COMMONES_SNAPSHOT, COMMONES_PORT, COMMONES_SOURCE_HOST, COMMONES_HOST, COMMONES_N_HOSTS); stream_init(&stream[3], (void (*)(struct frame *msg))best_prices_process, BEST_PRICES_ONLINE, BEST_PRICES_SNAPSHOT, BEST_PRICES_PORT, BEST_PRICES_SOURCE_HOST, BEST_PRICES_HOST, BEST_PRICES_N_HOSTS); stream_init(&stream[4], (void (*)(struct frame *msg))trades_process, TRADES_ONLINE, 0, TRADES_PORT, TRADES_SOURCE_HOST, TRADES_HOST, TRADES_N_HOSTS); while (1) { int n_fds = 1; fd_set read_set; struct timeval timeout; FD_ZERO(&read_set); for (i = 0; i < n_streams; ++i) { for (j = 0; j < stream[i].n_flows; ++j) { if (n_fds < stream[i].flow[j].sock + 1) n_fds = stream[i].flow[j].sock + 1; FD_SET(stream[i].flow[j].sock, &read_set); } } fflush(stdout); n_fds = select(n_fds, &read_set, NULL, NULL, get_timeout(&timeout)); if (n_fds == -1) break; if (n_fds == 0) // timeout debug("timeout\n"); else { for (i = 0; i < n_streams; ++i) { for (j = 0; j < stream[i].n_flows; ++j) { if (!FD_ISSET(stream[i].flow[j].sock, &read_set)) continue; stream_process(&stream[i], j); } } } process_timers(); } for (i = 0; i < n_streams; ++i) stream_destroy(&stream[i]); return 0; }