1 /*
2  * Copyright (C) 2019 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include <webrtc/Packetizer.h>
18 
19 #include <https/SafeCallbackable.h>
20 #include <webrtc/RTPSender.h>
21 
Packetizer(std::shared_ptr<RunLoop> runLoop,std::shared_ptr<StreamingSource> source)22 Packetizer::Packetizer(std::shared_ptr<RunLoop> runLoop,
23                        std::shared_ptr<StreamingSource> source)
24     : mNumSamplesRead(0),
25       mStartTimeMedia(0),
26       mRunLoop(runLoop),
27       mStreamingSource(source) {}
28 
~Packetizer()29 Packetizer::~Packetizer() {
30     if (mStreamingSource) {
31         mStreamingSource->stop();
32     }
33 }
34 
queueRTPDatagram(std::vector<uint8_t> * packet)35 void Packetizer::queueRTPDatagram(std::vector<uint8_t> *packet) {
36     auto it = mSenders.begin();
37     while (it != mSenders.end()) {
38         auto sender = it->lock();
39         if (!sender) {
40             it = mSenders.erase(it);
41             mStreamingSource->notifyStreamConsumerDisconnected();
42             continue;
43         }
44 
45         sender->queueRTPDatagram(packet);
46         ++it;
47     }
48 }
49 
addSender(std::shared_ptr<RTPSender> sender)50 void Packetizer::addSender(std::shared_ptr<RTPSender> sender) {
51     mSenders.push_back(sender);
52     auto weak_source = std::weak_ptr<StreamingSource>(mStreamingSource);
53     mRunLoop->post([weak_source](){
54         auto source = weak_source.lock();
55         if (!source) return;
56         source->notifyNewStreamConsumer();
57     });
58 }
59 
requestIDRFrame()60 int32_t Packetizer::requestIDRFrame() {
61     return mStreamingSource->requestIDRFrame();
62 }
63 
run()64 void Packetizer::run() {
65     auto weak_this = weak_from_this();
66 
67     mStreamingSource->setCallback(
68             [weak_this](const std::shared_ptr<android::SBuffer> &accessUnit) {
69                 auto me = weak_this.lock();
70                 if (me) {
71                     me->mRunLoop->post(
72                             makeSafeCallback(
73                                 me.get(), &Packetizer::onFrame, accessUnit));
74                 }
75             });
76 
77     mStreamingSource->start();
78 }
79 
onFrame(const std::shared_ptr<android::SBuffer> & accessUnit)80 void Packetizer::onFrame(const std::shared_ptr<android::SBuffer>& accessUnit) {
81     if (!accessUnit) {
82         LOG(WARNING) << "Received invalid buffer in " << __FUNCTION__;
83         return;
84     }
85     int64_t timeUs = accessUnit->time_us();
86     CHECK(timeUs);
87 
88     auto now = std::chrono::steady_clock::now();
89 
90     if (mNumSamplesRead == 0) {
91         mStartTimeMedia = timeUs;
92         mStartTimeReal = now;
93     }
94 
95     ++mNumSamplesRead;
96 
97     LOG(VERBOSE)
98         << "got accessUnit of size "
99         << accessUnit->size()
100         << " at time "
101         << timeUs;
102 
103     packetize(accessUnit, timeUs);
104 }
105 
timeSinceStart() const106 uint32_t Packetizer::timeSinceStart() const {
107     if (mNumSamplesRead) return 0;
108 
109     auto now = std::chrono::steady_clock::now();
110     return std::chrono::duration_cast<std::chrono::microseconds>(now -
111                                                                  mStartTimeReal)
112         .count();
113 }
114