1 /*
2  * Copyright (C) 2019 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include <webrtc/RTPSocketHandler.h>
18 
19 #include <webrtc/MyWebSocketHandler.h>
20 #include <webrtc/STUNMessage.h>
21 #include <Utils.h>
22 
23 #include <https/PlainSocket.h>
24 #include <https/SafeCallbackable.h>
25 #include <https/Support.h>
26 #include <android-base/logging.h>
27 
28 #include <netdb.h>
29 #include <netinet/in.h>
30 
31 #include <cstring>
32 #include <iostream>
33 #include <set>
34 
35 #include <gflags/gflags.h>
36 
37 DECLARE_string(public_ip);
38 
39 // These are the ports we currently open in the firewall (15550..15557)
40 static constexpr int kPortRangeBegin = 15550;
41 static constexpr int kPortRangeEnd = 15558;
42 static constexpr int kPortRangeEndTcp = 15551;
43 
getSockAddrLen(const sockaddr_storage & addr)44 static socklen_t getSockAddrLen(const sockaddr_storage &addr) {
45     switch (addr.ss_family) {
46         case AF_INET:
47             return sizeof(sockaddr_in);
48         case AF_INET6:
49             return sizeof(sockaddr_in6);
50         default:
51             CHECK(!"Should not be here.");
52             return 0;
53     }
54 }
55 
acquirePort(int sockfd,int domain,bool tcp)56 static int acquirePort(int sockfd, int domain, bool tcp) {
57     sockaddr_storage addr;
58     uint16_t* port_ptr;
59 
60     if (domain == PF_INET) {
61         sockaddr_in addrV4;
62         memset(addrV4.sin_zero, 0, sizeof(addrV4.sin_zero));
63         addrV4.sin_family = AF_INET;
64         addrV4.sin_addr.s_addr = INADDR_ANY;
65         memcpy(&addr, &addrV4, sizeof(addrV4));
66         port_ptr = &(reinterpret_cast<sockaddr_in*>(&addr)->sin_port);
67     } else {
68         CHECK_EQ(domain, PF_INET6);
69         sockaddr_in6 addrV6;
70         addrV6.sin6_family = AF_INET6;
71         addrV6.sin6_addr = in6addr_any;
72         addrV6.sin6_scope_id = 0;
73         memcpy(&addr, &addrV6, sizeof(addrV6));
74         port_ptr = &(reinterpret_cast<sockaddr_in6*>(&addr)->sin6_port);
75     }
76 
77     int port = kPortRangeBegin;
78     for (;port < kPortRangeEnd; ++port) {
79         *port_ptr = htons(port);
80         errno = 0;
81         int res = bind(sockfd, reinterpret_cast<const sockaddr *>(&addr),
82                        getSockAddrLen(addr));
83         if (res == 0) {
84             return port;
85         }
86         if (errno != EADDRINUSE) {
87             return -1;
88         }
89         // for now, limit to one client / one tcp port to minimize
90         // complexity for using WebRTC over TCP over ssh tunnels
91         if (tcp && port == kPortRangeEndTcp)
92             break;
93         // else try the next port
94     }
95 
96     return -1;
97 }
98 
RTPSocketHandler(std::shared_ptr<RunLoop> runLoop,std::shared_ptr<ServerState> serverState,TransportType transportType,int domain,uint32_t trackMask,std::shared_ptr<RTPSession> session)99 RTPSocketHandler::RTPSocketHandler(
100         std::shared_ptr<RunLoop> runLoop,
101         std::shared_ptr<ServerState> serverState,
102         TransportType transportType,
103         int domain,
104         uint32_t trackMask,
105         std::shared_ptr<RTPSession> session)
106     : mRunLoop(runLoop),
107       mServerState(serverState),
108       mTransportType(transportType),
109       mTrackMask(trackMask),
110       mSession(session),
111       mSendPending(false),
112       mDTLSConnected(false),
113       mInBufferLength(0) {
114     bool tcp = mTransportType == TransportType::TCP;
115 
116     int sock = socket(domain, tcp ? SOCK_STREAM : SOCK_DGRAM, 0);
117 
118     if (tcp) {
119         static constexpr int yes = 1;
120         auto res = setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes));
121         CHECK(!res);
122     }
123 
124     makeFdNonblocking(sock);
125 
126     mLocalPort = acquirePort(sock, domain, tcp);
127 
128     CHECK(mLocalPort > 0);
129 
130     if (tcp) {
131         auto res = listen(sock, 4);
132         CHECK(!res);
133     }
134 
135     auto tmp = std::make_shared<PlainSocket>(mRunLoop, sock);
136     if (tcp) {
137         mServerSocket = tmp;
138     } else {
139         mSocket = tmp;
140     }
141 
142     auto videoPacketizer =
143         (trackMask & TRACK_VIDEO)
144             ? mServerState->getVideoPacketizer() : nullptr;
145 
146     auto audioPacketizer =
147         (trackMask & TRACK_AUDIO)
148             ? mServerState->getAudioPacketizer() : nullptr;
149 
150     mRTPSender = std::make_shared<RTPSender>(
151             mRunLoop,
152             this,
153             videoPacketizer,
154             audioPacketizer);
155 
156     if (trackMask & TRACK_VIDEO) {
157         mRTPSender->addSource(0xdeadbeef);
158         mRTPSender->addSource(0xcafeb0b0);
159 
160         mRTPSender->addRetransInfo(0xdeadbeef, 96, 0xcafeb0b0, 97);
161     }
162 
163     if (trackMask & TRACK_AUDIO) {
164         mRTPSender->addSource(0x8badf00d);
165     }
166 }
167 
getLocalPort() const168 uint16_t RTPSocketHandler::getLocalPort() const {
169     return mLocalPort;
170 }
171 
getLocalUFrag() const172 std::string RTPSocketHandler::getLocalUFrag() const {
173     return mSession->localUFrag();
174 }
175 
getLocalIPString() const176 std::string RTPSocketHandler::getLocalIPString() const {
177     return FLAGS_public_ip;
178 }
179 
run()180 void RTPSocketHandler::run() {
181     if (mTransportType == TransportType::TCP) {
182         mServerSocket->postRecv(
183                 makeSafeCallback(this, &RTPSocketHandler::onTCPConnect));
184     } else {
185         mSocket->postRecv(makeSafeCallback(this, &RTPSocketHandler::onReceive));
186     }
187 }
188 
onTCPConnect()189 void RTPSocketHandler::onTCPConnect() {
190     int sock = accept(mServerSocket->fd(), nullptr, 0);
191 
192     if (sock < 0) {
193         LOG(ERROR) << "RTPSocketHandler: Failed to accept client";
194         mSocket->postRecv(makeSafeCallback(this, &RTPSocketHandler::onTCPConnect));
195         return;
196     }
197 
198     LOG(INFO) << "RTPSocketHandler: Accepted client";
199 
200     makeFdNonblocking(sock);
201 
202     mClientAddrLen = sizeof(mClientAddr);
203 
204     int res = getpeername(
205             sock, reinterpret_cast<sockaddr *>(&mClientAddr), &mClientAddrLen);
206 
207     CHECK(!res);
208 
209     mSocket = std::make_shared<PlainSocket>(mRunLoop, sock);
210 
211     mSocket->postRecv(makeSafeCallback(this, &RTPSocketHandler::onTCPReceive));
212 }
213 
onTCPReceive()214 void RTPSocketHandler::onTCPReceive() {
215     mInBuffer.resize(mInBuffer.size() + 8192);
216 
217     auto n = mSocket->recv(
218             mInBuffer.data() + mInBufferLength, mInBuffer.size() - mInBufferLength);
219 
220     if (n == 0) {
221         LOG(INFO) << "Client disconnected.";
222         return;
223     }
224 
225     mInBufferLength += n;
226 
227     size_t offset = 0;
228     while (offset + 1 < mInBufferLength) {
229         auto packetLength = U16_AT(mInBuffer.data() + offset);
230         offset += 2;
231 
232         if (offset + packetLength > mInBufferLength) {
233             break;
234         }
235 
236         onPacketReceived(
237                 mClientAddr,
238                 mClientAddrLen,
239                 mInBuffer.data() + offset,
240                 packetLength);
241 
242         offset += packetLength;
243     }
244 
245     if (offset > 0) {
246         mInBuffer.erase(mInBuffer.begin(), mInBuffer.begin() + offset);
247         mInBufferLength -= offset;
248     }
249 
250     mSocket->postRecv(makeSafeCallback(this, &RTPSocketHandler::onTCPReceive));
251 }
252 
onReceive()253 void RTPSocketHandler::onReceive() {
254     std::vector<uint8_t> buffer(kMaxUDPPayloadSize);
255 
256     uint8_t *data = buffer.data();
257 
258     sockaddr_storage addr;
259     socklen_t addrLen = sizeof(addr);
260 
261     auto n = mSocket->recvfrom(
262             data, buffer.size(), reinterpret_cast<sockaddr *>(&addr), &addrLen);
263 
264     onPacketReceived(addr, addrLen, data, n);
265 
266     mSocket->postRecv(makeSafeCallback(this, &RTPSocketHandler::onReceive));
267 }
268 
onPacketReceived(const sockaddr_storage & addr,socklen_t addrLen,uint8_t * data,size_t n)269 void RTPSocketHandler::onPacketReceived(
270         const sockaddr_storage &addr,
271         socklen_t addrLen,
272         uint8_t *data,
273         size_t n) {
274 #if 0
275     std::cout << "========================================" << std::endl;
276 
277     hexdump(data, n);
278 #endif
279 
280     STUNMessage msg(data, n);
281     if (!msg.isValid()) {
282         if (mDTLSConnected) {
283             int err = -EINVAL;
284             if (mRTPSender) {
285                 err = onSRTPReceive(data, static_cast<size_t>(n));
286             }
287 
288             if (err == -EINVAL) {
289                 LOG(VERBOSE) << "Sending to DTLS instead:";
290                 // hexdump(data, n);
291 
292                 onDTLSReceive(data, static_cast<size_t>(n));
293 
294                 if (mTrackMask & TRACK_DATA) {
295                     ssize_t n;
296 
297                     do {
298                         uint8_t buf[kMaxUDPPayloadSize];
299                         n = mDTLS->readApplicationData(buf, sizeof(buf));
300 
301                         if (n > 0) {
302                             auto err = mSCTPHandler->inject(
303                                     buf, static_cast<size_t>(n));
304 
305                             if (err) {
306                                 LOG(WARNING)
307                                     << "SCTPHandler::inject returned error "
308                                     << err;
309                             }
310                         }
311                     } while (n > 0);
312                 }
313             }
314         } else {
315             onDTLSReceive(data, static_cast<size_t>(n));
316         }
317 
318         return;
319     }
320 
321     if (msg.type() == 0x0001 /* Binding Request */) {
322         STUNMessage response(0x0101 /* Binding Response */, msg.data() + 8);
323 
324         if (!matchesSession(msg)) {
325             LOG(WARNING) << "Unknown session or no USERNAME.";
326             return;
327         }
328 
329         const auto &answerPassword = mSession->localPassword();
330 
331         // msg.dump(answerPassword);
332 
333         if (addr.ss_family == AF_INET) {
334             uint8_t attr[8];
335             attr[0] = 0x00;
336 
337             sockaddr_in addrV4;
338             CHECK_EQ(addrLen, sizeof(addrV4));
339 
340             memcpy(&addrV4, &addr, addrLen);
341 
342             attr[1] = 0x01;  // IPv4
343 
344             static constexpr uint32_t kMagicCookie = 0x2112a442;
345 
346             uint16_t portHost = ntohs(addrV4.sin_port);
347             portHost ^= (kMagicCookie >> 16);
348 
349             uint32_t ipHost = ntohl(addrV4.sin_addr.s_addr);
350             ipHost ^= kMagicCookie;
351 
352             attr[2] = portHost >> 8;
353             attr[3] = portHost & 0xff;
354             attr[4] = ipHost >> 24;
355             attr[5] = (ipHost >> 16) & 0xff;
356             attr[6] = (ipHost >> 8) & 0xff;
357             attr[7] = ipHost & 0xff;
358 
359             response.addAttribute(
360                     0x0020 /* XOR-MAPPED-ADDRESS */, attr, sizeof(attr));
361         } else {
362             uint8_t attr[20];
363             attr[0] = 0x00;
364 
365             CHECK_EQ(addr.ss_family, AF_INET6);
366 
367             sockaddr_in6 addrV6;
368             CHECK_EQ(addrLen, sizeof(addrV6));
369 
370             memcpy(&addrV6, &addr, addrLen);
371 
372             attr[1] = 0x02;  // IPv6
373 
374             static constexpr uint32_t kMagicCookie = 0x2112a442;
375 
376             uint16_t portHost = ntohs(addrV6.sin6_port);
377             portHost ^= (kMagicCookie >> 16);
378 
379             attr[2] = portHost >> 8;
380             attr[3] = portHost & 0xff;
381 
382             uint8_t ipHost[16];
383 
384             std::string out;
385 
386             for (size_t i = 0; i < 16; ++i) {
387                 ipHost[i] = addrV6.sin6_addr.s6_addr[15 - i];
388 
389                 if (!out.empty()) {
390                     out += ":";
391                 }
392                 out += StringPrintf("%02x", ipHost[i]);
393 
394                 ipHost[i] ^= response.data()[4 + i];
395             }
396 
397             // LOG(INFO) << "IP6 = " << out;
398 
399             for (size_t i = 0; i < 16; ++i) {
400                 attr[4 + i] = ipHost[15 - i];
401             }
402 
403             response.addAttribute(
404                     0x0020 /* XOR-MAPPED-ADDRESS */, attr, sizeof(attr));
405         }
406 
407         response.addMessageIntegrityAttribute(answerPassword);
408         response.addFingerprint();
409 
410         // response.dump(answerPassword);
411 
412         queueDatagram(addr, response.data(), response.size());
413 
414         if (!mSession->isActive()) {
415             mSession->setRemoteAddress(addr);
416 
417             mSession->setIsActive();
418 
419             mSession->schedulePing(
420                     mRunLoop,
421                     makeSafeCallback(
422                         this, &RTPSocketHandler::pingRemote, mSession),
423                     std::chrono::seconds(0));
424         }
425 
426     } else {
427         // msg.dump();
428 
429         if (msg.type() == 0x0101 && !mDTLS) {
430             mDTLS = std::make_shared<DTLS>(
431                     shared_from_this(),
432                     DTLS::Mode::ACCEPT,
433                     mSession->localCertificate(),
434                     mSession->localKey(),
435                     mSession->remoteFingerprint(),
436                     (mTrackMask != TRACK_DATA) /* useSRTP */);
437 
438             mDTLS->connect(mSession->remoteAddress());
439         }
440     }
441 }
442 
matchesSession(const STUNMessage & msg) const443 bool RTPSocketHandler::matchesSession(const STUNMessage &msg) const {
444     const void *attrData;
445     size_t attrSize;
446     if (!msg.findAttribute(0x0006 /* USERNAME */, &attrData, &attrSize)) {
447         return false;
448     }
449 
450     std::string uFragPair(static_cast<const char *>(attrData), attrSize);
451     auto colonPos = uFragPair.find(':');
452 
453     if (colonPos == std::string::npos) {
454         return false;
455     }
456 
457     std::string localUFrag(uFragPair, 0, colonPos);
458     std::string remoteUFrag(uFragPair, colonPos + 1);
459 
460     if (mSession->localUFrag() != localUFrag
461             || mSession->remoteUFrag() != remoteUFrag) {
462 
463         LOG(WARNING)
464             << "Unable to find session localUFrag='"
465             << localUFrag
466             << "', remoteUFrag='"
467             << remoteUFrag
468             << "'";
469 
470         return false;
471     }
472 
473     return true;
474 }
475 
pingRemote(std::shared_ptr<RTPSession> session)476 void RTPSocketHandler::pingRemote(std::shared_ptr<RTPSession> session) {
477     std::vector<uint8_t> transactionID { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12 };
478 
479     STUNMessage msg(
480             0x0001 /* Binding Request */,
481             transactionID.data());
482 
483     std::string uFragPair =
484             session->remoteUFrag() + ":" + session->localUFrag();
485 
486     msg.addAttribute(
487             0x0006 /* USERNAME */,
488             uFragPair.c_str(),
489             uFragPair.size());
490 
491     uint64_t tieBreaker = 0xdeadbeefcafeb0b0;  // XXX
492     msg.addAttribute(
493             0x802a /* ICE-CONTROLLING */,
494             &tieBreaker,
495             sizeof(tieBreaker));
496 
497     uint32_t priority = 0xdeadbeef;
498     msg.addAttribute(
499             0x0024 /* PRIORITY */, &priority, sizeof(priority));
500 
501     // We're the controlling agent and including the "USE-CANDIDATE" attribute
502     // below nominates this candidate.
503     msg.addAttribute(0x0025 /* USE_CANDIDATE */);
504 
505     msg.addMessageIntegrityAttribute(session->remotePassword());
506     msg.addFingerprint();
507 
508     queueDatagram(session->remoteAddress(), msg.data(), msg.size());
509 
510     session->schedulePing(
511             mRunLoop,
512             makeSafeCallback(this, &RTPSocketHandler::pingRemote, session),
513             std::chrono::seconds(1));
514 }
515 
Datagram(const sockaddr_storage & addr,const void * data,size_t size)516 RTPSocketHandler::Datagram::Datagram(
517         const sockaddr_storage &addr, const void *data, size_t size)
518     : mData(size),
519       mAddr(addr) {
520     memcpy(mData.data(), data, size);
521 }
522 
data() const523 const void *RTPSocketHandler::Datagram::data() const {
524     return mData.data();
525 }
526 
size() const527 size_t RTPSocketHandler::Datagram::size() const {
528     return mData.size();
529 }
530 
remoteAddress() const531 const sockaddr_storage &RTPSocketHandler::Datagram::remoteAddress() const {
532     return mAddr;
533 }
534 
queueDatagram(const sockaddr_storage & addr,const void * data,size_t size)535 void RTPSocketHandler::queueDatagram(
536         const sockaddr_storage &addr, const void *data, size_t size) {
537     if (mTransportType == TransportType::TCP) {
538         std::vector copy(
539                 static_cast<const uint8_t *>(data),
540                 static_cast<const uint8_t *>(data) + size);
541 
542         mRunLoop->post(
543                 makeSafeCallback<RTPSocketHandler>(
544                     this,
545                     [copy](RTPSocketHandler *me) {
546                         // addr is ignored and assumed to be the connected endpoint's.
547                         me->queueTCPOutputPacket(copy.data(), copy.size());
548                     }));
549 
550         return;
551     }
552 
553     auto datagram = std::make_shared<Datagram>(addr, data, size);
554 
555     CHECK_LE(size, RTPSocketHandler::kMaxUDPPayloadSize);
556 
557     mRunLoop->post(
558             makeSafeCallback<RTPSocketHandler>(
559                 this,
560                 [datagram](RTPSocketHandler *me) {
561                     me->mOutQueue.push_back(datagram);
562 
563                     if (!me->mSendPending) {
564                         me->scheduleDrainOutQueue();
565                     }
566                 }));
567 }
568 
queueTCPOutputPacket(const uint8_t * data,size_t size)569 void RTPSocketHandler::queueTCPOutputPacket(const uint8_t *data, size_t size) {
570     uint8_t framing[2];
571     framing[0] = size >> 8;
572     framing[1] = size & 0xff;
573 
574     std::copy(framing, framing + sizeof(framing), std::back_inserter(mOutBuffer));
575     std::copy(data, data + size, std::back_inserter(mOutBuffer));
576 
577     if (!mSendPending) {
578         mSendPending = true;
579 
580         mSocket->postSend(
581                 makeSafeCallback(this, &RTPSocketHandler::sendTCPOutputData));
582     }
583 }
584 
sendTCPOutputData()585 void RTPSocketHandler::sendTCPOutputData() {
586     mSendPending = false;
587 
588     const size_t size = mOutBuffer.size();
589     size_t offset = 0;
590 
591     bool disconnected = false;
592 
593     while (offset < size) {
594         auto n = mSocket->send(mOutBuffer.data() + offset, size - offset);
595 
596         if (n < 0) {
597             if (errno == EINTR) {
598                 continue;
599             }
600 
601             LOG(FATAL) << "Should not be here.";
602         } else if (n == 0) {
603             offset = size;
604             disconnected = true;
605             break;
606         }
607 
608         offset += static_cast<size_t>(n);
609     }
610 
611     mOutBuffer.erase(mOutBuffer.begin(), mOutBuffer.begin() + offset);
612 
613     if (!mOutBuffer.empty() && !disconnected) {
614         mSendPending = true;
615 
616         mSocket->postSend(
617                 makeSafeCallback(this, &RTPSocketHandler::sendTCPOutputData));
618     }
619 }
620 
scheduleDrainOutQueue()621 void RTPSocketHandler::scheduleDrainOutQueue() {
622     CHECK(!mSendPending);
623 
624     mSendPending = true;
625     mSocket->postSend(
626             makeSafeCallback(
627                 this, &RTPSocketHandler::drainOutQueue));
628 }
629 
drainOutQueue()630 void RTPSocketHandler::drainOutQueue() {
631     mSendPending = false;
632 
633     CHECK(!mOutQueue.empty());
634 
635     do {
636         auto datagram = mOutQueue.front();
637 
638         ssize_t n;
639         do {
640             const sockaddr_storage &remoteAddr = datagram->remoteAddress();
641 
642             n = mSocket->sendto(
643                     datagram->data(),
644                     datagram->size(),
645                     reinterpret_cast<const sockaddr *>(&remoteAddr),
646                     getSockAddrLen(remoteAddr));
647         } while (n < 0 && errno == EINTR);
648 
649         if (n < 0) {
650             if (errno == EAGAIN || errno == EWOULDBLOCK) {
651                 break;
652             }
653 
654             CHECK(!"Should not be here");
655         }
656 
657         mOutQueue.pop_front();
658 
659     } while (!mOutQueue.empty());
660 
661     if (!mOutQueue.empty()) {
662         scheduleDrainOutQueue();
663     }
664 }
665 
onDTLSReceive(const uint8_t * data,size_t size)666 void RTPSocketHandler::onDTLSReceive(const uint8_t *data, size_t size) {
667     if (mDTLS) {
668         mDTLS->inject(data, size);
669     }
670 }
671 
notifyDTLSConnected()672 void RTPSocketHandler::notifyDTLSConnected() {
673     LOG(INFO) << "TDLS says that it's now connected.";
674 
675     mDTLSConnected = true;
676 
677     if (mTrackMask & TRACK_VIDEO) {
678         mServerState->getVideoPacketizer()->addSender(mRTPSender);
679     }
680 
681     if (mTrackMask & TRACK_AUDIO) {
682         mServerState->getAudioPacketizer()->addSender(mRTPSender);
683     }
684 
685     if (mTrackMask & TRACK_DATA) {
686         mSCTPHandler = std::make_shared<SCTPHandler>(mRunLoop, mDTLS);
687         mSCTPHandler->run();
688     }
689 
690     mRTPSender->run();
691 }
692 
onSRTPReceive(uint8_t * data,size_t size)693 int RTPSocketHandler::onSRTPReceive(uint8_t *data, size_t size) {
694     if (size < 2) {
695         return -EINVAL;
696     }
697 
698     auto version = data[0] >> 6;
699     if (version != 2) {
700         return -EINVAL;
701     }
702 
703     auto outSize = mDTLS->unprotect(data, size, false /* isRTP */);
704 
705     auto err = mRTPSender->injectRTCP(data, outSize);
706     if (err) {
707         LOG(WARNING) << "RTPSender::injectRTCP returned " << err;
708     }
709 
710     return err;
711 }
712 
queueRTCPDatagram(const void * data,size_t size)713 void RTPSocketHandler::queueRTCPDatagram(const void *data, size_t size) {
714     if (!mDTLSConnected) {
715         return;
716     }
717 
718     std::vector<uint8_t> copy(size + SRTP_MAX_TRAILER_LEN);
719     memcpy(copy.data(), data, size);
720 
721     auto outSize = mDTLS->protect(copy.data(), size, false /* isRTP */);
722     CHECK_LE(outSize, copy.size());
723 
724     queueDatagram(mSession->remoteAddress(), copy.data(), outSize);
725 }
726 
queueRTPDatagram(const void * data,size_t size)727 void RTPSocketHandler::queueRTPDatagram(const void *data, size_t size) {
728     if (!mDTLSConnected) {
729         return;
730     }
731 
732     std::vector<uint8_t> copy(size + SRTP_MAX_TRAILER_LEN);
733     memcpy(copy.data(), data, size);
734 
735     auto outSize = mDTLS->protect(copy.data(), size, true /* isRTP */);
736     CHECK_LE(outSize, copy.size());
737 
738     queueDatagram(mSession->remoteAddress(), copy.data(), outSize);
739 }
740