1cabdff1aSopenharmony_ci/* 2cabdff1aSopenharmony_ci * Advanced Message Queuing Protocol (AMQP) 0-9-1 3cabdff1aSopenharmony_ci * Copyright (c) 2020 Andriy Gelman 4cabdff1aSopenharmony_ci * 5cabdff1aSopenharmony_ci * This file is part of FFmpeg. 6cabdff1aSopenharmony_ci * 7cabdff1aSopenharmony_ci * FFmpeg is free software; you can redistribute it and/or 8cabdff1aSopenharmony_ci * modify it under the terms of the GNU Lesser General Public 9cabdff1aSopenharmony_ci * License as published by the Free Software Foundation; either 10cabdff1aSopenharmony_ci * version 2.1 of the License, or (at your option) any later version. 11cabdff1aSopenharmony_ci * 12cabdff1aSopenharmony_ci * FFmpeg is distributed in the hope that it will be useful, 13cabdff1aSopenharmony_ci * but WITHOUT ANY WARRANTY; without even the implied warranty of 14cabdff1aSopenharmony_ci * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 15cabdff1aSopenharmony_ci * Lesser General Public License for more details. 16cabdff1aSopenharmony_ci * 17cabdff1aSopenharmony_ci * You should have received a copy of the GNU Lesser General Public 18cabdff1aSopenharmony_ci * License along with FFmpeg; if not, write to the Free Software 19cabdff1aSopenharmony_ci * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA 20cabdff1aSopenharmony_ci */ 21cabdff1aSopenharmony_ci 22cabdff1aSopenharmony_ci#include <amqp.h> 23cabdff1aSopenharmony_ci#include <amqp_tcp_socket.h> 24cabdff1aSopenharmony_ci#include <sys/time.h> 25cabdff1aSopenharmony_ci#include "avformat.h" 26cabdff1aSopenharmony_ci#include "libavutil/avstring.h" 27cabdff1aSopenharmony_ci#include "libavutil/opt.h" 28cabdff1aSopenharmony_ci#include "libavutil/time.h" 29cabdff1aSopenharmony_ci#include "network.h" 30cabdff1aSopenharmony_ci#include "url.h" 31cabdff1aSopenharmony_ci#include "urldecode.h" 32cabdff1aSopenharmony_ci 33cabdff1aSopenharmony_citypedef struct AMQPContext { 34cabdff1aSopenharmony_ci const AVClass *class; 35cabdff1aSopenharmony_ci amqp_connection_state_t conn; 36cabdff1aSopenharmony_ci amqp_socket_t *socket; 37cabdff1aSopenharmony_ci const char *exchange; 38cabdff1aSopenharmony_ci const char *routing_key; 39cabdff1aSopenharmony_ci int pkt_size; 40cabdff1aSopenharmony_ci int64_t connection_timeout; 41cabdff1aSopenharmony_ci int pkt_size_overflow; 42cabdff1aSopenharmony_ci int delivery_mode; 43cabdff1aSopenharmony_ci} AMQPContext; 44cabdff1aSopenharmony_ci 45cabdff1aSopenharmony_ci#define STR_LEN 1024 46cabdff1aSopenharmony_ci#define DEFAULT_CHANNEL 1 47cabdff1aSopenharmony_ci 48cabdff1aSopenharmony_ci#define OFFSET(x) offsetof(AMQPContext, x) 49cabdff1aSopenharmony_ci#define D AV_OPT_FLAG_DECODING_PARAM 50cabdff1aSopenharmony_ci#define E AV_OPT_FLAG_ENCODING_PARAM 51cabdff1aSopenharmony_cistatic const AVOption options[] = { 52cabdff1aSopenharmony_ci { "pkt_size", "Maximum send/read packet size", OFFSET(pkt_size), AV_OPT_TYPE_INT, { .i64 = 131072 }, 4096, INT_MAX, .flags = D | E }, 53cabdff1aSopenharmony_ci { "exchange", "Exchange to send/read packets", OFFSET(exchange), AV_OPT_TYPE_STRING, { .str = "amq.direct" }, 0, 0, .flags = D | E }, 54cabdff1aSopenharmony_ci { "routing_key", "Key to filter streams", OFFSET(routing_key), AV_OPT_TYPE_STRING, { .str = "amqp" }, 0, 0, .flags = D | E }, 55cabdff1aSopenharmony_ci { "connection_timeout", "Initial connection timeout", OFFSET(connection_timeout), AV_OPT_TYPE_DURATION, { .i64 = -1 }, -1, INT64_MAX, .flags = D | E}, 56cabdff1aSopenharmony_ci { "delivery_mode", "Delivery mode", OFFSET(delivery_mode), AV_OPT_TYPE_INT, { .i64 = AMQP_DELIVERY_PERSISTENT }, 1, 2, .flags = E, "delivery_mode"}, 57cabdff1aSopenharmony_ci { "persistent", "Persistent delivery mode", 0, AV_OPT_TYPE_CONST, { .i64 = AMQP_DELIVERY_PERSISTENT }, 0, 0, E, "delivery_mode" }, 58cabdff1aSopenharmony_ci { "non-persistent", "Non-persistent delivery mode", 0, AV_OPT_TYPE_CONST, { .i64 = AMQP_DELIVERY_NONPERSISTENT }, 0, 0, E, "delivery_mode" }, 59cabdff1aSopenharmony_ci { NULL } 60cabdff1aSopenharmony_ci}; 61cabdff1aSopenharmony_ci 62cabdff1aSopenharmony_cistatic int amqp_proto_open(URLContext *h, const char *uri, int flags) 63cabdff1aSopenharmony_ci{ 64cabdff1aSopenharmony_ci int ret, server_msg; 65cabdff1aSopenharmony_ci char hostname[STR_LEN], credentials[STR_LEN], path[STR_LEN]; 66cabdff1aSopenharmony_ci int port; 67cabdff1aSopenharmony_ci const char *user, *password = NULL, *vhost; 68cabdff1aSopenharmony_ci const char *user_decoded, *password_decoded, *vhost_decoded; 69cabdff1aSopenharmony_ci char *p; 70cabdff1aSopenharmony_ci amqp_rpc_reply_t broker_reply; 71cabdff1aSopenharmony_ci struct timeval tval = { 0 }; 72cabdff1aSopenharmony_ci 73cabdff1aSopenharmony_ci AMQPContext *s = h->priv_data; 74cabdff1aSopenharmony_ci 75cabdff1aSopenharmony_ci h->is_streamed = 1; 76cabdff1aSopenharmony_ci h->max_packet_size = s->pkt_size; 77cabdff1aSopenharmony_ci 78cabdff1aSopenharmony_ci av_url_split(NULL, 0, credentials, sizeof(credentials), 79cabdff1aSopenharmony_ci hostname, sizeof(hostname), &port, path, sizeof(path), uri); 80cabdff1aSopenharmony_ci 81cabdff1aSopenharmony_ci if (port < 0) 82cabdff1aSopenharmony_ci port = 5672; 83cabdff1aSopenharmony_ci 84cabdff1aSopenharmony_ci if (hostname[0] == '\0' || port <= 0 || port > 65535 ) { 85cabdff1aSopenharmony_ci av_log(h, AV_LOG_ERROR, "Invalid hostname/port\n"); 86cabdff1aSopenharmony_ci return AVERROR(EINVAL); 87cabdff1aSopenharmony_ci } 88cabdff1aSopenharmony_ci 89cabdff1aSopenharmony_ci p = strchr(credentials, ':'); 90cabdff1aSopenharmony_ci if (p) { 91cabdff1aSopenharmony_ci *p = '\0'; 92cabdff1aSopenharmony_ci password = p + 1; 93cabdff1aSopenharmony_ci } 94cabdff1aSopenharmony_ci 95cabdff1aSopenharmony_ci if (!password || *password == '\0') 96cabdff1aSopenharmony_ci password = "guest"; 97cabdff1aSopenharmony_ci 98cabdff1aSopenharmony_ci password_decoded = ff_urldecode(password, 0); 99cabdff1aSopenharmony_ci if (!password_decoded) 100cabdff1aSopenharmony_ci return AVERROR(ENOMEM); 101cabdff1aSopenharmony_ci 102cabdff1aSopenharmony_ci user = credentials; 103cabdff1aSopenharmony_ci if (*user == '\0') 104cabdff1aSopenharmony_ci user = "guest"; 105cabdff1aSopenharmony_ci 106cabdff1aSopenharmony_ci user_decoded = ff_urldecode(user, 0); 107cabdff1aSopenharmony_ci if (!user_decoded) { 108cabdff1aSopenharmony_ci av_freep(&password_decoded); 109cabdff1aSopenharmony_ci return AVERROR(ENOMEM); 110cabdff1aSopenharmony_ci } 111cabdff1aSopenharmony_ci 112cabdff1aSopenharmony_ci /* skip query for now */ 113cabdff1aSopenharmony_ci p = strchr(path, '?'); 114cabdff1aSopenharmony_ci if (p) 115cabdff1aSopenharmony_ci *p = '\0'; 116cabdff1aSopenharmony_ci 117cabdff1aSopenharmony_ci vhost = path; 118cabdff1aSopenharmony_ci if (*vhost == '\0') 119cabdff1aSopenharmony_ci vhost = "/"; 120cabdff1aSopenharmony_ci else 121cabdff1aSopenharmony_ci vhost++; /* skip leading '/' */ 122cabdff1aSopenharmony_ci 123cabdff1aSopenharmony_ci vhost_decoded = ff_urldecode(vhost, 0); 124cabdff1aSopenharmony_ci if (!vhost_decoded) { 125cabdff1aSopenharmony_ci av_freep(&user_decoded); 126cabdff1aSopenharmony_ci av_freep(&password_decoded); 127cabdff1aSopenharmony_ci return AVERROR(ENOMEM); 128cabdff1aSopenharmony_ci } 129cabdff1aSopenharmony_ci 130cabdff1aSopenharmony_ci s->conn = amqp_new_connection(); 131cabdff1aSopenharmony_ci if (!s->conn) { 132cabdff1aSopenharmony_ci av_freep(&vhost_decoded); 133cabdff1aSopenharmony_ci av_freep(&user_decoded); 134cabdff1aSopenharmony_ci av_freep(&password_decoded); 135cabdff1aSopenharmony_ci av_log(h, AV_LOG_ERROR, "Error creating connection\n"); 136cabdff1aSopenharmony_ci return AVERROR_EXTERNAL; 137cabdff1aSopenharmony_ci } 138cabdff1aSopenharmony_ci 139cabdff1aSopenharmony_ci s->socket = amqp_tcp_socket_new(s->conn); 140cabdff1aSopenharmony_ci if (!s->socket) { 141cabdff1aSopenharmony_ci av_log(h, AV_LOG_ERROR, "Error creating socket\n"); 142cabdff1aSopenharmony_ci goto destroy_connection; 143cabdff1aSopenharmony_ci } 144cabdff1aSopenharmony_ci 145cabdff1aSopenharmony_ci if (s->connection_timeout < 0) 146cabdff1aSopenharmony_ci s->connection_timeout = (h->rw_timeout > 0 ? h->rw_timeout : 5000000); 147cabdff1aSopenharmony_ci 148cabdff1aSopenharmony_ci tval.tv_sec = s->connection_timeout / 1000000; 149cabdff1aSopenharmony_ci tval.tv_usec = s->connection_timeout % 1000000; 150cabdff1aSopenharmony_ci ret = amqp_socket_open_noblock(s->socket, hostname, port, &tval); 151cabdff1aSopenharmony_ci 152cabdff1aSopenharmony_ci if (ret) { 153cabdff1aSopenharmony_ci av_log(h, AV_LOG_ERROR, "Error connecting to server: %s\n", 154cabdff1aSopenharmony_ci amqp_error_string2(ret)); 155cabdff1aSopenharmony_ci goto destroy_connection; 156cabdff1aSopenharmony_ci } 157cabdff1aSopenharmony_ci 158cabdff1aSopenharmony_ci broker_reply = amqp_login(s->conn, vhost_decoded, 0, s->pkt_size, 0, 159cabdff1aSopenharmony_ci AMQP_SASL_METHOD_PLAIN, user_decoded, password_decoded); 160cabdff1aSopenharmony_ci 161cabdff1aSopenharmony_ci if (broker_reply.reply_type != AMQP_RESPONSE_NORMAL) { 162cabdff1aSopenharmony_ci av_log(h, AV_LOG_ERROR, "Error login\n"); 163cabdff1aSopenharmony_ci server_msg = AMQP_ACCESS_REFUSED; 164cabdff1aSopenharmony_ci goto close_connection; 165cabdff1aSopenharmony_ci } 166cabdff1aSopenharmony_ci 167cabdff1aSopenharmony_ci amqp_channel_open(s->conn, DEFAULT_CHANNEL); 168cabdff1aSopenharmony_ci broker_reply = amqp_get_rpc_reply(s->conn); 169cabdff1aSopenharmony_ci 170cabdff1aSopenharmony_ci if (broker_reply.reply_type != AMQP_RESPONSE_NORMAL) { 171cabdff1aSopenharmony_ci av_log(h, AV_LOG_ERROR, "Error set channel\n"); 172cabdff1aSopenharmony_ci server_msg = AMQP_CHANNEL_ERROR; 173cabdff1aSopenharmony_ci goto close_connection; 174cabdff1aSopenharmony_ci } 175cabdff1aSopenharmony_ci 176cabdff1aSopenharmony_ci if (h->flags & AVIO_FLAG_READ) { 177cabdff1aSopenharmony_ci amqp_bytes_t queuename; 178cabdff1aSopenharmony_ci char queuename_buff[STR_LEN]; 179cabdff1aSopenharmony_ci amqp_queue_declare_ok_t *r; 180cabdff1aSopenharmony_ci 181cabdff1aSopenharmony_ci r = amqp_queue_declare(s->conn, DEFAULT_CHANNEL, amqp_empty_bytes, 182cabdff1aSopenharmony_ci 0, 0, 0, 1, amqp_empty_table); 183cabdff1aSopenharmony_ci broker_reply = amqp_get_rpc_reply(s->conn); 184cabdff1aSopenharmony_ci if (!r || broker_reply.reply_type != AMQP_RESPONSE_NORMAL) { 185cabdff1aSopenharmony_ci av_log(h, AV_LOG_ERROR, "Error declare queue\n"); 186cabdff1aSopenharmony_ci server_msg = AMQP_RESOURCE_ERROR; 187cabdff1aSopenharmony_ci goto close_channel; 188cabdff1aSopenharmony_ci } 189cabdff1aSopenharmony_ci 190cabdff1aSopenharmony_ci /* store queuename */ 191cabdff1aSopenharmony_ci queuename.bytes = queuename_buff; 192cabdff1aSopenharmony_ci queuename.len = FFMIN(r->queue.len, STR_LEN); 193cabdff1aSopenharmony_ci memcpy(queuename.bytes, r->queue.bytes, queuename.len); 194cabdff1aSopenharmony_ci 195cabdff1aSopenharmony_ci amqp_queue_bind(s->conn, DEFAULT_CHANNEL, queuename, 196cabdff1aSopenharmony_ci amqp_cstring_bytes(s->exchange), 197cabdff1aSopenharmony_ci amqp_cstring_bytes(s->routing_key), amqp_empty_table); 198cabdff1aSopenharmony_ci 199cabdff1aSopenharmony_ci broker_reply = amqp_get_rpc_reply(s->conn); 200cabdff1aSopenharmony_ci if (broker_reply.reply_type != AMQP_RESPONSE_NORMAL) { 201cabdff1aSopenharmony_ci av_log(h, AV_LOG_ERROR, "Queue bind error\n"); 202cabdff1aSopenharmony_ci server_msg = AMQP_INTERNAL_ERROR; 203cabdff1aSopenharmony_ci goto close_channel; 204cabdff1aSopenharmony_ci } 205cabdff1aSopenharmony_ci 206cabdff1aSopenharmony_ci amqp_basic_consume(s->conn, DEFAULT_CHANNEL, queuename, amqp_empty_bytes, 207cabdff1aSopenharmony_ci 0, 1, 0, amqp_empty_table); 208cabdff1aSopenharmony_ci 209cabdff1aSopenharmony_ci broker_reply = amqp_get_rpc_reply(s->conn); 210cabdff1aSopenharmony_ci if (broker_reply.reply_type != AMQP_RESPONSE_NORMAL) { 211cabdff1aSopenharmony_ci av_log(h, AV_LOG_ERROR, "Set consume error\n"); 212cabdff1aSopenharmony_ci server_msg = AMQP_INTERNAL_ERROR; 213cabdff1aSopenharmony_ci goto close_channel; 214cabdff1aSopenharmony_ci } 215cabdff1aSopenharmony_ci } 216cabdff1aSopenharmony_ci 217cabdff1aSopenharmony_ci av_freep(&vhost_decoded); 218cabdff1aSopenharmony_ci av_freep(&user_decoded); 219cabdff1aSopenharmony_ci av_freep(&password_decoded); 220cabdff1aSopenharmony_ci return 0; 221cabdff1aSopenharmony_ci 222cabdff1aSopenharmony_ciclose_channel: 223cabdff1aSopenharmony_ci amqp_channel_close(s->conn, DEFAULT_CHANNEL, server_msg); 224cabdff1aSopenharmony_ciclose_connection: 225cabdff1aSopenharmony_ci amqp_connection_close(s->conn, server_msg); 226cabdff1aSopenharmony_cidestroy_connection: 227cabdff1aSopenharmony_ci amqp_destroy_connection(s->conn); 228cabdff1aSopenharmony_ci 229cabdff1aSopenharmony_ci av_freep(&vhost_decoded); 230cabdff1aSopenharmony_ci av_freep(&user_decoded); 231cabdff1aSopenharmony_ci av_freep(&password_decoded); 232cabdff1aSopenharmony_ci return AVERROR_EXTERNAL; 233cabdff1aSopenharmony_ci} 234cabdff1aSopenharmony_ci 235cabdff1aSopenharmony_cistatic int amqp_proto_write(URLContext *h, const unsigned char *buf, int size) 236cabdff1aSopenharmony_ci{ 237cabdff1aSopenharmony_ci int ret; 238cabdff1aSopenharmony_ci AMQPContext *s = h->priv_data; 239cabdff1aSopenharmony_ci int fd = amqp_socket_get_sockfd(s->socket); 240cabdff1aSopenharmony_ci 241cabdff1aSopenharmony_ci amqp_bytes_t message = { size, (void *)buf }; 242cabdff1aSopenharmony_ci amqp_basic_properties_t props; 243cabdff1aSopenharmony_ci 244cabdff1aSopenharmony_ci ret = ff_network_wait_fd_timeout(fd, 1, h->rw_timeout, &h->interrupt_callback); 245cabdff1aSopenharmony_ci if (ret) 246cabdff1aSopenharmony_ci return ret; 247cabdff1aSopenharmony_ci 248cabdff1aSopenharmony_ci props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG; 249cabdff1aSopenharmony_ci props.content_type = amqp_cstring_bytes("octet/stream"); 250cabdff1aSopenharmony_ci props.delivery_mode = s->delivery_mode; 251cabdff1aSopenharmony_ci 252cabdff1aSopenharmony_ci ret = amqp_basic_publish(s->conn, DEFAULT_CHANNEL, amqp_cstring_bytes(s->exchange), 253cabdff1aSopenharmony_ci amqp_cstring_bytes(s->routing_key), 0, 0, 254cabdff1aSopenharmony_ci &props, message); 255cabdff1aSopenharmony_ci 256cabdff1aSopenharmony_ci if (ret) { 257cabdff1aSopenharmony_ci av_log(h, AV_LOG_ERROR, "Error publish: %s\n", amqp_error_string2(ret)); 258cabdff1aSopenharmony_ci return AVERROR_EXTERNAL; 259cabdff1aSopenharmony_ci } 260cabdff1aSopenharmony_ci 261cabdff1aSopenharmony_ci return size; 262cabdff1aSopenharmony_ci} 263cabdff1aSopenharmony_ci 264cabdff1aSopenharmony_cistatic int amqp_proto_read(URLContext *h, unsigned char *buf, int size) 265cabdff1aSopenharmony_ci{ 266cabdff1aSopenharmony_ci AMQPContext *s = h->priv_data; 267cabdff1aSopenharmony_ci int fd = amqp_socket_get_sockfd(s->socket); 268cabdff1aSopenharmony_ci int ret; 269cabdff1aSopenharmony_ci 270cabdff1aSopenharmony_ci amqp_rpc_reply_t broker_reply; 271cabdff1aSopenharmony_ci amqp_envelope_t envelope; 272cabdff1aSopenharmony_ci 273cabdff1aSopenharmony_ci ret = ff_network_wait_fd_timeout(fd, 0, h->rw_timeout, &h->interrupt_callback); 274cabdff1aSopenharmony_ci if (ret) 275cabdff1aSopenharmony_ci return ret; 276cabdff1aSopenharmony_ci 277cabdff1aSopenharmony_ci amqp_maybe_release_buffers(s->conn); 278cabdff1aSopenharmony_ci broker_reply = amqp_consume_message(s->conn, &envelope, NULL, 0); 279cabdff1aSopenharmony_ci 280cabdff1aSopenharmony_ci if (broker_reply.reply_type != AMQP_RESPONSE_NORMAL) 281cabdff1aSopenharmony_ci return AVERROR_EXTERNAL; 282cabdff1aSopenharmony_ci 283cabdff1aSopenharmony_ci if (envelope.message.body.len > size) { 284cabdff1aSopenharmony_ci s->pkt_size_overflow = FFMAX(s->pkt_size_overflow, envelope.message.body.len); 285cabdff1aSopenharmony_ci av_log(h, AV_LOG_WARNING, "Message exceeds space in the buffer. " 286cabdff1aSopenharmony_ci "Message will be truncated. Setting -pkt_size %d " 287cabdff1aSopenharmony_ci "may resolve this issue.\n", s->pkt_size_overflow); 288cabdff1aSopenharmony_ci } 289cabdff1aSopenharmony_ci size = FFMIN(size, envelope.message.body.len); 290cabdff1aSopenharmony_ci 291cabdff1aSopenharmony_ci memcpy(buf, envelope.message.body.bytes, size); 292cabdff1aSopenharmony_ci amqp_destroy_envelope(&envelope); 293cabdff1aSopenharmony_ci 294cabdff1aSopenharmony_ci return size; 295cabdff1aSopenharmony_ci} 296cabdff1aSopenharmony_ci 297cabdff1aSopenharmony_cistatic int amqp_proto_close(URLContext *h) 298cabdff1aSopenharmony_ci{ 299cabdff1aSopenharmony_ci AMQPContext *s = h->priv_data; 300cabdff1aSopenharmony_ci amqp_channel_close(s->conn, DEFAULT_CHANNEL, AMQP_REPLY_SUCCESS); 301cabdff1aSopenharmony_ci amqp_connection_close(s->conn, AMQP_REPLY_SUCCESS); 302cabdff1aSopenharmony_ci amqp_destroy_connection(s->conn); 303cabdff1aSopenharmony_ci 304cabdff1aSopenharmony_ci return 0; 305cabdff1aSopenharmony_ci} 306cabdff1aSopenharmony_ci 307cabdff1aSopenharmony_cistatic const AVClass amqp_context_class = { 308cabdff1aSopenharmony_ci .class_name = "amqp", 309cabdff1aSopenharmony_ci .item_name = av_default_item_name, 310cabdff1aSopenharmony_ci .option = options, 311cabdff1aSopenharmony_ci .version = LIBAVUTIL_VERSION_INT, 312cabdff1aSopenharmony_ci}; 313cabdff1aSopenharmony_ci 314cabdff1aSopenharmony_ciconst URLProtocol ff_libamqp_protocol = { 315cabdff1aSopenharmony_ci .name = "amqp", 316cabdff1aSopenharmony_ci .url_close = amqp_proto_close, 317cabdff1aSopenharmony_ci .url_open = amqp_proto_open, 318cabdff1aSopenharmony_ci .url_read = amqp_proto_read, 319cabdff1aSopenharmony_ci .url_write = amqp_proto_write, 320cabdff1aSopenharmony_ci .priv_data_size = sizeof(AMQPContext), 321cabdff1aSopenharmony_ci .priv_data_class = &amqp_context_class, 322cabdff1aSopenharmony_ci .flags = URL_PROTOCOL_FLAG_NETWORK, 323cabdff1aSopenharmony_ci}; 324