1/*
2 * lws-minimal-secure-streams-binance
3 *
4 * Written in 2010-2021 by Andy Green <andy@warmcat.com>
5 *                         Kutoga <kutoga@user.github.invalid>
6 *
7 * This file is made available under the Creative Commons CC0 1.0
8 * Universal Public Domain Dedication.
9 *
10 * This demonstrates a Secure Streams implementation of a client that connects
11 * to binance ws server efficiently.
12 *
13 * Build lws with -DLWS_WITH_SECURE_STREAMS=1 -DLWS_WITHOUT_EXTENSIONS=0
14 *
15 * "policy.json" contains all the information about endpoints, protocols and
16 * connection validation, tagged by streamtype name.
17 *
18 * The example tries to load it from the cwd, it lives
19 * in ./minimal-examples/secure-streams/minimal-secure-streams-binance dir, so
20 * either run it from there, or copy the policy.json to your cwd.  It's also
21 * possible to put the policy json in the code as a string and pass that at
22 * context creation time.
23 */
24
25#include <libwebsockets.h>
26#include <string.h>
27#include <signal.h>
28#include <ctype.h>
29
/*
 * Set to nonzero to make the service loop in main() exit; written from the
 * SIGINT handler.  An object modified from an asynchronous signal handler
 * has to be volatile sig_atomic_t for the access to be well-defined.
 */
static volatile sig_atomic_t interrupted;
31
/*
 * Accumulator for a batch of samples: running total, the extremes seen so
 * far and how many samples were folded in.
 */
struct range {
	uint64_t		sum;		/* total of all samples */
	uint64_t		lowest;		/* smallest sample seen */
	uint64_t		highest;	/* largest sample seen */

	unsigned int		samples;	/* number of samples added */
};

typedef struct range range_t;
39
40typedef struct binance {
41	struct lws_ss_handle 	*ss;
42	void			*opaque_data;
43
44	lws_sorted_usec_list_t	sul_hz;	     /* 1hz summary dump */
45
46	range_t			e_lat_range;
47	range_t			price_range;
48} binance_t;
49
50/****** Part 1 / 3: application data processing */
51
52static void
53range_reset(range_t *r)
54{
55	r->sum = r->highest = 0;
56	r->lowest = 999999999999ull;
57	r->samples = 0;
58}
59
60static uint64_t
61get_us_timeofday(void)
62{
63	struct timeval tv;
64
65	gettimeofday(&tv, NULL);
66
67	return (uint64_t)((lws_usec_t)tv.tv_sec * LWS_US_PER_SEC) +
68			  (uint64_t)tv.tv_usec;
69}
70
/*
 * Convert a decimal string such as "1234.56789" into an integer count of
 * hundredths (1234.56789 -> 123456).  Digits beyond the second decimal
 * place are truncated, not rounded.
 *
 * A lone fractional digit is honoured as tenths ("12.5" -> 1250); a missing
 * or non-numeric fraction contributes nothing ("12" and "12." -> 1200).
 *
 * s must be a NUL-terminated string.
 */
static uint64_t
pennies(const char *s)
{
	uint64_t price = (uint64_t)atoll(s) * 100;
	const char *dot = strchr(s, '.');

	/*
	 * <ctype.h> functions need an unsigned char value; plain char may be
	 * negative on some platforms.
	 */
	if (!dot || !isdigit((unsigned char)dot[1]))
		return price;

	price += (uint64_t)(10 * (dot[1] - '0'));

	/* dot[1] was a digit, so dot[2] is at worst the terminating NUL */
	if (isdigit((unsigned char)dot[2]))
		price += (uint64_t)(dot[2] - '0');

	return price;
}
83
84static void
85sul_hz_cb(lws_sorted_usec_list_t *sul)
86{
87	binance_t *bin = lws_container_of(sul, binance_t, sul_hz);
88
89	/*
90	 * We are called once a second to dump statistics on the connection
91	 */
92
93	lws_sul_schedule(lws_ss_get_context(bin->ss), 0, &bin->sul_hz,
94			 sul_hz_cb, LWS_US_PER_SEC);
95
96	if (bin->price_range.samples)
97		lwsl_notice("%s: price: min: %llu¢, max: %llu¢, avg: %llu¢, "
98			    "(%d prices/s)\n", __func__,
99			    (unsigned long long)bin->price_range.lowest,
100			    (unsigned long long)bin->price_range.highest,
101			    (unsigned long long)(bin->price_range.sum /
102						    bin->price_range.samples),
103			    bin->price_range.samples);
104	if (bin->e_lat_range.samples)
105		lwsl_notice("%s: elatency: min: %llums, max: %llums, "
106			    "avg: %llums, (%d msg/s)\n", __func__,
107			    (unsigned long long)bin->e_lat_range.lowest / 1000,
108			    (unsigned long long)bin->e_lat_range.highest / 1000,
109			    (unsigned long long)(bin->e_lat_range.sum /
110					   bin->e_lat_range.samples) / 1000,
111			    bin->e_lat_range.samples);
112
113	range_reset(&bin->e_lat_range);
114	range_reset(&bin->price_range);
115}
116
117/****** Part 2 / 3: communication */
118
119static lws_ss_state_return_t
120binance_rx(void *userobj, const uint8_t *in, size_t len, int flags)
121{
122	binance_t *bin = (binance_t *)userobj;
123	uint64_t latency_us, now_us;
124	char numbuf[16];
125	uint64_t price;
126	const char *p;
127	size_t alen;
128
129	now_us = (uint64_t)get_us_timeofday();
130
131	p = lws_json_simple_find((const char *)in, len, "\"depthUpdate\"",
132				 &alen);
133	if (!p)
134		return LWSSSSRET_OK;
135
136	p = lws_json_simple_find((const char *)in, len, "\"E\":", &alen);
137	if (!p) {
138		lwsl_err("%s: no E JSON\n", __func__);
139		return LWSSSSRET_OK;
140	}
141
142	lws_strnncpy(numbuf, p, alen, sizeof(numbuf));
143	latency_us = now_us - ((uint64_t)atoll(numbuf) * LWS_US_PER_MS);
144
145	if (latency_us < bin->e_lat_range.lowest)
146		bin->e_lat_range.lowest = latency_us;
147	if (latency_us > bin->e_lat_range.highest)
148		bin->e_lat_range.highest = latency_us;
149
150	bin->e_lat_range.sum += latency_us;
151	bin->e_lat_range.samples++;
152
153	p = lws_json_simple_find((const char *)in, len, "\"a\":[[\"", &alen);
154	if (!p)
155		return LWSSSSRET_OK;
156
157	lws_strnncpy(numbuf, p, alen, sizeof(numbuf));
158	price = pennies(numbuf);
159
160	if (price < bin->price_range.lowest)
161		bin->price_range.lowest = price;
162	if (price > bin->price_range.highest)
163		bin->price_range.highest = price;
164
165	bin->price_range.sum += price;
166	bin->price_range.samples++;
167
168	return LWSSSSRET_OK;
169}
170
171static lws_ss_state_return_t
172binance_state(void *userobj, void *h_src, lws_ss_constate_t state,
173	      lws_ss_tx_ordinal_t ack)
174{
175	binance_t *bin = (binance_t *)userobj;
176
177	lwsl_ss_info(bin->ss, "%s (%d), ord 0x%x",
178		     lws_ss_state_name((int)state), state, (unsigned int)ack);
179
180	switch (state) {
181
182	case LWSSSCS_CONNECTED:
183		lws_sul_schedule(lws_ss_get_context(bin->ss), 0, &bin->sul_hz,
184				 sul_hz_cb, LWS_US_PER_SEC);
185		range_reset(&bin->e_lat_range);
186		range_reset(&bin->price_range);
187
188		return LWSSSSRET_OK;
189
190	case LWSSSCS_DISCONNECTED:
191		lws_sul_cancel(&bin->sul_hz);
192		break;
193
194	default:
195		break;
196	}
197
198	return LWSSSSRET_OK;
199}
200
201static const lws_ss_info_t ssi_binance = {
202	.handle_offset		  = offsetof(binance_t, ss),
203	.opaque_user_data_offset  = offsetof(binance_t, opaque_data),
204	.rx			  = binance_rx,
205	.state			  = binance_state,
206	.user_alloc		  = sizeof(binance_t),
207	.streamtype		  = "binance", /* bind to corresponding policy */
208};
209
210/****** Part 3 / 3: init and event loop */
211
212static const struct lws_extension extensions[] = {
213	{
214		"permessage-deflate", lws_extension_callback_pm_deflate,
215		"permessage-deflate" "; client_no_context_takeover"
216		 "; client_max_window_bits"
217	},
218	{ NULL, NULL, NULL /* terminator */ }
219};
220
221static void
222sigint_handler(int sig)
223{
224	interrupted = 1;
225}
226
227int main(int argc, const char **argv)
228{
229	struct lws_context_creation_info info;
230	struct lws_context *cx;
231	int n = 0;
232
233	signal(SIGINT, sigint_handler);
234
235	memset(&info, 0, sizeof info);
236	lws_cmdline_option_handle_builtin(argc, argv, &info);
237
238	lwsl_user("LWS minimal Secure Streams binance client\n");
239
240	info.options = LWS_SERVER_OPTION_DO_SSL_GLOBAL_INIT |
241		       LWS_SERVER_OPTION_EXPLICIT_VHOSTS;
242	info.port = CONTEXT_PORT_NO_LISTEN; /* we do not run any server */
243	info.fd_limit_per_thread = 1 + 1 + 1;
244	info.extensions = extensions;
245	info.pss_policies_json = "policy.json"; /* literal JSON, or path */
246
247	cx = lws_create_context(&info);
248	if (!cx) {
249		lwsl_err("lws init failed\n");
250		return 1;
251	}
252
253	if (lws_ss_create(cx, 0, &ssi_binance, NULL, NULL, NULL, NULL)) {
254		lwsl_cx_err(cx, "failed to create secure stream");
255		interrupted = 1;
256	}
257
258	while (n >= 0 && !interrupted)
259		n = lws_service(cx, 0);
260
261	lws_context_destroy(cx);
262
263	lwsl_user("Completed\n");
264
265	return 0;
266}
267