/*
 * Single-file non-blocking TCP client: sends HELLO, follows the gateway
 * address from the REPORT reply, logs in, submits one ADD_ORDER and logs the
 * replies.  All timestamps produced by utime() are in microseconds.
 *
 * NOTE(review): the text this was restored from had been collapsed onto a few
 * physical lines, had lost every #include target, and had "&params" mangled
 * into a pilcrow sequence.  The header names below are reconstructed from the
 * identifiers the code uses -- confirm them against the original file.
 */
#ifdef _WIN32
#ifndef _WIN32_WINNT
#define _WIN32_WINNT 0x0501
#endif
#if _WIN32_WINNT < _WIN32_WINNT_WINXP
#error "works on win xp of later"
#endif
#ifdef _MSC_VER
#define _CRT_SECURE_NO_WARNINGS
#endif
#endif

#define __STDC_FORMAT_MACROS
#include <errno.h>
#include <signal.h>
#include <stdarg.h>
#include <stddef.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#ifdef _MSC_VER
#include <inttypes.h> /* NOTE(review): MSVC-specific header name lost -- confirm */
#else
#include <inttypes.h>
#endif

#ifdef _WIN32
#include <winsock2.h>
#include <ws2tcpip.h>
#include <windows.h>
typedef int socklen_t;
#ifdef _MSC_VER
#pragma comment (lib, "Ws2_32.lib")
#else
#pragma message("build with \"-lws2_32\" or \"-lwsock32\"")
#endif
#define AGAIN WSAEWOULDBLOCK
#define WOULDBLOCK WSAEWOULDBLOCK
#define INPROGRESS WSAEWOULDBLOCK
#define CONNRESET WSAECONNRESET
#define INVAL WSAEINVAL
#define NOMEM WSA_NOT_ENOUGH_MEMORY
#define SOL_TCP IPPROTO_TCP
#else
#include <arpa/inet.h>
#include <fcntl.h>
#include <netdb.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <sys/socket.h>
#include <sys/time.h>
#include <unistd.h>
#define closesocket close
#define AGAIN EAGAIN
#define WOULDBLOCK EWOULDBLOCK
#define INPROGRESS EINPROGRESS
#define CONNRESET ECONNRESET
#define INVAL EINVAL
#define NOMEM ENOMEM
#endif

/* Wire-level scalar types; the digit is the size in bytes. */
typedef char ascii;
typedef int8_t int1;
typedef int16_t int2;
typedef int32_t int4;
typedef int64_t int8;
typedef int64_t dec2;
typedef int64_t dec8;
typedef int32_t time4;
typedef uint64_t time8;

/*
 * Returns the current time as microseconds since 1970-01-01 00:00:00 UTC.
 */
time8 utime(void)
{
#ifdef _WIN32
    SYSTEMTIME system_time;
    FILETIME file_time;
    ULARGE_INTEGER now, epoch;

    GetSystemTime(&system_time);
    SystemTimeToFileTime(&system_time, &file_time);
    now.HighPart = file_time.dwHighDateTime;
    now.LowPart = file_time.dwLowDateTime;

    system_time.wYear = 1970;
    system_time.wDay = 1;
    system_time.wMonth = 1;
    system_time.wHour = 0;
    system_time.wMinute = 0;
    system_time.wSecond = 0;
    system_time.wMilliseconds = 0;
    SystemTimeToFileTime(&system_time, &file_time);
    epoch.HighPart = file_time.dwHighDateTime;
    epoch.LowPart = file_time.dwLowDateTime;

    /* FILETIME counts 100-nanosecond ticks: 10 ticks per microsecond. */
    return (time8)((now.QuadPart - epoch.QuadPart) / 10);
#else
    struct timeval tv;

    gettimeofday(&tv, NULL);
    return (time8)tv.tv_sec * 1000000 + (time8)tv.tv_usec;
#endif
}

/*
 * Breaks the current local time into its components.  Any output pointer may
 * be NULL.  year is the full calendar year and month is 1..12 on both
 * platforms; millisecond is 0..999; microsecond is the 0..999 remainder
 * within the millisecond (always 0 on Windows).
 */
void local_time(int *year, int *month, int *day, int *hour, int *minute, int *second, int *millisecond, int *microsecond)
{
#ifdef _WIN32
    SYSTEMTIME system_time;

    GetLocalTime(&system_time);
    if (year != NULL) *year = (int)system_time.wYear;
    if (month != NULL) *month = (int)system_time.wMonth;
    if (day != NULL) *day = (int)system_time.wDay;
    if (hour != NULL) *hour = (int)system_time.wHour;
    if (minute != NULL) *minute = (int)system_time.wMinute;
    if (second != NULL) *second = (int)system_time.wSecond;
    if (millisecond != NULL) *millisecond = system_time.wMilliseconds;
    if (microsecond != NULL) *microsecond = 0;
#else
    time8 now = utime();
    time_t seconds = (time_t)(now / 1000000);
    struct tm *tm = localtime(&seconds);

    /* tm_year counts from 1900; add it so both platforms report the same. */
    if (year != NULL) *year = (int)tm->tm_year + 1900;
    if (month != NULL) *month = (int)tm->tm_mon + 1;
    if (day != NULL) *day = (int)tm->tm_mday;
    if (hour != NULL) *hour = (int)tm->tm_hour;
    if (minute != NULL) *minute = (int)tm->tm_min;
    if (second != NULL) *second = (int)tm->tm_sec;
    if (millisecond != NULL) *millisecond = (int)(now / 1000 % 1000);
    if (microsecond != NULL) *microsecond = (int)(now % 1000);
#endif
}

/* Writes "[hh:mm:ss.mmm] [level] <formatted message>\n" to file. */
void _vlog(FILE *file, const char level[], const char format[], va_list args)
{
    int hour, minute, second, millisecond;

    local_time(NULL, NULL, NULL, &hour, &minute, &second, &millisecond, NULL);
    fprintf(file, "[%02d:%02d:%02d.%03d] [%s] ", hour, minute, second, millisecond, level);
    vfprintf(file, format, args);
    fputc('\n', file);
}

#ifdef __GNUC__
void _log(FILE *file, const char level[], const char format[], ...) __attribute__((format(printf, 3, 4)));
#endif
/* printf-style front end for _vlog(). */
void _log(FILE *file, const char level[], const char format[], ...)
{
    va_list arg_list;

    va_start(arg_list, format);
    _vlog(file, level, format, arg_list);
    va_end(arg_list);
}

#define debug(format, ...) _log(stdout, "debug", format, ##__VA_ARGS__)
#define info(format, ...) _log(stdout, "info", format, ##__VA_ARGS__)
#define warning(format, ...) _log(stderr, "warning", format, ##__VA_ARGS__)
#define error(format, ...) _log(stderr, "error", format, ##__VA_ARGS__)

/* Logs message followed by the code and text of the last socket error. */
void print_error(const char message[])
{
#ifdef _WIN32
    int code = WSAGetLastError(); /* read once: later calls may replace it */
    LPSTR buffer = NULL;

    if (FormatMessageA(FORMAT_MESSAGE_ALLOCATE_BUFFER | FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_IGNORE_INSERTS,
                       NULL, code, MAKELANGID(LANG_ENGLISH, SUBLANG_ENGLISH_US), (LPSTR)&buffer, 0, NULL) == 0) {
        error("%s: (%d) unknown error", message, code);
        return;
    }
    error("%s: (%d) %s", message, code, buffer);
    LocalFree(buffer);
#else
    int code = errno;

    error("%s: (%d) %s", message, code, strerror(code));
#endif
}

/* Returns the last socket error code (WSAGetLastError() or errno). */
int get_error(void)
{
#ifdef _WIN32
    return WSAGetLastError();
#else
    return errno;
#endif
}

/*
 * Sets the value get_error() will return.  Callers decide between "would
 * block" and "failed" by reading get_error() after a -1 return, so failure
 * paths that do not come from a socket call must overwrite any stale AGAIN.
 */
static void set_error(int code)
{
#ifdef _WIN32
    WSASetLastError(code);
#else
    errno = code;
#endif
}

/*
 * Resolves host to its first IPv4 address and stores it in *sock_addr (the
 * port is left as returned by the resolver).  Returns 0 on success, -1 if the
 * lookup fails or yields no usable IPv4 address.
 */
int get_addr_info(const char host[], struct sockaddr_in *sock_addr)
{
    struct addrinfo hints;
    struct addrinfo *addr_info = NULL, *ai;
    int gai_error;
    int found = 0;

    memset(&hints, 0, sizeof(hints));
    hints.ai_family = AF_INET;
    hints.ai_socktype = SOCK_DGRAM;

    gai_error = getaddrinfo(host, 0, &hints, &addr_info);
    if (gai_error != 0)
        return error("Failed to resolve address \"%s\": (%d) %s", host, gai_error, gai_strerror(gai_error)), -1;

    for (ai = addr_info; ai != NULL; ai = ai->ai_next) {
        if (ai->ai_family != AF_INET || ai->ai_addrlen > sizeof(*sock_addr))
            continue;
        memcpy(sock_addr, ai->ai_addr, ai->ai_addrlen);
        found = 1;
        break;
    }
    /* Decide from the flag: the list nodes are gone after this call. */
    freeaddrinfo(addr_info);

    if (!found)
        return error("No IPv4 address found for \"%s\"", host), -1;
    return 0;
}

/* Logs the local and peer endpoints of a connected socket. Returns 0 or -1. */
int print_sock_info(int sock)
{
    struct sockaddr_in self_addr, peer_addr;
    char address[INET_ADDRSTRLEN];
    socklen_t self_len = sizeof(self_addr), peer_len = sizeof(peer_addr);

    if (getsockname(sock, (struct sockaddr *)&self_addr, &self_len) == -1)
        return print_error("getsockname failed"), -1;
    if (getpeername(sock, (struct sockaddr *)&peer_addr, &peer_len) == -1)
        return print_error("getpeername failed"), -1;

    /* inet_ntoa() reuses one static buffer, so keep a copy of the first result. */
    strcpy(address, inet_ntoa(self_addr.sin_addr));
    info("Socket %s:%hu is connected to %s:%hu", address, ntohs(self_addr.sin_port),
         inet_ntoa(peer_addr.sin_addr), ntohs(peer_addr.sin_port));
    return 0;
}

/*
 * Creates a non-blocking TCP socket with keep-alive, zeroed linger and
 * TCP_NODELAY set, and starts connecting it to *sock_addr.  Returns the
 * socket (the connect may still be in progress) or -1; on failure the socket
 * is closed before returning.
 */
int setup(struct sockaddr_in *sock_addr)
{
    int sock;
    int flag = 1;
    struct linger linger = { 0, 0 };
#ifdef _WIN32
    u_long non_blocking = 1;
#else
    int fd_flags;
#endif

    sock = socket(AF_INET, SOCK_STREAM, 0);
    if (sock == -1)
        return print_error("socket failed"), -1;

    if (setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE, (char *)&flag, sizeof(flag)) == -1) {
        print_error("setsockopt failed");
        goto fail;
    }
    if (setsockopt(sock, SOL_SOCKET, SO_LINGER, (char *)&linger, sizeof(linger)) == -1) {
        print_error("setsockopt failed");
        goto fail;
    }
    if (setsockopt(sock, SOL_TCP, TCP_NODELAY, (char *)&flag, sizeof(flag)) == -1) {
        print_error("setsockopt failed");
        goto fail;
    }
#ifdef _WIN32
    if (ioctlsocket(sock, FIONBIO, &non_blocking) == -1) {
        print_error("ioctlsocket failed");
        goto fail;
    }
#else
    /* Add O_NONBLOCK to the existing status flags instead of replacing them. */
    fd_flags = fcntl(sock, F_GETFL, 0);
    if (fd_flags == -1 || fcntl(sock, F_SETFL, fd_flags | O_NONBLOCK) == -1) {
        print_error("fcntl failed");
        goto fail;
    }
#endif
    if (connect(sock, (struct sockaddr *)sock_addr, sizeof(*sock_addr)) == -1 && get_error() != INPROGRESS) {
        print_error("connect failed");
        goto fail;
    }
    return sock;

fail:
    closesocket(sock);
    return -1;
}

/* ---- wire formats (byte-packed) ---- */
#pragma pack(push, 1)

/* Common header; size is the number of payload bytes that follow it. */
struct frame {
    int2 size;
    int2 msgid;
    int8 seq;
};

struct instrument {
    int2 market_id;
    int4 instrument_id;
};

struct hello_msg {
    struct frame frame;
    ascii login[16];
    ascii password[16];
};
#define HELLO 1

struct report_msg {
    struct frame frame;
    int2 status;
    char reason[128];
    int2 addresses_offset;
    int2 addresses_count;
    struct gateway {
        int2 type;
        int1 ver;
        int1 pad0;
        char addresses[48];
    } gateway[1]; // extendable
};
#define REPORT 2

struct resend_request_msg {
    struct frame frame;
    int8 from_seq;
    int8 till_seq;
};
#define RESEND_REQUEST 8005

struct resend_report_msg {
    struct frame frame;
    int2 status;
};
#define RESEND_REPORT 8105

struct login_msg {
    struct frame frame;
    ascii login[16];
    ascii password[16];
    int1 reset_seq;
    int4 heartbeat_ms;
};
#define LOGIN 8001

struct logout_msg {
    struct frame frame;
    ascii login[16];
};
#define LOGOUT 8002

struct logon_msg {
    struct frame frame;
    int8 last_seq;
    int8 expected_seq;
    ascii system_id[8];
};
#define LOGON 8101

struct reject_msg {
    struct frame frame;
    int8 ref_seq;
    int2 ref_msgid;
    int2 reason;
    char message[33];
};
#define REJECT 8102

struct heartbeat_msg {
    struct frame frame;
};
#define HEARTBEAT 8103

struct user_header {
    ascii clorder_id[20];
};

struct gate_header {
    time8 system_time;
    ascii clorder_id[20];
    int2 source_id;
    ascii user_id[16];
};

struct account {
    int4 member_id;      // Trading member identifier
    ascii account[16];   // Trading/clearing account identifier of the trading member
    ascii client_id[16]; // Identifier of the trading member's client
};

struct otccodes {
    ascii initiator_party[16]; // Identifier of the sender of the addressed order
    ascii ctrparty[16];        // Identifier of the recipient of the addressed order
};

struct add_order_msg {
    struct frame frame;
    struct user_header user_header;
    struct instrument instrument; // Trading instrument identification component
    int1 dir;                     // Order direction
    int1 type;                    // Order type
    int1 time_in_force;           // Order time in force
    int1 passive_only;            // Reserved field. Filled with a zero byte
    int1 auto_cancel;             // Automatic cancel-on-disconnect mode
    int1 pad;                     // Reserved field. Filled with a zero byte
    int2 routing_instruction;     // Routing algorithm for the order remainder
    int2 routing_dest;            // Execution venue identifier (see 3.8.1.1.1)
    int4 amount;                  // Order volume in lots
    int4 amount_extra;            // Visible part of the order volume in lots. Must be filled only when type=ICEBERG
    dec8 price;                   // Price. For repo, the annual yield in percent
    dec8 price_extra;             // Additional price. For repo, the trade price may be given
    int8 flags;                   // Market-dependent parameters. Must be 0x0 in the current version
    time8 time_valid;             // Latest time at which the order may be accepted by the trading platform
    time4 date_expire;            // Date and time of automatic order cancellation. Must be zero in the current version
    struct account account;
    struct otccodes parties;
    char comment[24];             // Client comment for the order
    ascii extra_ref[12];          // Additional order identifier
    ascii extra1[4];              // Additional text field
    int2 prime_exchange;          // Primary liquidity pool and liquidity pool for routing the remainder (see 3.8.1.1)
    int4 match_ref;               // Identifier for matching an addressed order
};
#define ADD_ORDER 101

struct reject_report_msg {
    struct frame frame;
    struct gate_header gate_header;
    int2 market;      // Trading venue that rejected the request
    int2 reason;      // Rejection reason code
    char message[33]; // Rejection reason description
    int8 extra_data0; // Order identifier; given when a cancel-by-order_id request is rejected
};
#define REJECT_REPORT 201

struct add_report_msg {
    struct frame frame;
    struct gate_header gate_header;
    struct instrument instrument; // Trading instrument identification component
    int1 dir;                     // Order direction
    int1 type;                    // Order type
    int1 time_in_force;           // Order time in force
    int1 passive_only;            // Placement mode
    int1 auto_cancel;             // Automatic cancel-on-disconnect mode
    int1 pad;                     // Reserved field, filled with a zero byte
    int2 routing_instruction;     // Routing algorithm
    int2 routing_dest;            // Trading venue identifier
    int4 amount;                  // Volume of the order being cancelled
    int4 amount_extra;            // Visible part of the order volume
    dec8 price;                   // Price. For repo, the annual yield in percent
    dec8 price_extra;             // Trade price. Filled only for repo
    int8 flags;                   // Market-dependent parameters (values: see 3.9.2.2)
    time4 date_expire;            // Date and time of automatic order cancellation
    time8 time_valid;             // Latest time at which the order could have been accepted by the trading platform
    struct account account;       // Identification of the client that submitted the order
    struct otccodes parties;      // Identification of the parties to the addressed order
    int8 order_id;                // Order identifier assigned by the trading platform
    int8 orig_orderid;            // Identifier of the original order; filled when the client changes order parameters
    ascii exch_orderid[20];       // Order identifier assigned by the exchange
    int1 price_entry;             // Number of the price level, relative to the best, at which the order landed
    int1 pad1;                    // Reserved field. Filled with zero
    char comment[24];             // Client comment for the order
    ascii extra_ref[12];          // Additional order identifier
    ascii extra1[4];              // Additional text field
    int2 prime_exchange;          // Primary trading venue
    int4 match_ref;               // Identifier for matching an addressed order
    int2 orig_market;             // Liquidity pool specified by the client on submission
};
#define ADD_REPORT 212

#pragma pack(pop)

// timer
typedef uint16_t timer_id_t;

/* Singly linked list of pending timers, ordered by ascending expiry time. */
struct timer_list {
    struct timer_list *next;
    timer_id_t timer_id;
    time8 time;
    int (*handle)(void *data);
    void *data;
};

/*
 * Inserts a timer keeping the list ordered by time; a new timer goes after
 * existing ones with the same time.  Returns the new head.  If memory cannot
 * be allocated the error is logged and the list is returned unchanged.
 */
struct timer_list *timer_list_add(struct timer_list *timer_list, timer_id_t timer_id, time8 time, int (*handle)(void *data), void *data)
{
    struct timer_list **link = &timer_list;
    struct timer_list *node = malloc(sizeof(*node));

    if (node == NULL) {
        error("Out of memory: timer %u is not armed", (unsigned)timer_id);
        return timer_list;
    }
    while (*link != NULL && (*link)->time <= time)
        link = &(*link)->next;

    node->next = *link;
    node->timer_id = timer_id;
    node->time = time;
    node->handle = handle;
    node->data = data;
    *link = node;
    return timer_list;
}

/* Unlinks and frees the first timer with the given id. Returns the new head. */
struct timer_list *timer_list_remove(struct timer_list *timer_list, timer_id_t timer_id)
{
    struct timer_list **link = &timer_list;

    while (*link != NULL && (*link)->timer_id != timer_id)
        link = &(*link)->next;
    if (*link != NULL) {
        struct timer_list *victim = *link;

        *link = victim->next;
        free(victim);
    }
    return timer_list;
}

struct timer_list * volatile timer_list = NULL;

/* Hands out timer ids 1, 2, 3, ... */
timer_id_t new_timer(void)
{
    static timer_id_t timer_id = 1;

    return timer_id++;
}

/* (Re)arms timer_id to call handle(data) once utime() reaches time. */
void set_timer(timer_id_t timer_id, time8 time, int (*handle)(void *data), void *data)
{
    timer_list = timer_list_remove(timer_list, timer_id);
    timer_list = timer_list_add(timer_list, timer_id, time, handle, data);
}

/* Disarms timer_id if it is pending. */
void drop_timer(timer_id_t timer_id)
{
    timer_list = timer_list_remove(timer_list, timer_id);
}

/*
 * Fills *tv with the time left until the earliest timer (zero if overdue)
 * and returns tv, or returns NULL when no timer is pending.
 */
struct timeval *get_timeout(struct timeval *tv)
{
    time8 time, timeout = 0;

    if (timer_list == NULL)
        return NULL;
    time = utime();
    if (time < timer_list->time)
        timeout = timer_list->time - time;
    tv->tv_sec = (uint32_t)(timeout / 1000000);
    tv->tv_usec = (uint32_t)(timeout % 1000000);
    return tv;
}

/*
 * Fires every timer whose time has come, removing each before its handler
 * runs so the handler may re-arm it.  Returns -1 as soon as a handler does.
 */
int process_timers(void)
{
    time8 now = utime();

    while (timer_list != NULL && timer_list->time <= now) {
        int (*handle)(void *data) = timer_list->handle;
        void *data = timer_list->data;

        timer_list = timer_list_remove(timer_list, timer_list->timer_id);
        if (handle(data) == -1)
            return -1;
    }
    return 0;
}

// storage
/* Out-of-order messages waiting for the gap before them to be filled, by ascending seq. */
struct storage_list {
    struct storage_list *next;
    struct frame msg; // Extendible
};

struct storage_list *storage_list = NULL;

/*
 * Inserts a copy of msg (header plus msg->size payload bytes) into *head,
 * ordered by seq.  A message whose seq is already stored is ignored.
 * Returns 0 on success or duplicate, -1 on a negative size or out of memory.
 */
static int storage_list_insert(struct storage_list **head, const struct frame *msg)
{
    struct storage_list **link = head;
    struct storage_list *node;

    while (*link != NULL && (*link)->msg.seq < msg->seq)
        link = &(*link)->next;
    if (*link != NULL && (*link)->msg.seq == msg->seq)
        return 0;

    if (msg->size < 0)
        return error("Cannot store message seq=%" PRId64 ": negative size", msg->seq), -1;
    node = malloc(sizeof(*node) + (size_t)msg->size);
    if (node == NULL)
        return error("Out of memory: cannot store message seq=%" PRId64, msg->seq), -1;

    node->next = *link;
    memcpy(&node->msg, msg, sizeof(*msg) + (size_t)msg->size);
    *link = node;
    return 0;
}

/* Adds msg to the given list and returns the new head; on failure the list is unchanged. */
struct storage_list *storage_list_add(struct storage_list *storage_list, struct frame *msg)
{
    (void)storage_list_insert(&storage_list, msg);
    return storage_list;
}

/* Frees the head node and returns the rest; the list must not be empty. */
struct storage_list *storage_list_pop(struct storage_list *storage_list)
{
    struct storage_list *next = storage_list->next;

    free(storage_list);
    return next;
}

/* Stores msg in the global storage. Returns 0, or -1 if it could not be stored. */
int store_message(struct frame *msg)
{
    return storage_list_insert(&storage_list, msg);
}

int storage_empty(void)
{
    return storage_list == NULL;
}

/* Lowest-seq stored message; call only when !storage_empty(). */
struct frame *top_message(void)
{
    return &storage_list->msg;
}

/* Drops the lowest-seq stored message; call only when !storage_empty(). */
void pop_message(void)
{
    storage_list = storage_list_pop(storage_list);
}

// session
struct params {
    int4 member_id;
    ascii account[16];
    ascii client_id[16];
    char host[48];
    unsigned short port;
    ascii login[16];
    ascii password[16];
    int8 seq;
    int2 market_id;
    int4 instrument_id;
    int2 prime_exchange;
    dec8 price;
};

struct session {
    int sock;
    char is_connecting;
    char *read_buffer;
    size_t read_buffer_size;
    size_t read_buffer_capacity;
    /* Outgoing bytes form a ring: write_buffer_size bytes starting at write_buffer_offset. */
    char *write_buffer;
    size_t write_buffer_offset;
    size_t write_buffer_size;
    size_t write_buffer_capacity;
    struct sockaddr_in sock_addr;
    int4 member_id;
    ascii account[16];
    ascii client_id[16];
    ascii login[16];
    ascii password[16];
    int2 market_id;
    int4 instrument_id;
    int2 prime_exchange;
    dec8 price;
    timer_id_t self_idle_timer_id;
    timer_id_t server_idle_timer_id;
    timer_id_t application_timer_id;
    time8 idle_timeout; // microseconds
    int8 self_seq;
    int8 next_seq;
    int8 next_online_seq;
    char is_restoring;
};

/*
 * Sends as much of the pending output ring as the socket accepts.
 * Returns 0 when the ring is empty, -1 otherwise: get_error() is
 * AGAIN/WOULDBLOCK if the socket is merely full; any other code means the
 * send failed and the socket has been closed (session->sock == -1).
 */
int session_send(struct session *session)
{
    while (session->write_buffer_size > 0) {
        /* Send only up to the physical end of the ring; the next pass wraps. */
        size_t chunk = session->write_buffer_capacity - session->write_buffer_offset;
        int sent_size;

        if (chunk > session->write_buffer_size)
            chunk = session->write_buffer_size;
        sent_size = send(session->sock, session->write_buffer + session->write_buffer_offset, chunk, 0);
        if (sent_size == -1) {
            int code = get_error();

            if (code == WOULDBLOCK || code == AGAIN)
                return -1;
            print_error("send failed");
            closesocket(session->sock);
            session->sock = -1;
            set_error(code); /* logging/closing must not hide the cause from the caller */
            return -1;
        }
        session->write_buffer_offset += sent_size;
        session->write_buffer_size -= sent_size;
        if (session->write_buffer_offset == session->write_buffer_capacity)
            session->write_buffer_offset = 0;
        debug("Data is sended: %d bytes", sent_size);
    }
    return 0;
}

int session_write(struct session *session, void *msg, size_t size);

/* Timer handler: nothing was sent for idle_timeout, so send a heartbeat. */
int session_self_idle_timeout(struct session *session)
{
    struct heartbeat_msg msg;

    msg.frame.msgid = HEARTBEAT;
    msg.frame.seq = 0;
    msg.frame.size = sizeof(msg) - sizeof(msg.frame);
    if (session_write(session, &msg, sizeof(msg)) == -1) {
        if (get_error() != AGAIN && get_error() != WOULDBLOCK)
            return -1;
    }
    return 0;
}

/* Timer handler: the server has been silent for too long. Always fails. */
int session_server_idle_timeout(struct session *session)
{
    error("Server is not responding!");
    return -1;
}

/* Timer handler: queues a LOGOUT. Returns -1 if there is no socket or the write fails. */
int session_logout(struct session *session)
{
    struct logout_msg logout_msg;

    if (session->sock == -1)
        return -1;
    logout_msg.frame.msgid = LOGOUT;
    logout_msg.frame.seq = 0;
    logout_msg.frame.size = sizeof(logout_msg) - sizeof(logout_msg.frame);
    strncpy(logout_msg.login, session->login, sizeof(logout_msg.login) / sizeof(*logout_msg.login));
    if (session_write(session, &logout_msg, sizeof(logout_msg)) == -1) {
        if (get_error() != AGAIN && get_error() != WOULDBLOCK)
            return -1;
    }
    return 0;
}

/*
 * Appends size bytes at buf (which must start with a struct frame) to the
 * output ring, growing it if needed, and tries to flush unless the socket is
 * still connecting.  Re-arms the heartbeat timer, and for seq != 0 also the
 * logout-when-idle timer.  Returns 0 if the data was queued (sent or not),
 * -1 on a send failure or if the ring could not be grown.
 */
int session_write(struct session *session, void *buf, size_t size)
{
    struct frame *msg = buf;
    size_t tail, room;

    info("Outgoing message: seq=%" PRId64 ", msgid=%hd", msg->seq, msg->msgid);

    if (session->write_buffer_size + size > session->write_buffer_capacity) {
        size_t old_capacity = session->write_buffer_capacity;
        size_t new_capacity = old_capacity;
        char *buffer;

        while (new_capacity < session->write_buffer_size + size)
            new_capacity += 65536;
        buffer = malloc(new_capacity);
        if (buffer == NULL) {
            error("Out of memory: cannot grow write buffer to %lu bytes", (unsigned long)new_capacity);
            set_error(NOMEM);
            return -1;
        }
        /* Linearise the pending bytes; wrap-around is relative to the OLD capacity. */
        if (session->write_buffer_offset + session->write_buffer_size <= old_capacity)
            memcpy(buffer, session->write_buffer + session->write_buffer_offset, session->write_buffer_size);
        else {
            size_t first_part_size = old_capacity - session->write_buffer_offset;

            memcpy(buffer, session->write_buffer + session->write_buffer_offset, first_part_size);
            memcpy(buffer + first_part_size, session->write_buffer, session->write_buffer_size - first_part_size);
        }
        free(session->write_buffer);
        session->write_buffer = buffer;
        session->write_buffer_offset = 0;
        session->write_buffer_capacity = new_capacity;
    }

    /* tail = ring position one past the last pending byte. */
    tail = session->write_buffer_offset + session->write_buffer_size;
    if (tail >= session->write_buffer_capacity)
        tail -= session->write_buffer_capacity;
    room = session->write_buffer_capacity - tail;
    if (size <= room)
        memcpy(session->write_buffer + tail, msg, size);
    else {
        memcpy(session->write_buffer + tail, msg, room);
        memcpy(session->write_buffer, (char *)msg + room, size - room);
    }
    session->write_buffer_size += size;

    if (!session->is_connecting) {
        if (session_send(session) == -1) {
            if (get_error() != AGAIN && get_error() != WOULDBLOCK)
                return -1;
        }
    }
    set_timer(session->self_idle_timer_id, utime() + session->idle_timeout, (int (*)(void*))session_self_idle_timeout, session);
    if (msg->seq != 0) // logout if there is nothing to do any more
        set_timer(session->application_timer_id, utime() + session->idle_timeout * 3, (int (*)(void *))session_logout, session);
    return 0;
}

/*
 * Initialises *session from *params: allocates both buffers, copies the
 * credentials and order parameters, allocates three timer ids and resolves
 * the host.  Returns 0 on success, -1 on allocation or resolution failure
 * (session_destroy() may still be called).
 */
int session_init(struct session *session, struct params *params)
{
    session->read_buffer_capacity = 65536;
    session->read_buffer = malloc(session->read_buffer_capacity);
    session->read_buffer_size = 0;
    session->write_buffer_capacity = 65536;
    session->write_buffer = malloc(session->write_buffer_capacity);
    session->write_buffer_size = 0;
    session->write_buffer_offset = 0;
    session->sock = -1;
    if (session->read_buffer == NULL || session->write_buffer == NULL)
        return error("Out of memory: cannot allocate session buffers"), -1;

    session->member_id = params->member_id;
    strncpy(session->account, params->account, sizeof(session->account) / sizeof(*session->account));
    strncpy(session->client_id, params->client_id, sizeof(session->client_id) / sizeof(*session->client_id));
    strncpy(session->login, params->login, sizeof(session->login) / sizeof(*session->login));
    strncpy(session->password, params->password, sizeof(session->password) / sizeof(*session->password));
    session->market_id = params->market_id;
    session->instrument_id = params->instrument_id;
    session->prime_exchange = params->prime_exchange;
    session->price = params->price;

    session->self_idle_timer_id = new_timer();
    session->server_idle_timer_id = new_timer();
    session->application_timer_id = new_timer();
    session->idle_timeout = 1000000; // 1 second

    session->self_seq = 0;
    session->next_seq = params->seq;
    session->next_online_seq = params->seq;
    session->is_restoring = 0;

    if (get_addr_info(params->host, &session->sock_addr) == -1)
        return -1;
    session->sock_addr.sin_port = htons(params->port);
    return 0;
}

/* Releases the session buffers. */
void session_destroy(struct session *session)
{
    free(session->read_buffer);
    session->read_buffer = NULL;
    free(session->write_buffer);
    session->write_buffer = NULL;
}

/* Opens the socket to session->sock_addr and queues the HELLO message. */
int session_enter(struct session *session)
{
    struct hello_msg msg;

    session->sock = setup(&session->sock_addr);
    if (session->sock == -1)
        return -1;
    session->is_connecting = 1;

    msg.frame.msgid = HELLO;
    msg.frame.seq = 0;
    msg.frame.size = sizeof(msg) - sizeof(msg.frame);
    strncpy(msg.login, session->login, sizeof(msg.login) / sizeof(*msg.login));
    strncpy(msg.password, session->password, sizeof(msg.password) / sizeof(*msg.password));
    if (session_write(session, &msg, sizeof(msg)) == -1) {
        if (get_error() != AGAIN && get_error() != WOULDBLOCK)
            return -1;
    }
    return 0;
}

/*
 * Reads until the read buffer holds at least size bytes.  Returns 0 when it
 * does, -1 otherwise: get_error() is AGAIN/WOULDBLOCK if more data is simply
 * not available yet; for any other code the socket has been closed.
 */
int session_receive(struct session *session, size_t size)
{
    if (session->read_buffer_capacity < size) {
        char *buffer = realloc(session->read_buffer, size);

        if (buffer == NULL) {
            error("Out of memory: cannot grow read buffer to %lu bytes", (unsigned long)size);
            set_error(NOMEM);
            return -1;
        }
        session->read_buffer = buffer;
        session->read_buffer_capacity = size;
    }
    while (session->read_buffer_size < size) {
        int read_size;

        read_size = recv(session->sock, session->read_buffer + session->read_buffer_size, size - session->read_buffer_size, 0);
        if (read_size == -1) {
            int code = get_error();

            if (code == AGAIN || code == WOULDBLOCK)
                return -1;
            print_error("recv failed");
            closesocket(session->sock);
            session->sock = -1;
            set_error(code);
            return -1;
        }
        if (read_size == 0) {
            debug("Socket is closed");
            closesocket(session->sock);
            session->sock = -1;
            /* recv() returning 0 sets no error; without this a stale AGAIN
               would make the caller treat the close as "try again later". */
            set_error(CONNRESET);
            return -1;
        }
        session->read_buffer_size += read_size;
        debug("Data is received: %d bytes", read_size);
    }
    return 0;
}

/* Reads one complete message (frame header, then frame.size payload bytes) into the read buffer. */
int session_read(struct session *session)
{
    int2 payload_size;

    if (session_receive(session, sizeof(struct frame)) == -1)
        return -1;
    payload_size = ((struct frame *)session->read_buffer)->size;
    if (payload_size < 0) {
        /* A negative size would turn into an enormous size_t below. */
        error("Malformed frame: size=%hd", payload_size);
        closesocket(session->sock);
        session->sock = -1;
        set_error(INVAL);
        return -1;
    }
    return session_receive(session, sizeof(struct frame) + (size_t)payload_size);
}

/* Opens the socket to session->sock_addr and queues the LOGIN message. */
int session_login(struct session *session)
{
    struct login_msg login_msg;

    session->sock = setup(&session->sock_addr);
    if (session->sock == -1)
        return -1;
    session->is_connecting = 1;

    login_msg.frame.msgid = LOGIN;
    login_msg.frame.seq = 0;
    login_msg.frame.size = sizeof(login_msg) - sizeof(login_msg.frame);
    strncpy(login_msg.login, session->login, 16);
    strncpy(login_msg.password, session->password, 16);
    login_msg.reset_seq = 0;
    login_msg.heartbeat_ms = (int4)(session->idle_timeout / 1000); // milliseconds
    if (session_write(session, &login_msg, sizeof(login_msg)) == -1) {
        if (get_error() != AGAIN && get_error() != WOULDBLOCK)
            return -1;
    }
    return 0;
}

/* Queues one hard-coded buy limit order for 6 lots using the session's instrument, account and price. */
int session_add_order(struct session *session)
{
    struct add_order_msg add_order_msg;

    memset(&add_order_msg, 0, sizeof(add_order_msg));
    add_order_msg.frame.msgid = ADD_ORDER;
    add_order_msg.frame.seq = session->self_seq++;
    add_order_msg.frame.size = sizeof(add_order_msg) - sizeof(add_order_msg.frame);
    strcpy(add_order_msg.user_header.clorder_id, "1");
    add_order_msg.instrument.market_id = session->market_id;
    add_order_msg.instrument.instrument_id = session->instrument_id;
    add_order_msg.dir = 1; // Buy
    add_order_msg.type = 2; // Limit
    add_order_msg.time_in_force = 0; // GTC (Day)
    add_order_msg.passive_only = 0;
    add_order_msg.auto_cancel = 0; // Off
    add_order_msg.pad = 0;
    add_order_msg.routing_instruction = 0;
    add_order_msg.routing_dest = session->market_id;
    add_order_msg.amount = 6;
    add_order_msg.amount_extra = 0;
    add_order_msg.price = session->price;
    add_order_msg.price_extra = 0;
    add_order_msg.flags = 0;
    add_order_msg.time_valid = 0;
    add_order_msg.date_expire = 0;
    add_order_msg.account.member_id = session->member_id;
    strncpy(add_order_msg.account.account, session->account, sizeof(add_order_msg.account.account) / sizeof(*add_order_msg.account.account));
    strncpy(add_order_msg.account.client_id, session->client_id, sizeof(add_order_msg.account.client_id) / sizeof(*add_order_msg.account.account));
    strncpy(add_order_msg.parties.initiator_party, "", sizeof(add_order_msg.parties.initiator_party) / sizeof(*add_order_msg.parties.initiator_party));
    strncpy(add_order_msg.parties.ctrparty, "", sizeof(add_order_msg.parties.ctrparty) / sizeof(*add_order_msg.parties.ctrparty));
    strncpy(add_order_msg.comment, "Comment 1", sizeof(add_order_msg.comment) / sizeof(*add_order_msg.comment));
    strncpy(add_order_msg.extra1, "", sizeof(add_order_msg.extra1) / sizeof(*add_order_msg.extra1));
    add_order_msg.prime_exchange = session->prime_exchange;
    add_order_msg.match_ref = 0;
    if (session_write(session, &add_order_msg, sizeof(add_order_msg)) == -1) {
        if (get_error() != AGAIN && get_error() != WOULDBLOCK)
            return -1;
    }
    return 0;
}

/*
 * Logs a REJECT.  A LOGIN rejected with reason 1 / "WRONG_LOGIN_TIMESTAMP"
 * closes the socket and schedules another session_login(); returns 0 then,
 * -1 for every other reject.
 */
int session_on_reject(struct session *session, struct reject_msg *reject_msg)
{
    /* Precision bounds the read: wire text need not be NUL-terminated. */
    error("Message (seq=%" PRId64 ", msgid=%hd) was rejected: %hd, %.33s", reject_msg->ref_seq, reject_msg->ref_msgid, reject_msg->reason, reject_msg->message);
    if (reject_msg->ref_msgid == LOGIN && reject_msg->reason == 1 && strcmp(reject_msg->message, "WRONG_LOGIN_TIMESTAMP") == 0) {
        closesocket(session->sock);
        session->sock = -1;
        set_timer(session->self_idle_timer_id, utime() + session->idle_timeout / 2, (int (*)(void *))session_login, session); // waiting for login information to be delivered into the gate
        return 0;
    }
    return -1;
}

/*
 * Handles the REPORT reply to HELLO: on status 0 picks the first gateway
 * whose type has bit 0 set, closes the current socket, resolves the
 * "host:port" address and logs in there.  Returns 0 on success, -1 otherwise.
 */
int session_on_report(struct session *session, struct report_msg *report_msg)
{
    size_t received, fixed_part, available = 0;
    int count, i;
    char address[sizeof(report_msg->gateway[0].addresses) + 1];
    char host[48];
    unsigned short port;

    info("Logon status: (%hd) %.128s", report_msg->status, report_msg->reason);
    if (report_msg->status != 0)
        return -1;

    /* Look only at gateway entries that lie inside the bytes actually received. */
    fixed_part = offsetof(struct report_msg, gateway);
    received = report_msg->frame.size < 0 ? 0 : sizeof(report_msg->frame) + (size_t)report_msg->frame.size;
    if (received > fixed_part)
        available = (received - fixed_part) / sizeof(report_msg->gateway[0]);
    count = report_msg->addresses_count;
    if (count > 0 && (size_t)count > available) {
        warning("Report announces %d gateways but carries only %lu", count, (unsigned long)available);
        count = (int)available;
    }

    for (i = 0; i < count; ++i) {
        if (report_msg->gateway[i].type & 0x1)
            break;
    }
    if (i >= count)
        return -1;

    closesocket(session->sock);
    session->sock = -1;

    /* Work on a terminated copy and cap the host field to its buffer. */
    memcpy(address, report_msg->gateway[i].addresses, sizeof(address) - 1);
    address[sizeof(address) - 1] = '\0';
    if (sscanf(address, "%47[^:]:%hu", host, &port) != 2)
        return -1;
    if (get_addr_info(host, &session->sock_addr) == -1)
        return -1;
    session->sock_addr.sin_port = htons(port);
    if (session_login(session) == -1)
        return -1;
    return 0;
}

/* Handles LOGON: adopts the server's sequence numbers, then submits the order. */
int session_on_logon(struct session *session, struct logon_msg *logon_msg)
{
    session->self_seq = logon_msg->expected_seq;
    if (session->next_online_seq == 0)
        session->next_seq = session->next_online_seq = logon_msg->last_seq + 1;
    info("System id: %.8s", logon_msg->system_id);
    if (session_add_order(session) == -1)
        return -1;
    return 0;
}

/* Logs a REJECT_REPORT. */
int session_on_reject_report(struct session *session, struct reject_report_msg *reject_report_msg)
{
    warning("Reject report: %hd, %.33s", reject_report_msg->reason, reject_report_msg->message);
    return 0;
}

/* Logs an ADD_REPORT. */
int session_on_add_report(struct session *session, struct add_report_msg *add_report_msg)
{
    info("Order is accepted: market=%hd, order_id=%" PRId64 ", orig_orderid=%" PRId64 ", exch_orderid=%.20s, price_entry=%d",
         add_report_msg->instrument.market_id, add_report_msg->order_id, add_report_msg->orig_orderid,
         add_report_msg->exch_orderid, add_report_msg->price_entry);
    return 0;
}

/* Queues a RESEND_REQUEST starting at seq and marks the session as restoring. */
int session_resend(struct session *session, int8 seq)
{
    struct resend_request_msg resend_request_msg;

    resend_request_msg.frame.msgid = RESEND_REQUEST;
    resend_request_msg.frame.seq = 0;
    resend_request_msg.frame.size = sizeof(resend_request_msg) - sizeof(resend_request_msg.frame);
    resend_request_msg.from_seq = seq;
    resend_request_msg.till_seq = 0;
    session->next_seq = seq > 0 ? seq : 0;
    session->is_restoring = 1;
    if (session_write(session, &resend_request_msg, sizeof(resend_request_msg)) == -1) {
        if (get_error() != AGAIN && get_error() != WOULDBLOCK)
            return -1;
    }
    return 0;
}

/*
 * Handles RESEND_REPORT.  MORE, or FINISH while messages up to
 * next_online_seq are still missing, requests the next portion; FINISH
 * otherwise ends restoring.  Returns -1 on UNAVAILABLE or a failed request.
 */
int session_on_resend_report(struct session *session, struct resend_report_msg *resend_report_msg)
{
    static const char *const status[] = { "ACK", "MORE", "FINISH", "DUPLICATE_REQUEST", "UNAVAILABLE" };
    int2 code = resend_report_msg->status;
    int more;

    /* The code comes off the wire: never index the table with it unchecked. */
    if (code >= 0 && (size_t)code < sizeof(status) / sizeof(*status))
        info("Resend status: %s", status[code]);
    else
        info("Resend status: %s", "UNKNOWN");

    switch (code) {
    case 0: // ACK
        info("Resend is started.");
        break;
    case 1: // MORE
    case 2: // FINISH
        more = code == 1 || session->next_seq < session->next_online_seq;
        if (more) {
            if (session_resend(session, session->next_seq) == -1)
                return -1;
            info("Resend more.");
        } else {
            session->is_restoring = 0;
            info("Resend is finished.");
        }
        break;
    case 3: // DUPLICATE_REQUEST
        warning("Duplicate resend request.");
        break;
    case 4: // UNAVAILABLE
        error("Resend service is unavailable.");
        return -1;
    default:
        break;
    }
    return 0;
}

int session_process_message2(struct session *session, struct frame *msg);

/*
 * Sequencing layer for one incoming message.  seq == 0 is dispatched
 * directly.  Otherwise: the expected seq is dispatched; a later seq is stored
 * and a resend is requested unless one is already running; an earlier seq is
 * skipped.  After a dispatch, stored messages that have become consecutive
 * are dispatched too.
 */
int session_process_message(struct session *session, struct frame *msg)
{
    if (msg->seq != 0) // logout if there is nothing to do any more
        set_timer(session->application_timer_id, utime() + session->idle_timeout * 3, (int (*)(void *))session_logout, session);

    if (msg->seq != 0) {
        if (session->next_seq == 0 || msg->seq == session->next_seq) {
            if (session->next_online_seq == 0 || msg->seq == session->next_online_seq) {
                info("Incoming message (ONLINE): seq=%" PRId64 ", msgid=%hd", msg->seq, msg->msgid);
                session->next_online_seq = msg->seq + 1;
            } else
                info("Incoming message (SNAPSHOT): seq=%" PRId64 ", msgid=%hd", msg->seq, msg->msgid);
            session->next_seq = msg->seq + 1;
        } else if (msg->seq > session->next_seq) {
            info("Incoming message (STORED): seq=%" PRId64 ", msgid=%hd", msg->seq, msg->msgid);
            if (msg->seq >= session->next_online_seq)
                session->next_online_seq = msg->seq + 1;
            if (store_message(msg) == -1)
                return -1;
            if (!session->is_restoring) {
                if (session_resend(session, session->next_seq) == -1)
                    return -1;
            }
            return 0;
        } else {
            info("Incoming message (SKIPPED): seq=%" PRId64 ", msgid=%hd", msg->seq, msg->msgid);
            return 0;
        }
    }

    if (session_process_message2(session, msg) == -1)
        return -1;

    while (!storage_empty()) {
        struct frame *stored = top_message();

        if (stored->seq != session->next_seq)
            break;
        info("Incoming message (RESTORED): seq=%" PRId64 ", msgid=%hd", stored->seq, stored->msgid);
        if (session_process_message2(session, stored) == -1)
            return -1;
        session->next_seq = stored->seq + 1;
        pop_message();
    }
    return 0;
}

/* Dispatches msg to its handler by msgid; unknown ids are ignored. */
int session_process_message2(struct session *session, struct frame *msg)
{
    switch (msg->msgid) {
    case HEARTBEAT: // do nothing
        break;
    case REJECT:
        if (session_on_reject(session, (struct reject_msg *)msg) == -1)
            return -1;
        break;
    case REPORT: // get the gateway address and reopen the socket
        if (session_on_report(session, (struct report_msg *)msg) == -1)
            return -1;
        break;
    case LOGON: // we are logged in
        if (session_on_logon(session, (struct logon_msg *)msg) == -1)
            return -1;
        break;
    case REJECT_REPORT:
        if (session_on_reject_report(session, (struct reject_report_msg *)msg) == -1)
            return -1;
        break;
    case ADD_REPORT:
        if (session_on_add_report(session, (struct add_report_msg *)msg) == -1)
            return -1;
        break;
    case RESEND_REPORT:
        if (session_on_resend_report(session, (struct resend_report_msg *)msg) == -1)
            return -1;
        break;
    default:
        break;
    }
    return 0;
}

/*
 * Socket-readable event: reads one message if it is completely available,
 * re-arms the server-silence timer and processes it.  Returns 0 if the
 * message was handled or is not complete yet, -1 on failure.
 */
int session_on_read(struct session *session)
{
    if (session_read(session) == -1) {
        if (get_error() == AGAIN || get_error() == WOULDBLOCK)
            return 0;
        return -1;
    }
    set_timer(session->server_idle_timer_id, utime() + session->idle_timeout * 4, (int (*)(void*))session_server_idle_timeout, session);
    if (session_process_message(session, (struct frame *)session->read_buffer) == -1)
        return -1;
    session->read_buffer_size = 0;
    return 0;
}

/* Socket-error event: logs it. Always returns -1. */
int session_on_error(struct session *session)
{
    return error("Connection is refused"), -1;
}

/*
 * Socket-writable event: completes a pending connect (checking SO_ERROR on
 * non-Windows builds), then flushes queued output.
 */
int session_on_write(struct session *session)
{
    if (session->is_connecting) {
#ifndef _WIN32
        int flag;
        socklen_t len = sizeof(flag);

        if (getsockopt(session->sock, SOL_SOCKET, SO_ERROR, &flag, &len) == -1)
            return print_error("getsockopt failed"), -1;
        if (flag != 0)
            return error("Connection is refused"), -1;
#endif
        session->is_connecting = 0;
        print_sock_info(session->sock);
    }
    if (session_send(session) == -1) {
        if (get_error() != AGAIN && get_error() != WOULDBLOCK)
            return -1;
    }
    return 0;
}

/*
 * Command-line option descriptor.  key is a list of aliases, each
 * NUL-terminated, ended by an empty string ("-m\0--member-id\0").  A NULL
 * print_default marks the option as required.
 */
struct option {
    char *key;
    int (*option_handler)(const char *, void *, size_t);
    void *value;
    size_t size;
    void (*print_default)(FILE *file, void *, size_t);
};

/*
 * Parses arg as a decimal integer into the size-byte integer at value
 * (size must be 2, 4 or 8).  Returns 0 on success, -1 on an unsupported
 * size, a non-number, or trailing characters.
 */
int getint(const char *arg, void *value, size_t size)
{
    const char *format;
    char dummy;

    switch (size) {
    case sizeof(int16_t): format = "%hd%c"; break;
    case sizeof(int32_t): format = "%d%c"; break;
    case sizeof(int64_t): format = "%" SCNd64 "%c"; break;
    default: return -1;
    }
    /* Exactly one conversion means nothing followed the number. */
    if (sscanf(arg, format, value, &dummy) != 1)
        return -1;
    return 0;
}

/*
 * Copies arg into the size-byte field at value, zero-padded.  Returns -1 if
 * arg is longer than size.  An arg of exactly size characters fills the
 * field with no terminating NUL.
 */
int getstr(const char *arg, void *value, size_t size)
{
    if (strlen(arg) > size)
        return -1;
    strncpy((char *)value, arg, size);
    return 0;
}

/* Returns non-zero if arg equals one of the option's aliases. */
static int option_has_key(const struct option *option, const char *arg)
{
    const char *key;

    for (key = option->key; *key != '\0'; key += strlen(key) + 1) {
        if (strcmp(arg, key) == 0)
            return 1;
    }
    return 0;
}

/*
 * Parses leading "-key value" pairs of argv into options[], stopping at the
 * first argument that does not start with '-'.  Returns 0 on success, -1
 * (after logging) on an unknown key, a missing or invalid value, or a
 * missing required option.
 */
int getoptions(int argc, char *argv[], struct option options[], size_t n_options)
{
    int i;
    size_t oid;
    uint64_t mask = 0, required = 0;

    for (oid = 0; oid < n_options; ++oid) {
        if (!options[oid].print_default)
            required |= (uint64_t)1 << oid;
    }

    for (i = 1; i < argc && argv[i][0] == '-'; ++i) {
        for (oid = 0; oid < n_options; ++oid) {
            if (option_has_key(&options[oid], argv[i]))
                break;
        }
        if (oid == n_options)
            return error("Unknown argument \"%s\"", argv[i]), -1;

        mask |= (uint64_t)1 << oid;
        if (++i >= argc) /* argv[argc] is NULL and must not reach "%s" */
            return error("Missing value for \"%s\"", options[oid].key), -1;
        if (options[oid].option_handler(argv[i], options[oid].value, options[oid].size) != 0)
            return error("Inappropriate argument for \"%s\": %s", options[oid].key, argv[i]), -1;
    }

    if ((mask & required) != required)
        return error("Some arguments are missing"), -1;
    return 0;
}

/* Prints the 2-, 4- or 8-byte signed integer at data; other sizes print nothing. */
void printint(FILE *file, void *data, size_t size)
{
    if (size == sizeof(int16_t))
        fprintf(file, "%hd", *(int16_t *)data);
    else if (size == sizeof(int32_t))
        fprintf(file, "%d", *(int32_t *)data);
    else if (size == sizeof(int64_t))
        fprintf(file, "%" PRId64, *(int64_t *)data);
}

/* Prints one usage line per option: its aliases, then "(required)" or its default. */
void print_options(FILE *file, struct option options[], size_t n_options)
{
    size_t oid;
    char *key;

    for (oid = 0; oid < n_options; ++oid) {
        fputc('\t', file);
        for (key = options[oid].key; *key != '\0'; key += strlen(key) + 1) {
            fprintf(file, "%s ", key);
        }
        fputc('\t', file);
        if (!options[oid].print_default)
            fprintf(file, "(required)");
        else {
            fprintf(file, "(default: ");
            options[oid].print_default(file, options[oid].value, options[oid].size);
            fprintf(file, ")");
        }
        fputc('\n', file);
    }
}

/* Signal handler; on Windows it shuts Winsock down. */
void sig_handler(int signum)
{
#ifdef _WIN32
    if (WSACleanup() == -1)
        print_error("WSACleanup failed");
#endif
}

int main(int argc, char *argv[])
{
    struct params params;
    struct session session;
#ifdef _WIN32
    WSADATA wsa_data;
#endif
    struct option options[] = {
        {"-m\0--member-id\0", getint, &params.member_id, sizeof(params.member_id), NULL},
        {"-a\0--account\0", getstr, &params.account, sizeof(params.account), NULL},
        {"-c\0--client-id\0", getstr, &params.client_id, sizeof(params.client_id), NULL},
        {"-h\0--host\0", getstr, &params.host, sizeof(params.host), NULL},
        {"-p\0--port\0", getint, &params.port, sizeof(params.port), NULL},
        {"-l\0--login\0", getstr, &params.login, sizeof(params.login), NULL},
        {"-w\0--password\0", getstr, &params.password, sizeof(params.password), NULL},
        {"-s\0--seq\0", getint, &params.seq, sizeof(params.seq), printint},
        {"-i\0--instrument-id\0", getint, &params.instrument_id, sizeof(params.instrument_id), printint},
        {"--mi\0--market-id\0", getint, &params.market_id, sizeof(params.market_id), printint},
        {"--pe\0--prime-exchange\0", getint, &params.prime_exchange, sizeof(params.prime_exchange), printint},
        {"--price\0", getint, &params.price, sizeof(params.price), printint},
    };
    size_t n_options = sizeof(options) / sizeof(*options);

    memset(&params, 0, sizeof(params));
    memset(&session, 0, sizeof(session));
    params.instrument_id = 754;
    params.market_id = 1001;
    params.prime_exchange = 0;
    params.price = 13500000000LL;

    if (getoptions(argc, argv, options, n_options) == -1) {
        fprintf(stderr, "%s [options]\n", argv[0]);
        print_options(stderr, options, n_options);
        return -1;
    }
#ifdef _WIN32
    if (WSAStartup(MAKEWORD(2, 2), &wsa_data) == -1)
        return
print_error("WSAStartup failed"), -1; #endif signal(SIGINT, sig_handler); signal(SIGTERM, sig_handler); if (session_init(&session, ¶ms) == -1) return -1; if (session_enter(&session) == -1) return -1; while (1) { int n_fds = 1; fd_set read_set, write_set, error_set; struct timeval timeval, *timeout; timeout = get_timeout(&timeval); fflush(stdout); FD_ZERO(&read_set); FD_ZERO(&write_set); FD_ZERO(&error_set); if (session.sock == -1 && timeout == NULL) { break; // there is nether socket nor timers to wait #ifdef _WIN32 } else if (session.sock == -1 && timeout != NULL) { Sleep((DWORD)timeout->tv_sec * 1000 + (DWORD)timeout->tv_usec / 1000); n_fds = 0; #endif } else { if (session.sock != -1) { FD_SET(session.sock, &read_set); if (session.write_buffer_size != 0 || session.is_connecting) // check for write only if it needs to FD_SET(session.sock, &write_set); if (session.is_connecting) FD_SET(session.sock, &error_set); n_fds = session.sock + 1; } n_fds = select(n_fds, &read_set, &write_set, &error_set, timeout); } if (n_fds == -1) { print_error("select failed"); break; } else if (n_fds == 0) debug("timeout"); else { if (session.is_connecting) if (FD_ISSET(session.sock, &error_set)) if (session_on_error(&session) == -1) break; if (session.write_buffer_size != 0 || session.is_connecting) if (FD_ISSET(session.sock, &write_set)) if (session_on_write(&session) == -1) break; if (FD_ISSET(session.sock, &read_set)) if (session_on_read(&session) == -1) break; } if (process_timers() == -1) break; } session_destroy(&session); return 0; }