1/*
2 * lws-minimal-secure-streams-avs
3 *
4 * Written in 2019-2020 by Andy Green <andy@warmcat.com>
5 *
6 * This file is made available under the Creative Commons CC0 1.0
7 * Universal Public Domain Dedication.
8 *
 * This sends a canned WAV and receives (and discards) the mp3 response.
 * However it rate-limits the response reception to manage a small ringbuffer
 * using ss / h2 flow control apis, reflecting consumption at 64kbps and only
 * an 8KB buffer, intended to model optimizing rx buffering on mp3 playback
 * on a constrained device.
14 */
15
16#include <libwebsockets.h>
17#include <string.h>
18#include <sys/types.h>
19#include <sys/stat.h>
20#if !defined(WIN32)
21#include <unistd.h>
22#endif
23#include <assert.h>
24#include <fcntl.h>
25
26extern int interrupted, bad;
27static struct lws_ss_handle *hss_avs_event, *hss_avs_sync;
28static uint8_t *wav;
29static size_t wav_len;
30
31typedef struct ss_avs_event {
32	struct lws_ss_handle 	*ss;
33	void			*opaque_data;
34	/* ... application specific state ... */
35	struct lejp_ctx		jctx;
36} ss_avs_event_t;
37
38typedef struct ss_avs_metadata {
39	struct lws_ss_handle 	*ss;
40	void			*opaque_data;
41	/* ... application specific state ... */
42	struct lejp_ctx		jctx;
43	size_t			pos;
44
45	/*
46	 * We simulate a ringbuffer that is used up by a sul at 64Kbit/sec
47	 * rate, and managed at the same rate using tx credit
48	 */
49
50	lws_sorted_usec_list_t	sul;
51	uint8_t			buf[256 * 1024]; /* to test rate-limiting, set to 8 * 1024 */
52	int			head;
53	int			tail;
54
55	char			filled;
56
57} ss_avs_metadata_t;
58
59static const char *metadata = "{"
60	"\"event\": {"
61		"\"header\": {"
62			"\"namespace\": \"SpeechRecognizer\","
63			"\"name\": \"Recognize\","
64			"\"messageId\": \"message-123\","
65			"\"dialogRequestId\": \"dialog-request-321\""
66		"},"
67		"\"payload\": {"
68			"\"profile\":"	"\"CLOSE_TALK\","
69			"\"format\":"	"\"AUDIO_L16_RATE_16000_CHANNELS_1\""
70		"}"
71	"}"
72"}";
73
74/*
75 * avs metadata
76 */
77
78static void
79use_buffer_50ms(lws_sorted_usec_list_t *sul)
80{
81	ss_avs_metadata_t *m = lws_container_of(sul, ss_avs_metadata_t, sul);
82	struct lws_context *context = (struct lws_context *)m->opaque_data;
83	size_t n;
84	int e;
85
86	/*
87	 * Use up 50ms-worth (8KB / 20) == 401 bytes of buffered data
88	 */
89
90	/* remaining data in buffer */
91	n = ((size_t)(m->head - m->tail) % sizeof(m->buf));
92	lwsl_info("%s: avail %d\n", __func__, (int)n);
93
94	if (n < 401)
95		lwsl_err("%s: underrun\n", __func__);
96
97	m->tail = ((size_t)m->tail + 401) % sizeof(m->buf);
98	n = ((size_t)(m->head - m->tail) % sizeof(m->buf));
99
100	e = lws_ss_get_est_peer_tx_credit(m->ss);
101
102	lwsl_info("%s: avail after: %d, curr est %d\n", __func__, (int)n, e);
103
104	if (n < (sizeof(m->buf) * 2) / 3 && e < (int)(sizeof(m->buf) - 1 - n)) {
105		lwsl_info("%s: requesting additional %d\n", __func__,
106				(int)sizeof(m->buf) - 1 - e - (int)n);
107		lws_ss_add_peer_tx_credit(m->ss, (int32_t)((int)sizeof(m->buf) - 1 - e - (int)n));
108	}
109
110	lws_sul_schedule(context, 0, &m->sul, use_buffer_50ms,
111			 50 * LWS_US_PER_MS);
112}
113
114static lws_ss_state_return_t
115ss_avs_metadata_rx(void *userobj, const uint8_t *buf, size_t len, int flags)
116{
117	ss_avs_metadata_t *m = (ss_avs_metadata_t *)userobj;
118	struct lws_context *context = (struct lws_context *)m->opaque_data;
119	size_t n, n1;
120
121	lwsl_notice("%s: rideshare %s, len %d, flags 0x%x\n", __func__,
122			lws_ss_rideshare(m->ss), (int)len, flags);
123#if 0
124	lwsl_hexdump_warn(buf, len);
125#endif
126
127	n = sizeof(m->buf) - ((size_t)(m->head - m->tail) % sizeof(m->buf));
128	lwsl_info("%s: len %d, buf h %d, t %d, space %d\n", __func__,
129		    (int)len, (int)m->head, (int)m->tail, (int)n);
130	lws_ss_get_est_peer_tx_credit(m->ss);
131	if (len > n) {
132		lwsl_err("%s: bad len: len %d, n %d\n", __func__, (int)len, (int)n);
133		assert(0);
134
135		return 1;
136	}
137
138	if (m->head < m->tail)				/* |****h-------t**| */
139		memcpy(&m->buf[m->head], buf, len);
140	else {						/* |---t*****h-----| */
141		n1 = sizeof(m->buf) - (size_t)m->head;
142		if (len < n1)
143			n1 = len;
144		memcpy(&m->buf[m->head], buf, n1);
145		if (n1 != len)
146			memcpy(m->buf, buf, len - n1);
147	}
148
149	m->head = (((size_t)m->head) + len) % sizeof(m->buf);
150
151	lws_sul_schedule(context, 0, &m->sul, use_buffer_50ms,
152			 50 * LWS_US_PER_MS);
153
154	return 0;
155}
156
157static lws_ss_state_return_t
158ss_avs_metadata_tx(void *userobj, lws_ss_tx_ordinal_t ord, uint8_t *buf,
159		   size_t *len, int *flags)
160{
161	ss_avs_metadata_t *m = (ss_avs_metadata_t *)userobj;
162	//struct lws_context *context = (struct lws_context *)m->opaque_data;
163	size_t tot;
164
165	if ((long)m->pos < 0) {
166		*len = 0;
167		lwsl_debug("%s: skip tx\n", __func__);
168		return 1;
169	}
170
171//	lwsl_notice("%s: rideshare '%s'\n", __func__, lws_ss_rideshare(m->ss));
172
173	if (!strcmp(lws_ss_rideshare(m->ss), "avs_audio")) {
174		/* audio rideshare */
175
176		if (!m->pos)
177			*flags |= LWSSS_FLAG_SOM;
178
179		if (*len > wav_len - m->pos)
180			*len = wav_len - m->pos;
181
182		memcpy(buf, wav + m->pos, *len);
183		m->pos += *len;
184
185		if (m->pos == wav_len) {
186			*flags |= LWSSS_FLAG_EOM;
187			lwsl_info("%s: tx done\n", __func__);
188			m->pos = (size_t)-1l; /* ban subsequent until new stream */
189		} else
190			return lws_ss_request_tx(m->ss);
191
192		lwsl_hexdump_info(buf, *len);
193
194		return 0;
195	}
196
197	/* metadata part */
198
199	tot = strlen(metadata);
200
201	if (!m->pos)
202		*flags |= LWSSS_FLAG_SOM;
203
204	if (*len > tot - m->pos)
205		*len = tot - m->pos;
206
207	memcpy(buf, metadata + m->pos, *len);
208
209	m->pos += *len;
210
211	if (m->pos == tot) {
212		*flags |= LWSSS_FLAG_EOM;
213		m->pos = 0; /* for next time */
214		return lws_ss_request_tx(m->ss);
215	}
216
217	lwsl_hexdump_info(buf, *len);
218
219	return 0;
220}
221
222static lws_ss_state_return_t
223ss_avs_metadata_state(void *userobj, void *sh,
224		      lws_ss_constate_t state, lws_ss_tx_ordinal_t ack)
225{
226
227	ss_avs_metadata_t *m = (ss_avs_metadata_t *)userobj;
228	// struct lws_context *context = (struct lws_context *)m->opaque_data;
229
230	lwsl_user("%s: %s, ord 0x%x\n", __func__, lws_ss_state_name((int)state),
231		  (unsigned int)ack);
232
233	switch (state) {
234	case LWSSSCS_CREATING:
235		lwsl_user("%s: CREATING\n", __func__);
236		m->pos = 0;
237		return lws_ss_client_connect(m->ss);
238
239	case LWSSSCS_CONNECTING:
240		break;
241	case LWSSSCS_CONNECTED:
242		return lws_ss_request_tx(m->ss);
243
244	case LWSSSCS_ALL_RETRIES_FAILED:
245		/* for this demo app, we want to exit on fail to connect */
246	case LWSSSCS_DISCONNECTED:
247		/* for this demo app, we want to exit after complete flow */
248		lws_sul_cancel(&m->sul);
249		interrupted = 1;
250		break;
251	case LWSSSCS_DESTROYING:
252		lws_sul_cancel(&m->sul);
253		break;
254	default:
255		break;
256	}
257
258	return 0;
259}
260
261/*
262 * avs event
263 */
264
265static lws_ss_state_return_t
266ss_avs_event_rx(void *userobj, const uint8_t *buf, size_t len, int flags)
267{
268#if !defined(LWS_WITH_NO_LOGS)
269	ss_avs_event_t *m = (ss_avs_event_t *)userobj;
270	// struct lws_context *context = (struct lws_context *)m->opaque_data;
271
272	lwsl_notice("%s: rideshare %s, len %d, flags 0x%x\n", __func__,
273			lws_ss_rideshare(m->ss), (int)len, flags);
274#endif
275//	lwsl_hexdump_warn(buf, len);
276
277	bad = 0; /* for this demo, receiving something here == success */
278
279	return 0;
280}
281
282static lws_ss_state_return_t
283ss_avs_event_tx(void *userobj, lws_ss_tx_ordinal_t ord, uint8_t *buf,
284		      size_t *len, int *flags)
285{
286#if !defined(LWS_WITH_NO_LOGS)
287	ss_avs_event_t *m = (ss_avs_event_t *)userobj;
288	lwsl_notice("%s: rideshare %s\n", __func__, lws_ss_rideshare(m->ss));
289#endif
290	return 1; /* don't transmit anything */
291}
292
293static lws_ss_state_return_t
294ss_avs_event_state(void *userobj, void *sh,
295		   lws_ss_constate_t state, lws_ss_tx_ordinal_t ack)
296{
297	ss_avs_event_t *m = (ss_avs_event_t *)userobj;
298	struct lws_context *context = (struct lws_context *)m->opaque_data;
299	lws_ss_info_t ssi;
300
301	lwsl_user("%s: %s, ord 0x%x\n", __func__, lws_ss_state_name((int)state),
302		  (unsigned int)ack);
303
304	switch (state) {
305	case LWSSSCS_CREATING:
306	case LWSSSCS_CONNECTING:
307		break;
308	case LWSSSCS_CONNECTED:
309		if (hss_avs_sync)
310			break;
311
312		lwsl_notice("%s: starting the second avs stream\n", __func__);
313
314		/*
315		 * When we have established the event stream, we must POST
316		 * on another stream within 10s
317		 */
318
319		memset(&ssi, 0, sizeof(ssi));
320		ssi.handle_offset	    = offsetof(ss_avs_metadata_t, ss);
321		ssi.opaque_user_data_offset = offsetof(ss_avs_metadata_t,
322						       opaque_data);
323		ssi.rx			    = ss_avs_metadata_rx;
324		ssi.tx			    = ss_avs_metadata_tx;
325		ssi.state		    = ss_avs_metadata_state;
326		ssi.user_alloc		    = sizeof(ss_avs_metadata_t);
327		ssi.streamtype		    = "avs_metadata";
328
329		/*
330		 * We want to allow the other side to fill our buffer, but no
331		 * more.  But it's a bit tricky when the payload is inside
332		 * framing like multipart MIME and contains other parts
333		 */
334
335		/* uncomment to test rate-limiting, doesn't work with AVS servers */
336//		ssi.manual_initial_tx_credit =
337//				sizeof(((ss_avs_metadata_t *)0)->buf) / 2;
338
339		if (lws_ss_create(context, 0, &ssi, context, &hss_avs_sync,
340				  NULL, NULL)) {
341			lwsl_err("%s: failed to create avs metadata secstream\n",
342				 __func__);
343		}
344		break;
345	case LWSSSCS_ALL_RETRIES_FAILED:
346		/* for this demo app, we want to exit on fail to connect */
347		interrupted = 1;
348		break;
349	case LWSSSCS_DISCONNECTED:
350		break;
351	case LWSSSCS_DESTROYING:
352		lwsl_notice("%s: DESTROYING\n", __func__);
353		if (wav) {
354			free(wav);
355			wav = NULL;
356		}
357		break;
358	default:
359		break;
360	}
361
362	return 0;
363}
364
365int
366avs_example_start(struct lws_context *context)
367{
368	lws_ss_info_t ssi;
369	struct stat stat;
370	int fd;
371
372	if (hss_avs_event)
373		return 0;
374
375	fd = open("./year.wav", O_RDONLY);
376	if (fd < 0) {
377		lwsl_err("%s: failed to open wav file\n", __func__);
378
379		return 1;
380	}
381	if (fstat(fd, &stat) < 0) {
382		lwsl_err("%s: failed to stat wav file\n", __func__);
383
384		goto bail;
385	}
386
387	wav_len = (size_t)stat.st_size;
388	wav = malloc(wav_len);
389	if (!wav) {
390		lwsl_err("%s: failed to alloc wav buffer", __func__);
391
392		goto bail;
393	}
394	if (read(fd, wav,
395#if defined(WIN32)
396		(unsigned int)
397#endif
398			wav_len) != (int)wav_len) {
399		lwsl_err("%s: failed to read wav\n", __func__);
400
401		goto bail;
402	}
403	close(fd);
404
405	lwsl_user("%s: Starting AVS stream\n", __func__);
406
407	/* AVS wants us to establish the long poll event stream first */
408
409	memset(&ssi, 0, sizeof(ssi));
410	ssi.handle_offset	    = offsetof(ss_avs_event_t, ss);
411	ssi.opaque_user_data_offset = offsetof(ss_avs_event_t, opaque_data);
412	ssi.rx			    = ss_avs_event_rx;
413	ssi.tx			    = ss_avs_event_tx;
414	ssi.state		    = ss_avs_event_state;
415	ssi.user_alloc		    = sizeof(ss_avs_event_t);
416	ssi.streamtype		    = "avs_event";
417
418	if (lws_ss_create(context, 0, &ssi, context, &hss_avs_event, NULL, NULL)) {
419		lwsl_err("%s: failed to create avs event secure stream\n",
420			 __func__);
421		free(wav);
422		wav = NULL;
423		return 1;
424	}
425
426	return 0;
427
428bail:
429	close(fd);
430
431	return 1;
432}
433