1/* 2 * lws-minimal-secure-streams-avs 3 * 4 * Written in 2019-2020 by Andy Green <andy@warmcat.com> 5 * 6 * This file is made available under the Creative Commons CC0 1.0 7 * Universal Public Domain Dedication. 8 * 9 * This sends a canned WAV and received (and discards) the mp3 response. 10 * However it rate-limits the response reception to manage a small ringbuffer 11 * using ss / h2 flow control apis, reflecting consumption at 64kbps and only 12 * and 8KB buffer, indtended to model optimizing rx buffering on mp3 playback 13 * on a constrained device. 14 */ 15 16#include <libwebsockets.h> 17#include <string.h> 18#include <sys/types.h> 19#include <sys/stat.h> 20#if !defined(WIN32) 21#include <unistd.h> 22#endif 23#include <assert.h> 24#include <fcntl.h> 25 26extern int interrupted, bad; 27static struct lws_ss_handle *hss_avs_event, *hss_avs_sync; 28static uint8_t *wav; 29static size_t wav_len; 30 31typedef struct ss_avs_event { 32 struct lws_ss_handle *ss; 33 void *opaque_data; 34 /* ... application specific state ... */ 35 struct lejp_ctx jctx; 36} ss_avs_event_t; 37 38typedef struct ss_avs_metadata { 39 struct lws_ss_handle *ss; 40 void *opaque_data; 41 /* ... application specific state ... */ 42 struct lejp_ctx jctx; 43 size_t pos; 44 45 /* 46 * We simulate a ringbuffer that is used up by a sul at 64Kbit/sec 47 * rate, and managed at the same rate using tx credit 48 */ 49 50 lws_sorted_usec_list_t sul; 51 uint8_t buf[256 * 1024]; /* to test rate-limiting, set to 8 * 1024 */ 52 int head; 53 int tail; 54 55 char filled; 56 57} ss_avs_metadata_t; 58 59static const char *metadata = "{" 60 "\"event\": {" 61 "\"header\": {" 62 "\"namespace\": \"SpeechRecognizer\"," 63 "\"name\": \"Recognize\"," 64 "\"messageId\": \"message-123\"," 65 "\"dialogRequestId\": \"dialog-request-321\"" 66 "}," 67 "\"payload\": {" 68 "\"profile\":" "\"CLOSE_TALK\"," 69 "\"format\":" "\"AUDIO_L16_RATE_16000_CHANNELS_1\"" 70 "}" 71 "}" 72"}"; 73 74/* 75 * avs metadata 76 */ 77 78static void 79use_buffer_50ms(lws_sorted_usec_list_t *sul) 80{ 81 ss_avs_metadata_t *m = lws_container_of(sul, ss_avs_metadata_t, sul); 82 struct lws_context *context = (struct lws_context *)m->opaque_data; 83 size_t n; 84 int e; 85 86 /* 87 * Use up 50ms-worth (8KB / 20) == 401 bytes of buffered data 88 */ 89 90 /* remaining data in buffer */ 91 n = ((size_t)(m->head - m->tail) % sizeof(m->buf)); 92 lwsl_info("%s: avail %d\n", __func__, (int)n); 93 94 if (n < 401) 95 lwsl_err("%s: underrun\n", __func__); 96 97 m->tail = ((size_t)m->tail + 401) % sizeof(m->buf); 98 n = ((size_t)(m->head - m->tail) % sizeof(m->buf)); 99 100 e = lws_ss_get_est_peer_tx_credit(m->ss); 101 102 lwsl_info("%s: avail after: %d, curr est %d\n", __func__, (int)n, e); 103 104 if (n < (sizeof(m->buf) * 2) / 3 && e < (int)(sizeof(m->buf) - 1 - n)) { 105 lwsl_info("%s: requesting additional %d\n", __func__, 106 (int)sizeof(m->buf) - 1 - e - (int)n); 107 lws_ss_add_peer_tx_credit(m->ss, (int32_t)((int)sizeof(m->buf) - 1 - e - (int)n)); 108 } 109 110 lws_sul_schedule(context, 0, &m->sul, use_buffer_50ms, 111 50 * LWS_US_PER_MS); 112} 113 114static lws_ss_state_return_t 115ss_avs_metadata_rx(void *userobj, const uint8_t *buf, size_t len, int flags) 116{ 117 ss_avs_metadata_t *m = (ss_avs_metadata_t *)userobj; 118 struct lws_context *context = (struct lws_context *)m->opaque_data; 119 size_t n, n1; 120 121 lwsl_notice("%s: rideshare %s, len %d, flags 0x%x\n", __func__, 122 lws_ss_rideshare(m->ss), (int)len, flags); 123#if 0 124 lwsl_hexdump_warn(buf, len); 125#endif 126 127 n = sizeof(m->buf) - ((size_t)(m->head - m->tail) % sizeof(m->buf)); 128 lwsl_info("%s: len %d, buf h %d, t %d, space %d\n", __func__, 129 (int)len, (int)m->head, (int)m->tail, (int)n); 130 lws_ss_get_est_peer_tx_credit(m->ss); 131 if (len > n) { 132 lwsl_err("%s: bad len: len %d, n %d\n", __func__, (int)len, (int)n); 133 assert(0); 134 135 return 1; 136 } 137 138 if (m->head < m->tail) /* |****h-------t**| */ 139 memcpy(&m->buf[m->head], buf, len); 140 else { /* |---t*****h-----| */ 141 n1 = sizeof(m->buf) - (size_t)m->head; 142 if (len < n1) 143 n1 = len; 144 memcpy(&m->buf[m->head], buf, n1); 145 if (n1 != len) 146 memcpy(m->buf, buf, len - n1); 147 } 148 149 m->head = (((size_t)m->head) + len) % sizeof(m->buf); 150 151 lws_sul_schedule(context, 0, &m->sul, use_buffer_50ms, 152 50 * LWS_US_PER_MS); 153 154 return 0; 155} 156 157static lws_ss_state_return_t 158ss_avs_metadata_tx(void *userobj, lws_ss_tx_ordinal_t ord, uint8_t *buf, 159 size_t *len, int *flags) 160{ 161 ss_avs_metadata_t *m = (ss_avs_metadata_t *)userobj; 162 //struct lws_context *context = (struct lws_context *)m->opaque_data; 163 size_t tot; 164 165 if ((long)m->pos < 0) { 166 *len = 0; 167 lwsl_debug("%s: skip tx\n", __func__); 168 return 1; 169 } 170 171// lwsl_notice("%s: rideshare '%s'\n", __func__, lws_ss_rideshare(m->ss)); 172 173 if (!strcmp(lws_ss_rideshare(m->ss), "avs_audio")) { 174 /* audio rideshare */ 175 176 if (!m->pos) 177 *flags |= LWSSS_FLAG_SOM; 178 179 if (*len > wav_len - m->pos) 180 *len = wav_len - m->pos; 181 182 memcpy(buf, wav + m->pos, *len); 183 m->pos += *len; 184 185 if (m->pos == wav_len) { 186 *flags |= LWSSS_FLAG_EOM; 187 lwsl_info("%s: tx done\n", __func__); 188 m->pos = (size_t)-1l; /* ban subsequent until new stream */ 189 } else 190 return lws_ss_request_tx(m->ss); 191 192 lwsl_hexdump_info(buf, *len); 193 194 return 0; 195 } 196 197 /* metadata part */ 198 199 tot = strlen(metadata); 200 201 if (!m->pos) 202 *flags |= LWSSS_FLAG_SOM; 203 204 if (*len > tot - m->pos) 205 *len = tot - m->pos; 206 207 memcpy(buf, metadata + m->pos, *len); 208 209 m->pos += *len; 210 211 if (m->pos == tot) { 212 *flags |= LWSSS_FLAG_EOM; 213 m->pos = 0; /* for next time */ 214 return lws_ss_request_tx(m->ss); 215 } 216 217 lwsl_hexdump_info(buf, *len); 218 219 return 0; 220} 221 222static lws_ss_state_return_t 223ss_avs_metadata_state(void *userobj, void *sh, 224 lws_ss_constate_t state, lws_ss_tx_ordinal_t ack) 225{ 226 227 ss_avs_metadata_t *m = (ss_avs_metadata_t *)userobj; 228 // struct lws_context *context = (struct lws_context *)m->opaque_data; 229 230 lwsl_user("%s: %s, ord 0x%x\n", __func__, lws_ss_state_name((int)state), 231 (unsigned int)ack); 232 233 switch (state) { 234 case LWSSSCS_CREATING: 235 lwsl_user("%s: CREATING\n", __func__); 236 m->pos = 0; 237 return lws_ss_client_connect(m->ss); 238 239 case LWSSSCS_CONNECTING: 240 break; 241 case LWSSSCS_CONNECTED: 242 return lws_ss_request_tx(m->ss); 243 244 case LWSSSCS_ALL_RETRIES_FAILED: 245 /* for this demo app, we want to exit on fail to connect */ 246 case LWSSSCS_DISCONNECTED: 247 /* for this demo app, we want to exit after complete flow */ 248 lws_sul_cancel(&m->sul); 249 interrupted = 1; 250 break; 251 case LWSSSCS_DESTROYING: 252 lws_sul_cancel(&m->sul); 253 break; 254 default: 255 break; 256 } 257 258 return 0; 259} 260 261/* 262 * avs event 263 */ 264 265static lws_ss_state_return_t 266ss_avs_event_rx(void *userobj, const uint8_t *buf, size_t len, int flags) 267{ 268#if !defined(LWS_WITH_NO_LOGS) 269 ss_avs_event_t *m = (ss_avs_event_t *)userobj; 270 // struct lws_context *context = (struct lws_context *)m->opaque_data; 271 272 lwsl_notice("%s: rideshare %s, len %d, flags 0x%x\n", __func__, 273 lws_ss_rideshare(m->ss), (int)len, flags); 274#endif 275// lwsl_hexdump_warn(buf, len); 276 277 bad = 0; /* for this demo, receiving something here == success */ 278 279 return 0; 280} 281 282static lws_ss_state_return_t 283ss_avs_event_tx(void *userobj, lws_ss_tx_ordinal_t ord, uint8_t *buf, 284 size_t *len, int *flags) 285{ 286#if !defined(LWS_WITH_NO_LOGS) 287 ss_avs_event_t *m = (ss_avs_event_t *)userobj; 288 lwsl_notice("%s: rideshare %s\n", __func__, lws_ss_rideshare(m->ss)); 289#endif 290 return 1; /* don't transmit anything */ 291} 292 293static lws_ss_state_return_t 294ss_avs_event_state(void *userobj, void *sh, 295 lws_ss_constate_t state, lws_ss_tx_ordinal_t ack) 296{ 297 ss_avs_event_t *m = (ss_avs_event_t *)userobj; 298 struct lws_context *context = (struct lws_context *)m->opaque_data; 299 lws_ss_info_t ssi; 300 301 lwsl_user("%s: %s, ord 0x%x\n", __func__, lws_ss_state_name((int)state), 302 (unsigned int)ack); 303 304 switch (state) { 305 case LWSSSCS_CREATING: 306 case LWSSSCS_CONNECTING: 307 break; 308 case LWSSSCS_CONNECTED: 309 if (hss_avs_sync) 310 break; 311 312 lwsl_notice("%s: starting the second avs stream\n", __func__); 313 314 /* 315 * When we have established the event stream, we must POST 316 * on another stream within 10s 317 */ 318 319 memset(&ssi, 0, sizeof(ssi)); 320 ssi.handle_offset = offsetof(ss_avs_metadata_t, ss); 321 ssi.opaque_user_data_offset = offsetof(ss_avs_metadata_t, 322 opaque_data); 323 ssi.rx = ss_avs_metadata_rx; 324 ssi.tx = ss_avs_metadata_tx; 325 ssi.state = ss_avs_metadata_state; 326 ssi.user_alloc = sizeof(ss_avs_metadata_t); 327 ssi.streamtype = "avs_metadata"; 328 329 /* 330 * We want to allow the other side to fill our buffer, but no 331 * more. But it's a bit tricky when the payload is inside 332 * framing like multipart MIME and contains other parts 333 */ 334 335 /* uncomment to test rate-limiting, doesn't work with AVS servers */ 336// ssi.manual_initial_tx_credit = 337// sizeof(((ss_avs_metadata_t *)0)->buf) / 2; 338 339 if (lws_ss_create(context, 0, &ssi, context, &hss_avs_sync, 340 NULL, NULL)) { 341 lwsl_err("%s: failed to create avs metadata secstream\n", 342 __func__); 343 } 344 break; 345 case LWSSSCS_ALL_RETRIES_FAILED: 346 /* for this demo app, we want to exit on fail to connect */ 347 interrupted = 1; 348 break; 349 case LWSSSCS_DISCONNECTED: 350 break; 351 case LWSSSCS_DESTROYING: 352 lwsl_notice("%s: DESTROYING\n", __func__); 353 if (wav) { 354 free(wav); 355 wav = NULL; 356 } 357 break; 358 default: 359 break; 360 } 361 362 return 0; 363} 364 365int 366avs_example_start(struct lws_context *context) 367{ 368 lws_ss_info_t ssi; 369 struct stat stat; 370 int fd; 371 372 if (hss_avs_event) 373 return 0; 374 375 fd = open("./year.wav", O_RDONLY); 376 if (fd < 0) { 377 lwsl_err("%s: failed to open wav file\n", __func__); 378 379 return 1; 380 } 381 if (fstat(fd, &stat) < 0) { 382 lwsl_err("%s: failed to stat wav file\n", __func__); 383 384 goto bail; 385 } 386 387 wav_len = (size_t)stat.st_size; 388 wav = malloc(wav_len); 389 if (!wav) { 390 lwsl_err("%s: failed to alloc wav buffer", __func__); 391 392 goto bail; 393 } 394 if (read(fd, wav, 395#if defined(WIN32) 396 (unsigned int) 397#endif 398 wav_len) != (int)wav_len) { 399 lwsl_err("%s: failed to read wav\n", __func__); 400 401 goto bail; 402 } 403 close(fd); 404 405 lwsl_user("%s: Starting AVS stream\n", __func__); 406 407 /* AVS wants us to establish the long poll event stream first */ 408 409 memset(&ssi, 0, sizeof(ssi)); 410 ssi.handle_offset = offsetof(ss_avs_event_t, ss); 411 ssi.opaque_user_data_offset = offsetof(ss_avs_event_t, opaque_data); 412 ssi.rx = ss_avs_event_rx; 413 ssi.tx = ss_avs_event_tx; 414 ssi.state = ss_avs_event_state; 415 ssi.user_alloc = sizeof(ss_avs_event_t); 416 ssi.streamtype = "avs_event"; 417 418 if (lws_ss_create(context, 0, &ssi, context, &hss_avs_event, NULL, NULL)) { 419 lwsl_err("%s: failed to create avs event secure stream\n", 420 __func__); 421 free(wav); 422 wav = NULL; 423 return 1; 424 } 425 426 return 0; 427 428bail: 429 close(fd); 430 431 return 1; 432} 433