1 /*
2  * Copyright (C) 2019 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include <webrtc/RTPSender.h>
18 
19 #include "Utils.h"
20 
21 #include <webrtc/RTPSocketHandler.h>
22 
23 #include <https/SafeCallbackable.h>
24 #include <https/Support.h>
25 
26 #include <random>
27 #include <unordered_set>
28 
29 #define SIMULATE_PACKET_LOSS    0
30 
RTPSender(std::shared_ptr<RunLoop> runLoop,RTPSocketHandler * parent,std::shared_ptr<Packetizer> videoPacketizer,std::shared_ptr<Packetizer> audioPacketizer)31 RTPSender::RTPSender(
32         std::shared_ptr<RunLoop> runLoop,
33         RTPSocketHandler *parent,
34         std::shared_ptr<Packetizer> videoPacketizer,
35         std::shared_ptr<Packetizer> audioPacketizer)
36     : mRunLoop(runLoop),
37       mParent(parent),
38       mVideoPacketizer(videoPacketizer),
39       mAudioPacketizer(audioPacketizer) {
40 }
41 
addSource(uint32_t ssrc)42 void RTPSender::addSource(uint32_t ssrc) {
43     CHECK(mSources.insert(
44                 std::make_pair(ssrc, SourceInfo())).second);
45 }
46 
addRetransInfo(uint32_t ssrc,uint8_t PT,uint32_t retransSSRC,uint8_t retransPT)47 void RTPSender::addRetransInfo(
48         uint32_t ssrc, uint8_t PT, uint32_t retransSSRC, uint8_t retransPT) {
49     auto it = mSources.find(ssrc);
50     CHECK(it != mSources.end());
51 
52     auto &info = it->second;
53 
54     CHECK(info.mRetrans.insert(
55                 std::make_pair(
56                     PT, std::make_pair(retransSSRC, retransPT))).second);
57 }
58 
injectRTCP(uint8_t * data,size_t size)59 int RTPSender::injectRTCP(uint8_t *data, size_t size) {
60     // LOG(INFO) << "RTPSender::injectRTCP";
61     // hexdump(data, size);
62 
63     while (size > 0) {
64         if (size < 8) {
65             return -EINVAL;
66         }
67 
68         if ((data[0] >> 6) != 2) {
69             // Wrong version.
70             return -EINVAL;
71         }
72 
73         size_t lengthInWords = U16_AT(&data[2]) + 1;
74 
75         bool hasPadding = (data[0] & 0x20);
76 
77         size_t headerSize = 4 * lengthInWords;
78 
79         if (size < headerSize) {
80             return -EINVAL;
81         }
82 
83         if (hasPadding) {
84             if (size != headerSize) {
85                 // Padding should only be added to the last packet in a compound
86                 // packet.
87                 return -EINVAL;
88             }
89 
90             size_t numPadBytes = data[headerSize - 1];
91             if (numPadBytes == 0 || (numPadBytes % 4) != 0) {
92                 return -EINVAL;
93             }
94 
95             headerSize -= numPadBytes;
96         }
97 
98         auto err = processRTCP(data, headerSize);
99 
100         if (err) {
101             return err;
102         }
103 
104         data += 4 * lengthInWords;
105         size -= 4 * lengthInWords;
106     }
107 
108     return 0;
109 }
110 
processRTCP(const uint8_t * data,size_t size)111 int RTPSender::processRTCP(const uint8_t *data, size_t size) {
112     static constexpr uint8_t RR = 201;     // RFC 3550
113     // static constexpr uint8_t SDES = 202;
114     // static constexpr uint8_t BYE = 203;
115     // static constexpr uint8_t APP = 204;
116     static constexpr uint8_t RTPFB = 205;  // RFC 4585
117     static constexpr uint8_t PSFB = 206;
118     static constexpr uint8_t XR = 207;  // RFC 3611
119 
120     unsigned PT = data[1];
121 
122     switch (PT) {
123         case RR:
124         {
125             unsigned RC = data[0] & 0x1f;
126             if (size != 8 + RC * 6 * 4) {
127                 return -EINVAL;
128             }
129 
130             auto senderSSRC = U32_AT(&data[4]);
131 
132             size_t offset = 8;
133             for (unsigned i = 0; i < RC; ++i) {
134                 auto SSRC = U32_AT(&data[offset]);
135                 auto fractionLost = data[offset + 4];
136                 auto cumPacketsLost = U32_AT(&data[offset + 4]) & 0xffffff;
137 
138                 if (fractionLost) {
139                     LOG(INFO)
140                         << "sender SSRC "
141                         << StringPrintf("0x%08x", senderSSRC)
142                         << " reports "
143                         << StringPrintf("%.2f %%", (double)fractionLost * 100.0 / 256.0)
144                         << " lost, cum. total: "
145                         << cumPacketsLost
146                         << " from SSRC "
147                         << StringPrintf("0x%08x", SSRC);
148                 }
149 
150                 offset += 6 * 4;
151             }
152             break;
153         }
154 
155         case RTPFB:
156         {
157             static constexpr uint8_t NACK = 1;
158 
159             if (size < 12) {
160                 return -EINVAL;
161             }
162 
163             unsigned fmt = data[0] & 0x1f;
164 
165             auto senderSSRC = U32_AT(&data[4]);
166             auto SSRC = U32_AT(&data[8]);
167 
168             switch (fmt) {
169                 case NACK:
170                 {
171                     size_t offset = 12;
172                     size_t n = (size - offset) / 4;
173                     for (size_t i = 0; i < n; ++i) {
174                         auto PID = U16_AT(&data[offset]);
175                         auto BLP = U16_AT(&data[offset + 2]);
176 
177                         LOG(INFO)
178                             << "SSRC "
179                             << StringPrintf("0x%08x", senderSSRC)
180                             << " reports NACK w/ PID="
181                             << StringPrintf("0x%04x", PID)
182                             << ", BLP="
183                             << StringPrintf("0x%04x", BLP)
184                             << " from SSRC "
185                             << StringPrintf("0x%08x", SSRC);
186 
187                         offset += 4;
188 
189                         retransmitPackets(SSRC, PID, BLP);
190                     }
191                     break;
192                 }
193 
194                 default:
195                 {
196                     LOG(WARNING) << "RTPSender::processRTCP unhandled RTPFB.";
197                     hexdump(data, size);
198                     break;
199                 }
200             }
201 
202             break;
203         }
204 
205         case PSFB:
206         {
207             static constexpr uint8_t FMT_PLI = 1;
208             static constexpr uint8_t FMT_SLI = 2;
209             static constexpr uint8_t FMT_AFB = 15;
210 
211             if (size < 12) {
212                 return -EINVAL;
213             }
214 
215             unsigned fmt = data[0] & 0x1f;
216 
217             auto SSRC = U32_AT(&data[4]);
218 
219             switch (fmt) {
220                 case FMT_PLI:
221                 {
222                     if (size != 12) {
223                         return -EINVAL;
224                     }
225 
226                     LOG(INFO)
227                         << "Received PLI from SSRC "
228                         << StringPrintf("0x%08x", SSRC);
229 
230                     if (mVideoPacketizer) {
231                         mVideoPacketizer->requestIDRFrame();
232                     }
233                     break;
234                 }
235 
236                 case FMT_SLI:
237                 {
238                     LOG(INFO)
239                         << "Received SLI from SSRC "
240                         << StringPrintf("0x%08x", SSRC);
241 
242                     break;
243                 }
244 
245                 case FMT_AFB:
246                     break;
247 
248                 default:
249                 {
250                     LOG(WARNING) << "RTPSender::processRTCP unhandled PSFB.";
251                     hexdump(data, size);
252                     break;
253                 }
254             }
255             break;
256         }
257 
258         case XR:
259         {
260             static constexpr uint8_t FMT_RRTRB = 4;
261 
262             if (size < 8) {
263                 return -EINVAL;
264             }
265 
266             auto senderSSRC = U32_AT(&data[4]);
267 
268             size_t offset = 8;
269             while (offset + 3 < size) {
270                 auto fmt = data[offset];
271                 auto blockLength = 4 * (1 + U16_AT(&data[offset + 2]));
272 
273                 if (offset + blockLength > size) {
274                     LOG(WARNING) << "Found incomplete XR report block.";
275                     break;
276                 }
277 
278                 switch (fmt) {
279                     case FMT_RRTRB:
280                     {
281                         if (blockLength != 12) {
282                             LOG(WARNING)
283                                 << "Found XR-RRTRB block of invalid length.";
284                             break;
285                         }
286 
287                         auto ntpHi = U32_AT(&data[offset + 4]);
288                         auto ntpLo = U32_AT(&data[offset + 8]);
289 
290                         queueDLRR(
291                                 0xdeadbeef /* localSSRC */,
292                                 senderSSRC,
293                                 ntpHi,
294                                 ntpLo);
295                         break;
296                     }
297 
298                     default:
299                     {
300                         LOG(WARNING)
301                             << "Ignoring unknown XR block type " << fmt;
302 
303                         break;
304                     }
305                 }
306 
307                 offset += blockLength;
308             }
309 
310             if (offset != size) {
311                 LOG(WARNING) << "Found trailing bytes in XR report.";
312             }
313             break;
314         }
315 
316         default:
317         {
318             LOG(WARNING) << "RTPSender::processRTCP unhandled packet type.";
319             hexdump(data, size);
320         }
321     }
322 
323     return 0;
324 }
325 
appendSR(std::vector<uint8_t> * buffer,uint32_t localSSRC)326 void RTPSender::appendSR(std::vector<uint8_t> *buffer, uint32_t localSSRC) {
327     static constexpr uint8_t SR = 200;
328 
329     auto it = mSources.find(localSSRC);
330     CHECK(it != mSources.end());
331 
332     const auto &info = it->second;
333 
334     const size_t kLengthInWords = 7;
335 
336     auto offset = buffer->size();
337     buffer->resize(offset + kLengthInWords * sizeof(uint32_t));
338 
339     uint8_t *data = buffer->data() + offset;
340 
341     data[0] = 0x80;
342     data[1] = SR;
343     SET_U16(&data[2], kLengthInWords - 1);
344     SET_U32(&data[4], localSSRC);
345 
346     auto now = std::chrono::system_clock::now();
347 
348     auto us_since_epoch =
349         std::chrono::duration_cast<std::chrono::microseconds>(
350             now.time_since_epoch()).count();
351 
352     // This assumes that sd::chrono::system_clock's epoch is unix epoch, i.e.
353     // 1/1/1970 midnight UTC.
354     // Microseconds between midnight 1/1/1970 and midnight 1/1/1900.
355     us_since_epoch += 2208988800ULL * 1000ull;
356 
357     uint64_t ntpHi = us_since_epoch / 1000000ll;
358     uint64_t ntpLo = ((1LL << 32) * (us_since_epoch % 1000000LL)) / 1000000LL;
359 
360     uint32_t rtpNow =
361         (localSSRC == 0xdeadbeef || localSSRC == 0xcafeb0b0)
362             ? mVideoPacketizer->rtpNow()
363             : mAudioPacketizer->rtpNow();
364 
365     SET_U32(&data[8], ntpHi);
366     SET_U32(&data[12], ntpLo);
367     SET_U32(&data[16], rtpNow);
368     SET_U32(&data[20], info.mNumPacketsSent);
369     SET_U32(&data[24], info.mNumBytesSent);
370 }
371 
appendSDES(std::vector<uint8_t> * buffer,uint32_t localSSRC)372 void RTPSender::appendSDES(std::vector<uint8_t> *buffer, uint32_t localSSRC) {
373     static constexpr uint8_t SDES = 202;
374 
375     static const char *const kCNAME = "myWebRTP";
376     static const size_t kCNAMELength = strlen(kCNAME);
377 
378     const size_t kLengthInWords = 2 + (2 + kCNAMELength + 1 + 3) / 4;
379 
380     auto offset = buffer->size();
381     buffer->resize(offset + kLengthInWords * sizeof(uint32_t));
382 
383     uint8_t *data = buffer->data() + offset;
384 
385     data[0] = 0x81;
386     data[1] = SDES;
387     SET_U16(&data[2], kLengthInWords - 1);
388     SET_U32(&data[4], localSSRC);
389 
390     data[8] = 1; // CNAME
391     data[9] = kCNAMELength;
392     memcpy(&data[10], kCNAME, kCNAMELength);
393     data[10 + kCNAMELength] = '\0';
394 }
395 
queueDLRR(uint32_t localSSRC,uint32_t remoteSSRC,uint32_t ntpHi,uint32_t ntpLo)396 void RTPSender::queueDLRR(
397         uint32_t localSSRC,
398         uint32_t remoteSSRC,
399         uint32_t ntpHi,
400         uint32_t ntpLo) {
401     std::vector<uint8_t> buffer;
402     appendDLRR(&buffer, localSSRC, remoteSSRC, ntpHi, ntpLo);
403 
404     mParent->queueRTCPDatagram(buffer.data(), buffer.size());
405 }
406 
appendDLRR(std::vector<uint8_t> * buffer,uint32_t localSSRC,uint32_t remoteSSRC,uint32_t ntpHi,uint32_t ntpLo)407 void RTPSender::appendDLRR(
408         std::vector<uint8_t> *buffer,
409         uint32_t localSSRC,
410         uint32_t remoteSSRC,
411         uint32_t ntpHi,
412         uint32_t ntpLo) {
413     static constexpr uint8_t XR = 207;
414 
415     static constexpr uint8_t FMT_DLRRRB = 5;
416 
417     const size_t kLengthInWords = 2 + 4;
418 
419     auto offset = buffer->size();
420     buffer->resize(offset + kLengthInWords * sizeof(uint32_t));
421 
422     uint8_t *data = buffer->data() + offset;
423 
424     data[0] = 0x80;
425     data[1] = XR;
426     SET_U16(&data[2], kLengthInWords - 1);
427     SET_U32(&data[4], localSSRC);
428 
429     data[8] = FMT_DLRRRB;
430     data[9] = 0x00;
431     SET_U16(&data[10], 3 /* block length */);
432     SET_U32(&data[12], remoteSSRC);
433     SET_U32(&data[16], (ntpHi << 16) | (ntpLo >> 16));
434     SET_U32(&data[20], 0 /* delay since last RR */);
435 }
436 
queueSR(uint32_t localSSRC)437 void RTPSender::queueSR(uint32_t localSSRC) {
438     std::vector<uint8_t> buffer;
439     appendSR(&buffer, localSSRC);
440     // appendSDES(&buffer, localSSRC);
441 
442     // LOG(INFO) << "RTPSender::queueSR";
443     // hexdump(buffer.data(), buffer.size());
444 
445     mParent->queueRTCPDatagram(buffer.data(), buffer.size());
446 }
447 
sendSR(uint32_t localSSRC)448 void RTPSender::sendSR(uint32_t localSSRC) {
449     // LOG(INFO) << "sending SR.";
450     queueSR(localSSRC);
451 
452     mRunLoop->postWithDelay(
453             std::chrono::seconds(1),
454             makeSafeCallback(this, &RTPSender::sendSR, localSSRC));
455 }
456 
run()457 void RTPSender::run() {
458     for (const auto &entry : mSources) {
459         sendSR(entry.first);
460     }
461 }
462 
queueRTPDatagram(std::vector<uint8_t> * packet)463 void RTPSender::queueRTPDatagram(std::vector<uint8_t> *packet) {
464     CHECK_GE(packet->size(), 12u);
465 
466     uint32_t SSRC = U32_AT(&packet->data()[8]);
467 
468     auto it = mSources.find(SSRC);
469     CHECK(it != mSources.end());
470 
471     auto &info = it->second;
472 
473     uint16_t seqNum = info.mNumPacketsSent;
474     SET_U16(packet->data() + 2, seqNum);
475 
476 #if SIMULATE_PACKET_LOSS
477     static std::random_device rd;
478     static std::mt19937 gen(rd());
479     static std::uniform_real_distribution<> dist(0.0, 1.0);
480     if (dist(gen) < 0.99) {
481 #endif
482         mParent->queueRTPDatagram(packet->data(), packet->size());
483 #if SIMULATE_PACKET_LOSS
484     } else {
485         LOG(WARNING)
486             << "dropping packet "
487             << StringPrintf("0x%04x", seqNum)
488             << " from SSRC "
489             << StringPrintf("0x%08x", SSRC);
490     }
491 #endif
492 
493     ++info.mNumPacketsSent;
494     info.mNumBytesSent += packet->size() - 12;  // does not include RTP header.
495 
496     if (!info.mRetrans.empty()) {
497         static constexpr size_t kMaxHistory = 512;
498         if (info.mRecentPackets.size() == kMaxHistory) {
499             info.mRecentPackets.pop_front();
500         }
501         // info.mRecentPackets.push_back(std::move(*packet));
502         info.mRecentPackets.push_back(*packet);
503     }
504 }
505 
retransmitPackets(uint32_t localSSRC,uint16_t PID,uint16_t BLP)506 void RTPSender::retransmitPackets(
507         uint32_t localSSRC, uint16_t PID, uint16_t BLP) {
508     auto it = mSources.find(localSSRC);
509     CHECK(it != mSources.end());
510 
511     const auto &info = it->second;
512 
513     if (!info.mRecentPackets.empty()) {
514         LOG(INFO) << "Recent packets cover range ["
515             << StringPrintf(
516                     "0x%04x", U16_AT(info.mRecentPackets.front().data() + 2))
517             << ";"
518             << StringPrintf(
519                     "0x%04x", U16_AT(info.mRecentPackets.back().data() + 2))
520             << "]";
521     } else {
522         LOG(INFO) << "Recent packets are EMPTY!";
523     }
524 
525     bool first = true;
526     while (first || BLP) {
527         if (first) {
528             first = false;
529         } else {
530             ++PID;
531             if (!(BLP & 1)) {
532                 BLP = BLP >> 1;
533                 continue;
534             }
535 
536             BLP = BLP >> 1;
537         }
538 
539         for (auto it = info.mRecentPackets.begin();
540                 it != info.mRecentPackets.end();
541                 ++it) {
542             const auto &origPacket = *it;
543             auto seqNum = U16_AT(origPacket.data() + 2);
544 
545             if (seqNum != PID) {
546                 continue;
547             }
548 
549             LOG(INFO) << "Retransmitting PID " << StringPrintf("0x%04x", PID);
550 
551             auto PT = origPacket[1] & 0x7f;
552             auto it2 = info.mRetrans.find(PT);
553             CHECK(it2 != info.mRetrans.end());
554 
555             auto [rtxSSRC, rtxPT] = it2->second;
556 
557             std::vector<uint8_t> packet(origPacket.size() + 2);
558 
559             // XXX This is very simplified and assumes that the original packet
560             // started with a standard 12-byte header, no extensions and no padding!
561             memcpy(packet.data(), origPacket.data(), 12);
562 
563             packet[1] = (origPacket[1] & 0x80) | (rtxPT & 0x7f);
564             SET_U32(packet.data() + 8, rtxSSRC);
565             SET_U16(packet.data() + 12, seqNum);
566 
567             memcpy(packet.data() + 14,
568                    origPacket.data() + 12,
569                    origPacket.size() - 12);
570 
571             // queueRTPDatagram will fill in the new seqNum.
572             queueRTPDatagram(&packet);
573         }
574     }
575 }
576 
requestIDRFrame()577 void RTPSender::requestIDRFrame() {
578     if (mVideoPacketizer) {
579         mVideoPacketizer->requestIDRFrame();
580     }
581 }
582 
583