1/* 2 * lws-minimal-secure-streams-alexa 3 * 4 * This file is made available under the Creative Commons CC0 1.0 5 * Universal Public Domain Dedication. 6 */ 7 8#include <libwebsockets.h> 9#include <string.h> 10#include <sys/types.h> 11#include <sys/stat.h> 12#include <unistd.h> 13#include <fcntl.h> 14 15#include <mpg123.h> 16 17#include "private.h" 18 19struct lws_ss_handle *hss_avs_event, *hss_avs_sync; 20 21/* this is the type for the long poll event channel */ 22 23typedef struct ss_avs_event { 24 struct lws_ss_handle *ss; 25 void *opaque_data; 26 /* ... application specific state ... */ 27 28 struct lejp_ctx jctx; 29} ss_avs_event_t; 30 31enum { 32 LAMP3STATE_IDLE, 33 LAMP3STATE_SPOOLING, 34 LAMP3STATE_DRAINING, 35}; 36 37/* this is the type for the utterance metadata (and audio rideshares) */ 38 39typedef struct ss_avs_metadata { 40 struct lws_ss_handle *ss; 41 void *opaque_data; 42 /* ... application specific state ... */ 43 44 struct lws_buflist *dribble; /* next mp3 data while draining last */ 45 46 struct lejp_ctx jctx; 47 size_t pos; 48 size_t mp3_in; 49 mpg123_handle *mh; 50 51 lws_sorted_usec_list_t sul; 52 53 uint8_t stash_eom[16]; 54 55 uint8_t se_head; 56 uint8_t se_tail; 57 58 char mp3_state; 59 char first_mp3; 60 uint8_t mp3_mime_match; 61 uint8_t seen; 62 uint8_t inside_mp3; 63 64} ss_avs_metadata_t; 65 66/* 67 * The remote server only seems to give us a budget of 10s to consume the 68 * results, after that it doesn't drop the stream, but doesn't send us anything 69 * further on it. 70 * 71 * This makes it impossible to optimize buffering for incoming mp3 since we 72 * have to go ahead and take it before the 10s is up. 73 */ 74 75#define MAX_MP3_IN_BUFFERING_BYTES 32768 76 77/* 78 * Structure of JSON metadata for utterance handling 79 */ 80 81static const char *metadata = "{" 82 "\"event\": {" 83 "\"header\": {" 84 "\"namespace\": \"SpeechRecognizer\"," 85 "\"name\": \"Recognize\"," 86 "\"messageId\": \"message-123\"," 87 "\"dialogRequestId\": \"dialog-request-321\"" 88 "}," 89 "\"payload\": {" 90 "\"profile\":" "\"CLOSE_TALK\"," 91 "\"format\":" "\"AUDIO_L16_RATE_16000_CHANNELS_1\"" 92 "}" 93 "}" 94"}"; 95 96/* 97 * avs metadata 98 */ 99 100static void 101use_buffer_250ms(lws_sorted_usec_list_t *sul) 102{ 103 ss_avs_metadata_t *m = lws_container_of(sul, ss_avs_metadata_t, sul); 104 struct lws_context *context = (struct lws_context *)m->opaque_data; 105 int est = lws_ss_get_est_peer_tx_credit(m->ss); 106 107 lwsl_notice("%s: est txcr %d\n", __func__, est); 108 109 if (est < MAX_MP3_IN_BUFFERING_BYTES - (MAX_MP3_IN_BUFFERING_BYTES / 4)) { 110 lwsl_notice(" adding %d\n", MAX_MP3_IN_BUFFERING_BYTES / 4); 111 lws_ss_add_peer_tx_credit(m->ss, MAX_MP3_IN_BUFFERING_BYTES / 4); 112 } 113 114 lws_sul_schedule(context, 0, &m->sul, use_buffer_250ms, 115 250 * LWS_US_PER_MS); 116} 117 118static const char *mp3_mimetype = "application/octet-stream", 119 *match2 = "\x0d\x0a\x0d\x0a"; 120 121static int 122ss_avs_mp3_open(ss_avs_metadata_t *m) 123{ 124 int r; 125 126 lwsl_notice("%s\n", __func__); 127 128 m->first_mp3 = 1; 129 m->mh = mpg123_new(NULL, NULL); 130 if (!m->mh) { 131 lwsl_err("%s: unable to make new mp3\n", 132 __func__); 133 goto bail; 134 } 135 mpg123_format_none(m->mh); 136 r = mpg123_format(m->mh, 16000, MPG123_M_MONO, 137 MPG123_ENC_SIGNED_16); 138 if (r) { 139 lwsl_err("%s: mpg123 format failed %d\n", 140 __func__, r); 141 goto bail1; 142 } 143 r = mpg123_open_feed(m->mh); 144 if (r) { 145 lwsl_err("%s: mpg123 open feed failed %d\n", 146 __func__, r); 147 goto bail1; 148 } 149 150 return 0; 151 152bail1: 153 mpg123_delete(m->mh); 154 m->mh = NULL; 155 156bail: 157 return 1; 158} 159 160static lws_ss_state_return_t 161ss_avs_metadata_rx(void *userobj, const uint8_t *buf, size_t len, int flags); 162 163/* 164 * This is called when the mp3 has drained it's input buffer and destroyed 165 * itself. 166 */ 167 168static int 169drain_end_cb(void *v) 170{ 171 ss_avs_metadata_t *m = (ss_avs_metadata_t *)v; 172 struct lws_context *context = (struct lws_context *)m->opaque_data; 173 int tot = 0; 174 175 lwsl_err("%s\n", __func__); 176 177 /* 178 * We have drained and destroyed the existing mp3 session. Is there 179 * a new one pending? 180 */ 181 182 m->first_mp3 = 1; 183 m->mp3_state = LAMP3STATE_IDLE; 184 185 if (lws_buflist_total_len(&m->dribble)) { 186 /* we started another one */ 187 188 /* resume tx credit top up */ 189 lws_sul_schedule(context, 0, &m->sul, use_buffer_250ms, 1); 190 191 if (ss_avs_mp3_open(m)) 192 return 1; 193 194 m->mp3_state = LAMP3STATE_SPOOLING; 195 196 /* 197 * Dump what we stashed from draining into the new mp3 198 */ 199 200 while (lws_buflist_total_len(&m->dribble)) { 201 size_t s; 202 uint8_t *u, t; 203 204 s = lws_buflist_next_segment_len(&m->dribble, &u); 205 t = m->stash_eom[m->se_tail]; 206 lwsl_notice("%s: preload %d: %d\n", __func__, (int)s, t); 207 208 mpg123_feed(m->mh, u, s); 209 lws_buflist_use_segment(&m->dribble, s); 210 if (m->first_mp3) { 211 play_mp3(m->mh, NULL, NULL); 212 m->first_mp3 = 0; 213 } 214 215 tot += s; 216 217 m->se_tail = (m->se_tail + 1) % sizeof(m->stash_eom); 218 if (t) { 219 lwsl_notice("%s: preloaded EOM\n", __func__); 220 221 /* 222 * We stashed the whole of the message, we need 223 * to also do the EOM processing. We will come 224 * back here if there's another message in the 225 * stash. 226 */ 227 228 m->mp3_state = LAMP3STATE_DRAINING; 229 if (m->mh) 230 play_mp3(NULL, drain_end_cb, m); 231 232 lws_ss_add_peer_tx_credit(m->ss, tot); 233#if 0 234 /* 235 * Put a hold on bringing in any more data 236 */ 237 lws_sul_cancel(&m->sul); 238#endif 239 /* destroy our copy of the handle */ 240 m->mh = NULL; 241 242 break; 243 } 244 } 245 246 lws_ss_add_peer_tx_credit(m->ss, tot); 247 } 248 249 return 0; 250} 251 252static lws_ss_state_return_t 253ss_avs_metadata_rx(void *userobj, const uint8_t *buf, size_t len, int flags) 254{ 255 ss_avs_metadata_t *m = (ss_avs_metadata_t *)userobj; 256 struct lws_context *context = (struct lws_context *)m->opaque_data; 257 int n = 0, hit = 0; 258 259 lwsl_notice("%s: len %d, flags %d (est peer txcr %d)\n", __func__, 260 (int)len, flags, lws_ss_get_est_peer_tx_credit(m->ss)); 261 262 // lwsl_hexdump_warn(buf, len); 263 264 if ((flags & LWSSS_FLAG_SOM) && !m->mh && !m->seen) { 265 m->mp3_mime_match = 0; 266 m->seen = 0; 267 m->inside_mp3 = 0; 268 } 269 270 if (!m->inside_mp3) { 271 /* 272 * Identify the part with the mp3 in, if any 273 */ 274 275 while (n < (int)len - 24) { 276 if (!m->seen) { 277 if (buf[n] == mp3_mimetype[m->mp3_mime_match]) { 278 m->mp3_mime_match++; 279 if (m->mp3_mime_match == 24) { 280 m->mp3_mime_match = 0; 281 m->seen = 1; 282 n++; 283 continue; 284 } 285 } else 286 m->mp3_mime_match = 0; 287 } else { 288 if (buf[n] == match2[m->mp3_mime_match]) { 289 m->mp3_mime_match++; 290 if (m->mp3_mime_match == 4) { 291 m->seen = 0; 292 m->mp3_mime_match = 0; 293 hit = 1; 294 n++; 295 buf += n; 296 len -= n; 297 lwsl_notice("identified reply...\n"); 298 m->inside_mp3 = 1; 299 break; 300 } 301 } else 302 m->mp3_mime_match = 0; 303 } 304 305 n++; 306 } 307 308 if (!hit) { 309 lws_ss_add_peer_tx_credit(m->ss, len); 310 return 0; 311 } 312 } 313 314 // lwsl_notice("%s: state %d\n", __func__, m->mp3_state); 315 316 switch (m->mp3_state) { 317 case LAMP3STATE_IDLE: 318 319 if (hit) { 320 321 lws_ss_add_peer_tx_credit(m->ss, n); 322 323 if (ss_avs_mp3_open(m)) 324 goto bail; 325 326 lws_sul_schedule(context, 0, &m->sul, use_buffer_250ms, 1); 327 m->mp3_state = LAMP3STATE_SPOOLING; 328 break; 329 } 330 331 lws_ss_add_peer_tx_credit(m->ss, len); 332 333 if (!m->inside_mp3) 334 break; 335 336 /* fallthru */ 337 338 case LAMP3STATE_SPOOLING: 339 340 if (m->dribble) 341 goto draining; 342 343 if (len) { 344 /* 345 * We are shoving encoded mp3 into mpg123-allocated heap 346 * buffers... unfortunately mpg123 doesn't seem to 347 * expose where it is in its allocated input so we can 348 * track how much is stashed. Instead while in playback 349 * mode, we assume 64kbps mp3 encoding, ie, 8KB/s, and 350 * run a sul that allows an additional 2KB tx credit 351 * every 250ms, with 4KB initial credit. 352 */ 353 lwsl_notice("%s: SPOOL %d\n", __func__, (int)len); 354 mpg123_feed(m->mh, buf, len); 355 356 if (m->first_mp3) { 357 lws_sul_schedule(context, 0, &m->sul, 358 use_buffer_250ms, 1); 359 // lws_ss_add_peer_tx_credit(m->ss, 360 // len + (MAX_MP3_IN_BUFFERING_BYTES / 2)); 361 play_mp3(m->mh, NULL, NULL); 362 } //else 363 // lws_ss_add_peer_tx_credit(m->ss, len); 364 m->first_mp3 = 0; 365 } 366 367 if (flags & LWSSS_FLAG_EOM) { 368 /* 369 * This means one "message" / mime part with mp3 data 370 * has finished coming in. But there may be whole other 371 * parts with other mp3s following, with potentially 372 * different mp3 parameters. So we want to tell this 373 * one to drain and finish and destroy the current mp3 374 * object before we go on. 375 * 376 * But not knowing the length of the current one, there 377 * will already be outstanding tx credit at the server, 378 * so it's going to spam us with the next part before we 379 * have the new mp3 sink for it. 380 */ 381 lwsl_notice("%s: EOM\n", __func__); 382 m->mp3_mime_match = 0; 383 m->seen = 0; 384 m->mp3_state = LAMP3STATE_DRAINING; 385 /* from input POV, we're no longer inside an mp3 */ 386 m->inside_mp3 = 0; 387 if (m->mh) 388 play_mp3(NULL, drain_end_cb, m); 389#if 0 390 /* 391 * Put a hold on bringing in any more data 392 */ 393 lws_sul_cancel(&m->sul); 394#endif 395 /* destroy our copy of the handle */ 396 m->mh = NULL; 397 } 398 break; 399 400 case LAMP3STATE_DRAINING: 401 402draining: 403 if (buf && len && m->inside_mp3) { 404 lwsl_notice("%s: DRAINING: stashing %d: %d %d %d\n", 405 __func__, (int)len, !!(flags & LWSSS_FLAG_EOM), 406 m->se_head, m->se_tail); 407 lwsl_hexdump_notice(buf, len); 408 if (lws_buflist_append_segment(&m->dribble, buf, len) < 0) 409 goto bail; 410 411 m->stash_eom[m->se_head] = !!(flags & LWSSS_FLAG_EOM); 412 m->se_head = (m->se_head + 1) % sizeof(m->stash_eom); 413 lwsl_notice("%s: next head %d\n", __func__, m->se_head); 414 415 lws_ss_add_peer_tx_credit(m->ss, len); 416 } 417 418 if (flags & LWSSS_FLAG_EOM) { 419 if (!len && m->se_head != m->se_tail) { 420 /* 0-len EOM... retrospectively mark last stash */ 421 lwsl_notice("%s: retro EOM\n", __func__); 422 m->stash_eom[(m->se_head - 1) % sizeof(m->stash_eom)] = 1; 423 } 424 425 lwsl_notice("%s: Draining EOM\n", __func__); 426 m->inside_mp3 = 0; 427 } 428 /* 429 * Don't provide any additional tx credit... we're just 430 * mopping up the overspill from the previous mp3 credit 431 */ 432 break; 433 } 434 435 return 0; 436 437bail: 438 return -1; 439} 440 441/* 442 * Because this is multipart mime in h2 currently, use a "rideshare" to handle 443 * first the native metadata on this secure stream, then the "rideshare" audio 444 * stream mentioned in the policy. 445 * 446 * Lws takes care of interleaving the multipart mime pieces since the policy 447 * calls for it. 448 */ 449 450static lws_ss_state_return_t 451ss_avs_metadata_tx(void *userobj, lws_ss_tx_ordinal_t ord, uint8_t *buf, 452 size_t *len, int *flags) 453{ 454 ss_avs_metadata_t *m = (ss_avs_metadata_t *)userobj; 455 size_t tot; 456 int n; 457 458 // lwsl_notice("%s %d\n", __func__, (int)m->pos); 459 460 if ((long)m->pos < 0) { 461 *len = 0; 462 lwsl_info("%s: skip\n", __func__); 463 return 1; 464 } 465 466 if (!strcmp(lws_ss_rideshare(m->ss), "avs_audio")) { 467 468 /* audio rideshare part */ 469 470 if (!m->pos) 471 *flags |= LWSSS_FLAG_SOM; 472 473 n = spool_capture(buf, *len); 474 if (n > 0) 475 *len = n; 476 else 477 *len = 0; 478 if (!n) { 479 lwsl_info("%s: trying to skip tx\n", __func__); 480 return 1; 481 } 482 483 m->pos += *len; 484 485 if (n < 0) { 486 *flags |= LWSSS_FLAG_EOM; 487 m->pos = (long)-1l; /* ban subsequent until new stream */ 488 } 489 490 lwsl_notice("%s: tx audio %d\n", __func__, (int)*len); 491 492#if 0 493 { 494 int ff = open("/tmp/z1", O_RDWR | O_CREAT | O_APPEND, 0666); 495 if (ff == -1) 496 lwsl_err("%s: errno %d\n", __func__, errno); 497 write(ff, buf, *len); 498 close(ff); 499 } 500#endif 501 502 return 0; 503 } 504 505 /* metadata part */ 506 507 tot = strlen(metadata); 508 509 if (!m->pos) 510 *flags |= LWSSS_FLAG_SOM; 511 512 if (*len > tot - m->pos) 513 *len = tot - m->pos; 514 515 memcpy(buf, metadata + m->pos, *len); 516 517 m->pos += *len; 518 519 if (m->pos == tot) { 520 lwsl_notice("metadata done\n"); 521 *flags |= LWSSS_FLAG_EOM; 522 m->pos = 0; /* for next time */ 523 } 524 525 return 0; 526} 527 528static lws_ss_state_return_t 529ss_avs_metadata_state(void *userobj, void *sh, 530 lws_ss_constate_t state, lws_ss_tx_ordinal_t ack) 531{ 532 ss_avs_metadata_t *m = (ss_avs_metadata_t *)userobj; 533 struct lws_context *context = (struct lws_context *)m->opaque_data; 534 535 lwsl_notice("%s: %p: %s, ord 0x%x\n", __func__, m->ss, 536 lws_ss_state_name(state), (unsigned int)ack); 537 538 switch (state) { 539 case LWSSSCS_CREATING: 540 return lws_ss_client_connect(m->ss); 541 542 case LWSSSCS_CONNECTING: 543 m->pos = 0; 544 break; 545 case LWSSSCS_CONNECTED: 546 lwsl_info("%s: CONNECTED\n", __func__); 547 return lws_ss_request_tx(m->ss); 548 549 case LWSSSCS_DISCONNECTED: 550 lws_sul_cancel(&m->sul); 551 //if (m->mh) { 552 play_mp3(NULL, NULL, NULL); 553 m->mh = NULL; 554 //} 555 /* 556 * For this stream encapsulating an alexa exchange, dropping 557 * is the end of its life 558 */ 559 return 1; 560 561 case LWSSSCS_DESTROYING: 562 lws_buflist_destroy_all_segments(&m->dribble); 563 break; 564 default: 565 break; 566 } 567 568 return 0; 569} 570 571/* 572 * avs event 573 */ 574 575static lws_ss_state_return_t 576ss_avs_event_rx(void *userobj, const uint8_t *buf, size_t len, int flags) 577{ 578 return 0; 579} 580 581static lws_ss_state_return_t 582ss_avs_event_tx(void *userobj, lws_ss_tx_ordinal_t ord, uint8_t *buf, 583 size_t *len, int *flags) 584{ 585 return 1; /* don't transmit anything */ 586} 587 588static lws_ss_state_return_t 589ss_avs_event_state(void *userobj, void *sh, 590 lws_ss_constate_t state, lws_ss_tx_ordinal_t ack) 591{ 592 lwsl_info("%s: %s, ord 0x%x\n", __func__, lws_ss_state_name(state), 593 (unsigned int)ack); 594 595 switch (state) { 596 case LWSSSCS_CREATING: 597 mpg123_init(); 598 break; 599 case LWSSSCS_CONNECTING: 600 break; 601 case LWSSSCS_CONNECTED: 602 lwsl_user("Connected to Alexa... speak \"Alexa, ...\"\n"); 603 break; 604 case LWSSSCS_DISCONNECTED: 605 lwsl_user("Disconnected from Alexa\n"); 606 break; 607 case LWSSSCS_DESTROYING: 608 mpg123_exit(); 609 break; 610 default: 611 break; 612 } 613 614 return 0; 615} 616 617int 618avs_query_start(struct lws_context *context) 619{ 620 lws_ss_info_t ssi; 621 622 lwsl_notice("%s:\n", __func__); 623 624 memset(&ssi, 0, sizeof(ssi)); 625 ssi.handle_offset = offsetof(ss_avs_metadata_t, ss); 626 ssi.opaque_user_data_offset = offsetof(ss_avs_metadata_t, opaque_data); 627 ssi.rx = ss_avs_metadata_rx; 628 ssi.tx = ss_avs_metadata_tx; 629 ssi.state = ss_avs_metadata_state; 630 ssi.user_alloc = sizeof(ss_avs_metadata_t); 631 ssi.streamtype = "avs_metadata"; 632 633 ssi.manual_initial_tx_credit = 8192; 634 635 if (lws_ss_create(context, 0, &ssi, context, &hss_avs_sync, NULL, NULL)) { 636 lwsl_err("%s: failed to create avs metadata secstream\n", 637 __func__); 638 639 return 1; 640 } 641 642 lwsl_user("%s: created query stream %p\n", __func__, hss_avs_sync); 643 644 return 0; 645} 646 647int 648avs_example_start(struct lws_context *context) 649{ 650 lws_ss_info_t ssi; 651 652 if (hss_avs_event) 653 return 0; 654 655 lwsl_info("%s: Starting AVS stream\n", __func__); 656 657 /* AVS wants us to establish the long poll event stream first */ 658 659 memset(&ssi, 0, sizeof(ssi)); 660 ssi.handle_offset = offsetof(ss_avs_event_t, ss); 661 ssi.opaque_user_data_offset = offsetof(ss_avs_event_t, opaque_data); 662 ssi.rx = ss_avs_event_rx; 663 ssi.tx = ss_avs_event_tx; 664 ssi.state = ss_avs_event_state; 665 ssi.user_alloc = sizeof(ss_avs_event_t); 666 ssi.streamtype = "avs_event"; 667 668 if (lws_ss_create(context, 0, &ssi, context, &hss_avs_event, NULL, NULL)) { 669 lwsl_err("%s: failed to create avs event secure stream\n", 670 __func__); 671 return 1; 672 } 673 674 return 0; 675} 676