1 /*
2 * Copyright (C) 2019 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include <webrtc/Packetizer.h>
18
19 #include <https/SafeCallbackable.h>
20 #include <webrtc/RTPSender.h>
21
Packetizer(std::shared_ptr<RunLoop> runLoop,std::shared_ptr<StreamingSource> source)22 Packetizer::Packetizer(std::shared_ptr<RunLoop> runLoop,
23 std::shared_ptr<StreamingSource> source)
24 : mNumSamplesRead(0),
25 mStartTimeMedia(0),
26 mRunLoop(runLoop),
27 mStreamingSource(source) {}
28
~Packetizer()29 Packetizer::~Packetizer() {
30 if (mStreamingSource) {
31 mStreamingSource->stop();
32 }
33 }
34
queueRTPDatagram(std::vector<uint8_t> * packet)35 void Packetizer::queueRTPDatagram(std::vector<uint8_t> *packet) {
36 auto it = mSenders.begin();
37 while (it != mSenders.end()) {
38 auto sender = it->lock();
39 if (!sender) {
40 it = mSenders.erase(it);
41 mStreamingSource->notifyStreamConsumerDisconnected();
42 continue;
43 }
44
45 sender->queueRTPDatagram(packet);
46 ++it;
47 }
48 }
49
addSender(std::shared_ptr<RTPSender> sender)50 void Packetizer::addSender(std::shared_ptr<RTPSender> sender) {
51 mSenders.push_back(sender);
52 auto weak_source = std::weak_ptr<StreamingSource>(mStreamingSource);
53 mRunLoop->post([weak_source](){
54 auto source = weak_source.lock();
55 if (!source) return;
56 source->notifyNewStreamConsumer();
57 });
58 }
59
requestIDRFrame()60 int32_t Packetizer::requestIDRFrame() {
61 return mStreamingSource->requestIDRFrame();
62 }
63
run()64 void Packetizer::run() {
65 auto weak_this = weak_from_this();
66
67 mStreamingSource->setCallback(
68 [weak_this](const std::shared_ptr<android::SBuffer> &accessUnit) {
69 auto me = weak_this.lock();
70 if (me) {
71 me->mRunLoop->post(
72 makeSafeCallback(
73 me.get(), &Packetizer::onFrame, accessUnit));
74 }
75 });
76
77 mStreamingSource->start();
78 }
79
onFrame(const std::shared_ptr<android::SBuffer> & accessUnit)80 void Packetizer::onFrame(const std::shared_ptr<android::SBuffer>& accessUnit) {
81 if (!accessUnit) {
82 LOG(WARNING) << "Received invalid buffer in " << __FUNCTION__;
83 return;
84 }
85 int64_t timeUs = accessUnit->time_us();
86 CHECK(timeUs);
87
88 auto now = std::chrono::steady_clock::now();
89
90 if (mNumSamplesRead == 0) {
91 mStartTimeMedia = timeUs;
92 mStartTimeReal = now;
93 }
94
95 ++mNumSamplesRead;
96
97 LOG(VERBOSE)
98 << "got accessUnit of size "
99 << accessUnit->size()
100 << " at time "
101 << timeUs;
102
103 packetize(accessUnit, timeUs);
104 }
105
timeSinceStart() const106 uint32_t Packetizer::timeSinceStart() const {
107 if (mNumSamplesRead) return 0;
108
109 auto now = std::chrono::steady_clock::now();
110 return std::chrono::duration_cast<std::chrono::microseconds>(now -
111 mStartTimeReal)
112 .count();
113 }
114