1/* 2 * lws-minimal-secure-streams-binance 3 * 4 * Written in 2010-2021 by Andy Green <andy@warmcat.com> 5 * Kutoga <kutoga@user.github.invalid> 6 * 7 * This file is made available under the Creative Commons CC0 1.0 8 * Universal Public Domain Dedication. 9 * 10 * This demonstrates a Secure Streams implementation of a client that connects 11 * to binance ws server efficiently. 12 * 13 * Build lws with -DLWS_WITH_SECURE_STREAMS=1 -DLWS_WITHOUT_EXTENSIONS=0 14 * 15 * "policy.json" contains all the information about endpoints, protocols and 16 * connection validation, tagged by streamtype name. 17 * 18 * The example tries to load it from the cwd, it lives 19 * in ./minimal-examples/secure-streams/minimal-secure-streams-binance dir, so 20 * either run it from there, or copy the policy.json to your cwd. It's also 21 * possible to put the policy json in the code as a string and pass that at 22 * context creation time. 23 */ 24 25#include <libwebsockets.h> 26#include <string.h> 27#include <signal.h> 28#include <ctype.h> 29 30static int interrupted; 31 32typedef struct range { 33 uint64_t sum; 34 uint64_t lowest; 35 uint64_t highest; 36 37 unsigned int samples; 38} range_t; 39 40typedef struct binance { 41 struct lws_ss_handle *ss; 42 void *opaque_data; 43 44 lws_sorted_usec_list_t sul_hz; /* 1hz summary dump */ 45 46 range_t e_lat_range; 47 range_t price_range; 48} binance_t; 49 50/****** Part 1 / 3: application data processing */ 51 52static void 53range_reset(range_t *r) 54{ 55 r->sum = r->highest = 0; 56 r->lowest = 999999999999ull; 57 r->samples = 0; 58} 59 60static uint64_t 61get_us_timeofday(void) 62{ 63 struct timeval tv; 64 65 gettimeofday(&tv, NULL); 66 67 return (uint64_t)((lws_usec_t)tv.tv_sec * LWS_US_PER_SEC) + 68 (uint64_t)tv.tv_usec; 69} 70 71static uint64_t 72pennies(const char *s) 73{ 74 uint64_t price = (uint64_t)atoll(s) * 100; 75 76 s = strchr(s, '.'); 77 78 if (s && isdigit(s[1]) && isdigit(s[2])) 79 price = price + (uint64_t)((10 * (s[1] - '0')) + (s[2] - '0')); 80 81 return price; 82} 83 84static void 85sul_hz_cb(lws_sorted_usec_list_t *sul) 86{ 87 binance_t *bin = lws_container_of(sul, binance_t, sul_hz); 88 89 /* 90 * We are called once a second to dump statistics on the connection 91 */ 92 93 lws_sul_schedule(lws_ss_get_context(bin->ss), 0, &bin->sul_hz, 94 sul_hz_cb, LWS_US_PER_SEC); 95 96 if (bin->price_range.samples) 97 lwsl_notice("%s: price: min: %llu¢, max: %llu¢, avg: %llu¢, " 98 "(%d prices/s)\n", __func__, 99 (unsigned long long)bin->price_range.lowest, 100 (unsigned long long)bin->price_range.highest, 101 (unsigned long long)(bin->price_range.sum / 102 bin->price_range.samples), 103 bin->price_range.samples); 104 if (bin->e_lat_range.samples) 105 lwsl_notice("%s: elatency: min: %llums, max: %llums, " 106 "avg: %llums, (%d msg/s)\n", __func__, 107 (unsigned long long)bin->e_lat_range.lowest / 1000, 108 (unsigned long long)bin->e_lat_range.highest / 1000, 109 (unsigned long long)(bin->e_lat_range.sum / 110 bin->e_lat_range.samples) / 1000, 111 bin->e_lat_range.samples); 112 113 range_reset(&bin->e_lat_range); 114 range_reset(&bin->price_range); 115} 116 117/****** Part 2 / 3: communication */ 118 119static lws_ss_state_return_t 120binance_rx(void *userobj, const uint8_t *in, size_t len, int flags) 121{ 122 binance_t *bin = (binance_t *)userobj; 123 uint64_t latency_us, now_us; 124 char numbuf[16]; 125 uint64_t price; 126 const char *p; 127 size_t alen; 128 129 now_us = (uint64_t)get_us_timeofday(); 130 131 p = lws_json_simple_find((const char *)in, len, "\"depthUpdate\"", 132 &alen); 133 if (!p) 134 return LWSSSSRET_OK; 135 136 p = lws_json_simple_find((const char *)in, len, "\"E\":", &alen); 137 if (!p) { 138 lwsl_err("%s: no E JSON\n", __func__); 139 return LWSSSSRET_OK; 140 } 141 142 lws_strnncpy(numbuf, p, alen, sizeof(numbuf)); 143 latency_us = now_us - ((uint64_t)atoll(numbuf) * LWS_US_PER_MS); 144 145 if (latency_us < bin->e_lat_range.lowest) 146 bin->e_lat_range.lowest = latency_us; 147 if (latency_us > bin->e_lat_range.highest) 148 bin->e_lat_range.highest = latency_us; 149 150 bin->e_lat_range.sum += latency_us; 151 bin->e_lat_range.samples++; 152 153 p = lws_json_simple_find((const char *)in, len, "\"a\":[[\"", &alen); 154 if (!p) 155 return LWSSSSRET_OK; 156 157 lws_strnncpy(numbuf, p, alen, sizeof(numbuf)); 158 price = pennies(numbuf); 159 160 if (price < bin->price_range.lowest) 161 bin->price_range.lowest = price; 162 if (price > bin->price_range.highest) 163 bin->price_range.highest = price; 164 165 bin->price_range.sum += price; 166 bin->price_range.samples++; 167 168 return LWSSSSRET_OK; 169} 170 171static lws_ss_state_return_t 172binance_state(void *userobj, void *h_src, lws_ss_constate_t state, 173 lws_ss_tx_ordinal_t ack) 174{ 175 binance_t *bin = (binance_t *)userobj; 176 177 lwsl_ss_info(bin->ss, "%s (%d), ord 0x%x", 178 lws_ss_state_name((int)state), state, (unsigned int)ack); 179 180 switch (state) { 181 182 case LWSSSCS_CONNECTED: 183 lws_sul_schedule(lws_ss_get_context(bin->ss), 0, &bin->sul_hz, 184 sul_hz_cb, LWS_US_PER_SEC); 185 range_reset(&bin->e_lat_range); 186 range_reset(&bin->price_range); 187 188 return LWSSSSRET_OK; 189 190 case LWSSSCS_DISCONNECTED: 191 lws_sul_cancel(&bin->sul_hz); 192 break; 193 194 default: 195 break; 196 } 197 198 return LWSSSSRET_OK; 199} 200 201static const lws_ss_info_t ssi_binance = { 202 .handle_offset = offsetof(binance_t, ss), 203 .opaque_user_data_offset = offsetof(binance_t, opaque_data), 204 .rx = binance_rx, 205 .state = binance_state, 206 .user_alloc = sizeof(binance_t), 207 .streamtype = "binance", /* bind to corresponding policy */ 208}; 209 210/****** Part 3 / 3: init and event loop */ 211 212static const struct lws_extension extensions[] = { 213 { 214 "permessage-deflate", lws_extension_callback_pm_deflate, 215 "permessage-deflate" "; client_no_context_takeover" 216 "; client_max_window_bits" 217 }, 218 { NULL, NULL, NULL /* terminator */ } 219}; 220 221static void 222sigint_handler(int sig) 223{ 224 interrupted = 1; 225} 226 227int main(int argc, const char **argv) 228{ 229 struct lws_context_creation_info info; 230 struct lws_context *cx; 231 int n = 0; 232 233 signal(SIGINT, sigint_handler); 234 235 memset(&info, 0, sizeof info); 236 lws_cmdline_option_handle_builtin(argc, argv, &info); 237 238 lwsl_user("LWS minimal Secure Streams binance client\n"); 239 240 info.options = LWS_SERVER_OPTION_DO_SSL_GLOBAL_INIT | 241 LWS_SERVER_OPTION_EXPLICIT_VHOSTS; 242 info.port = CONTEXT_PORT_NO_LISTEN; /* we do not run any server */ 243 info.fd_limit_per_thread = 1 + 1 + 1; 244 info.extensions = extensions; 245 info.pss_policies_json = "policy.json"; /* literal JSON, or path */ 246 247 cx = lws_create_context(&info); 248 if (!cx) { 249 lwsl_err("lws init failed\n"); 250 return 1; 251 } 252 253 if (lws_ss_create(cx, 0, &ssi_binance, NULL, NULL, NULL, NULL)) { 254 lwsl_cx_err(cx, "failed to create secure stream"); 255 interrupted = 1; 256 } 257 258 while (n >= 0 && !interrupted) 259 n = lws_service(cx, 0); 260 261 lws_context_destroy(cx); 262 263 lwsl_user("Completed\n"); 264 265 return 0; 266} 267