1 /*
2 * Copyright (C) 2019 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
// Log tag picked up by the ALOG* macros used throughout this file.
// The tag mirrors the HAL package/version this implementation lives in
// (namespace android::hardware::tv::tuner::V1_0).
#define LOG_TAG "android.hardware.tv.tuner@1.0-Filter"
18
19 #include "Filter.h"
20 #include <utils/Log.h>
21
22 namespace android {
23 namespace hardware {
24 namespace tv {
25 namespace tuner {
26 namespace V1_0 {
27 namespace implementation {
28
// Timeout handed to EventFlag::wait() in the filter thread while it waits for
// the DATA_CONSUMED bit.
// NOTE(review): presumably expressed in nanoseconds (i.e. 3 seconds) — confirm
// against the EventFlag::wait() contract.
#define WAIT_TIMEOUT 3000000000
30
Filter()31 Filter::Filter() {}
32
Filter(DemuxFilterType type,uint32_t filterId,uint32_t bufferSize,const sp<IFilterCallback> & cb,sp<Demux> demux)33 Filter::Filter(DemuxFilterType type, uint32_t filterId, uint32_t bufferSize,
34 const sp<IFilterCallback>& cb, sp<Demux> demux) {
35 mType = type;
36 mFilterId = filterId;
37 mBufferSize = bufferSize;
38 mCallback = cb;
39 mDemux = demux;
40
41 switch (mType.mainType) {
42 case DemuxFilterMainType::TS:
43 if (mType.subType.tsFilterType() == DemuxTsFilterType::AUDIO ||
44 mType.subType.tsFilterType() == DemuxTsFilterType::VIDEO) {
45 mIsMediaFilter = true;
46 }
47 if (mType.subType.tsFilterType() == DemuxTsFilterType::PCR) {
48 mIsPcrFilter = true;
49 }
50 if (mType.subType.tsFilterType() == DemuxTsFilterType::RECORD) {
51 mIsRecordFilter = true;
52 }
53 break;
54 case DemuxFilterMainType::MMTP:
55 if (mType.subType.mmtpFilterType() == DemuxMmtpFilterType::AUDIO ||
56 mType.subType.mmtpFilterType() == DemuxMmtpFilterType::VIDEO) {
57 mIsMediaFilter = true;
58 }
59 if (mType.subType.mmtpFilterType() == DemuxMmtpFilterType::RECORD) {
60 mIsRecordFilter = true;
61 }
62 break;
63 case DemuxFilterMainType::IP:
64 break;
65 case DemuxFilterMainType::TLV:
66 break;
67 case DemuxFilterMainType::ALP:
68 break;
69 default:
70 break;
71 }
72 }
73
~Filter()74 Filter::~Filter() {}
75
getId(getId_cb _hidl_cb)76 Return<void> Filter::getId(getId_cb _hidl_cb) {
77 ALOGV("%s", __FUNCTION__);
78
79 _hidl_cb(Result::SUCCESS, mFilterId);
80 return Void();
81 }
82
setDataSource(const sp<IFilter> & filter)83 Return<Result> Filter::setDataSource(const sp<IFilter>& filter) {
84 ALOGV("%s", __FUNCTION__);
85
86 mDataSource = filter;
87 mIsDataSourceDemux = false;
88
89 return Result::SUCCESS;
90 }
91
getQueueDesc(getQueueDesc_cb _hidl_cb)92 Return<void> Filter::getQueueDesc(getQueueDesc_cb _hidl_cb) {
93 ALOGV("%s", __FUNCTION__);
94
95 mIsUsingFMQ = true;
96
97 _hidl_cb(Result::SUCCESS, *mFilterMQ->getDesc());
98 return Void();
99 }
100
configure(const DemuxFilterSettings & settings)101 Return<Result> Filter::configure(const DemuxFilterSettings& settings) {
102 ALOGV("%s", __FUNCTION__);
103
104 mFilterSettings = settings;
105 switch (mType.mainType) {
106 case DemuxFilterMainType::TS:
107 mTpid = settings.ts().tpid;
108 break;
109 case DemuxFilterMainType::MMTP:
110 break;
111 case DemuxFilterMainType::IP:
112 break;
113 case DemuxFilterMainType::TLV:
114 break;
115 case DemuxFilterMainType::ALP:
116 break;
117 default:
118 break;
119 }
120
121 return Result::SUCCESS;
122 }
123
start()124 Return<Result> Filter::start() {
125 ALOGV("%s", __FUNCTION__);
126
127 return startFilterLoop();
128 }
129
stop()130 Return<Result> Filter::stop() {
131 ALOGV("%s", __FUNCTION__);
132
133 mFilterThreadRunning = false;
134
135 std::lock_guard<std::mutex> lock(mFilterThreadLock);
136
137 return Result::SUCCESS;
138 }
139
flush()140 Return<Result> Filter::flush() {
141 ALOGV("%s", __FUNCTION__);
142
143 // temp implementation to flush the FMQ
144 int size = mFilterMQ->availableToRead();
145 char* buffer = new char[size];
146 mFilterMQ->read((unsigned char*)&buffer[0], size);
147 delete[] buffer;
148 mFilterStatus = DemuxFilterStatus::DATA_READY;
149
150 return Result::SUCCESS;
151 }
152
releaseAvHandle(const hidl_handle &,uint64_t avDataId)153 Return<Result> Filter::releaseAvHandle(const hidl_handle& /*avMemory*/, uint64_t avDataId) {
154 ALOGV("%s", __FUNCTION__);
155 if (mDataId2Avfd.find(avDataId) == mDataId2Avfd.end()) {
156 return Result::INVALID_ARGUMENT;
157 }
158
159 ::close(mDataId2Avfd[avDataId]);
160 return Result::SUCCESS;
161 }
162
close()163 Return<Result> Filter::close() {
164 ALOGV("%s", __FUNCTION__);
165
166 return mDemux->removeFilter(mFilterId);
167 }
168
createFilterMQ()169 bool Filter::createFilterMQ() {
170 ALOGV("%s", __FUNCTION__);
171
172 // Create a synchronized FMQ that supports blocking read/write
173 std::unique_ptr<FilterMQ> tmpFilterMQ =
174 std::unique_ptr<FilterMQ>(new (std::nothrow) FilterMQ(mBufferSize, true));
175 if (!tmpFilterMQ->isValid()) {
176 ALOGW("[Filter] Failed to create FMQ of filter with id: %d", mFilterId);
177 return false;
178 }
179
180 mFilterMQ = std::move(tmpFilterMQ);
181
182 if (EventFlag::createEventFlag(mFilterMQ->getEventFlagWord(), &mFilterEventFlag) != OK) {
183 return false;
184 }
185
186 return true;
187 }
188
startFilterLoop()189 Result Filter::startFilterLoop() {
190 pthread_create(&mFilterThread, NULL, __threadLoopFilter, this);
191 pthread_setname_np(mFilterThread, "filter_waiting_loop");
192
193 return Result::SUCCESS;
194 }
195
__threadLoopFilter(void * user)196 void* Filter::__threadLoopFilter(void* user) {
197 Filter* const self = static_cast<Filter*>(user);
198 self->filterThreadLoop();
199 return 0;
200 }
201
filterThreadLoop()202 void Filter::filterThreadLoop() {
203 ALOGD("[Filter] filter %d threadLoop start.", mFilterId);
204 std::lock_guard<std::mutex> lock(mFilterThreadLock);
205 mFilterThreadRunning = true;
206
207 // For the first time of filter output, implementation needs to send the filter
208 // Event Callback without waiting for the DATA_CONSUMED to init the process.
209 while (mFilterThreadRunning) {
210 if (mFilterEvent.events.size() == 0) {
211 if (DEBUG_FILTER) {
212 ALOGD("[Filter] wait for filter data output.");
213 }
214 usleep(1000 * 1000);
215 continue;
216 }
217 // After successfully write, send a callback and wait for the read to be done
218 mCallback->onFilterEvent(mFilterEvent);
219 freeAvHandle();
220 mFilterEvent.events.resize(0);
221 mFilterStatus = DemuxFilterStatus::DATA_READY;
222 if (mCallback == nullptr) {
223 ALOGD("[Filter] filter %d does not hava callback. Ending thread", mFilterId);
224 break;
225 }
226 mCallback->onFilterStatus(mFilterStatus);
227 break;
228 }
229
230 while (mFilterThreadRunning) {
231 uint32_t efState = 0;
232 // We do not wait for the last round of written data to be read to finish the thread
233 // because the VTS can verify the reading itself.
234 for (int i = 0; i < SECTION_WRITE_COUNT; i++) {
235 while (mFilterThreadRunning && mIsUsingFMQ) {
236 status_t status = mFilterEventFlag->wait(
237 static_cast<uint32_t>(DemuxQueueNotifyBits::DATA_CONSUMED), &efState,
238 WAIT_TIMEOUT, true /* retry on spurious wake */);
239 if (status != OK) {
240 ALOGD("[Filter] wait for data consumed");
241 continue;
242 }
243 break;
244 }
245
246 maySendFilterStatusCallback();
247
248 while (mFilterThreadRunning) {
249 std::lock_guard<std::mutex> lock(mFilterEventLock);
250 if (mFilterEvent.events.size() == 0) {
251 continue;
252 }
253 // After successfully write, send a callback and wait for the read to be done
254 mCallback->onFilterEvent(mFilterEvent);
255 mFilterEvent.events.resize(0);
256 break;
257 }
258 // We do not wait for the last read to be done
259 // VTS can verify the read result itself.
260 if (i == SECTION_WRITE_COUNT - 1) {
261 ALOGD("[Filter] filter %d writing done. Ending thread", mFilterId);
262 break;
263 }
264 }
265 mFilterThreadRunning = false;
266 }
267
268 ALOGD("[Filter] filter thread ended.");
269 }
270
freeAvHandle()271 void Filter::freeAvHandle() {
272 if (!mIsMediaFilter) {
273 return;
274 }
275 for (int i = 0; i < mFilterEvent.events.size(); i++) {
276 ::close(mFilterEvent.events[i].media().avMemory.getNativeHandle()->data[0]);
277 native_handle_close(mFilterEvent.events[i].media().avMemory.getNativeHandle());
278 }
279 }
280
maySendFilterStatusCallback()281 void Filter::maySendFilterStatusCallback() {
282 if (!mIsUsingFMQ) {
283 return;
284 }
285 std::lock_guard<std::mutex> lock(mFilterStatusLock);
286 int availableToRead = mFilterMQ->availableToRead();
287 int availableToWrite = mFilterMQ->availableToWrite();
288 int fmqSize = mFilterMQ->getQuantumCount();
289
290 DemuxFilterStatus newStatus = checkFilterStatusChange(
291 availableToWrite, availableToRead, ceil(fmqSize * 0.75), ceil(fmqSize * 0.25));
292 if (mFilterStatus != newStatus) {
293 mCallback->onFilterStatus(newStatus);
294 mFilterStatus = newStatus;
295 }
296 }
297
checkFilterStatusChange(uint32_t availableToWrite,uint32_t availableToRead,uint32_t highThreshold,uint32_t lowThreshold)298 DemuxFilterStatus Filter::checkFilterStatusChange(uint32_t availableToWrite,
299 uint32_t availableToRead, uint32_t highThreshold,
300 uint32_t lowThreshold) {
301 if (availableToWrite == 0) {
302 return DemuxFilterStatus::OVERFLOW;
303 } else if (availableToRead > highThreshold) {
304 return DemuxFilterStatus::HIGH_WATER;
305 } else if (availableToRead < lowThreshold) {
306 return DemuxFilterStatus::LOW_WATER;
307 }
308 return mFilterStatus;
309 }
310
getTpid()311 uint16_t Filter::getTpid() {
312 return mTpid;
313 }
314
updateFilterOutput(vector<uint8_t> data)315 void Filter::updateFilterOutput(vector<uint8_t> data) {
316 std::lock_guard<std::mutex> lock(mFilterOutputLock);
317 mFilterOutput.insert(mFilterOutput.end(), data.begin(), data.end());
318 }
319
updateRecordOutput(vector<uint8_t> data)320 void Filter::updateRecordOutput(vector<uint8_t> data) {
321 std::lock_guard<std::mutex> lock(mRecordFilterOutputLock);
322 mRecordFilterOutput.insert(mRecordFilterOutput.end(), data.begin(), data.end());
323 }
324
startFilterHandler()325 Result Filter::startFilterHandler() {
326 std::lock_guard<std::mutex> lock(mFilterOutputLock);
327 switch (mType.mainType) {
328 case DemuxFilterMainType::TS:
329 switch (mType.subType.tsFilterType()) {
330 case DemuxTsFilterType::UNDEFINED:
331 break;
332 case DemuxTsFilterType::SECTION:
333 startSectionFilterHandler();
334 break;
335 case DemuxTsFilterType::PES:
336 startPesFilterHandler();
337 break;
338 case DemuxTsFilterType::TS:
339 startTsFilterHandler();
340 break;
341 case DemuxTsFilterType::AUDIO:
342 case DemuxTsFilterType::VIDEO:
343 startMediaFilterHandler();
344 break;
345 case DemuxTsFilterType::PCR:
346 startPcrFilterHandler();
347 break;
348 case DemuxTsFilterType::TEMI:
349 startTemiFilterHandler();
350 break;
351 default:
352 break;
353 }
354 break;
355 case DemuxFilterMainType::MMTP:
356 /*mmtpSettings*/
357 break;
358 case DemuxFilterMainType::IP:
359 /*ipSettings*/
360 break;
361 case DemuxFilterMainType::TLV:
362 /*tlvSettings*/
363 break;
364 case DemuxFilterMainType::ALP:
365 /*alpSettings*/
366 break;
367 default:
368 break;
369 }
370 return Result::SUCCESS;
371 }
372
startSectionFilterHandler()373 Result Filter::startSectionFilterHandler() {
374 if (mFilterOutput.empty()) {
375 return Result::SUCCESS;
376 }
377 if (!writeSectionsAndCreateEvent(mFilterOutput)) {
378 ALOGD("[Filter] filter %d fails to write into FMQ. Ending thread", mFilterId);
379 return Result::UNKNOWN_ERROR;
380 }
381
382 mFilterOutput.clear();
383
384 return Result::SUCCESS;
385 }
386
startPesFilterHandler()387 Result Filter::startPesFilterHandler() {
388 std::lock_guard<std::mutex> lock(mFilterEventLock);
389 if (mFilterOutput.empty()) {
390 return Result::SUCCESS;
391 }
392
393 for (int i = 0; i < mFilterOutput.size(); i += 188) {
394 if (mPesSizeLeft == 0) {
395 uint32_t prefix = (mFilterOutput[i + 4] << 16) | (mFilterOutput[i + 5] << 8) |
396 mFilterOutput[i + 6];
397 if (DEBUG_FILTER) {
398 ALOGD("[Filter] prefix %d", prefix);
399 }
400 if (prefix == 0x000001) {
401 // TODO handle mulptiple Pes filters
402 mPesSizeLeft = (mFilterOutput[i + 8] << 8) | mFilterOutput[i + 9];
403 mPesSizeLeft += 6;
404 if (DEBUG_FILTER) {
405 ALOGD("[Filter] pes data length %d", mPesSizeLeft);
406 }
407 } else {
408 continue;
409 }
410 }
411
412 int endPoint = min(184, mPesSizeLeft);
413 // append data and check size
414 vector<uint8_t>::const_iterator first = mFilterOutput.begin() + i + 4;
415 vector<uint8_t>::const_iterator last = mFilterOutput.begin() + i + 4 + endPoint;
416 mPesOutput.insert(mPesOutput.end(), first, last);
417 // size does not match then continue
418 mPesSizeLeft -= endPoint;
419 if (DEBUG_FILTER) {
420 ALOGD("[Filter] pes data left %d", mPesSizeLeft);
421 }
422 if (mPesSizeLeft > 0) {
423 continue;
424 }
425 // size match then create event
426 if (!writeDataToFilterMQ(mPesOutput)) {
427 ALOGD("[Filter] pes data write failed");
428 mFilterOutput.clear();
429 return Result::INVALID_STATE;
430 }
431 maySendFilterStatusCallback();
432 DemuxFilterPesEvent pesEvent;
433 pesEvent = {
434 // temp dump meta data
435 .streamId = mPesOutput[3],
436 .dataLength = static_cast<uint16_t>(mPesOutput.size()),
437 };
438 if (DEBUG_FILTER) {
439 ALOGD("[Filter] assembled pes data length %d", pesEvent.dataLength);
440 }
441
442 int size = mFilterEvent.events.size();
443 mFilterEvent.events.resize(size + 1);
444 mFilterEvent.events[size].pes(pesEvent);
445 mPesOutput.clear();
446 }
447
448 mFilterOutput.clear();
449
450 return Result::SUCCESS;
451 }
452
startTsFilterHandler()453 Result Filter::startTsFilterHandler() {
454 // TODO handle starting TS filter
455 return Result::SUCCESS;
456 }
457
startMediaFilterHandler()458 Result Filter::startMediaFilterHandler() {
459 std::lock_guard<std::mutex> lock(mFilterEventLock);
460 if (mFilterOutput.empty()) {
461 return Result::SUCCESS;
462 }
463 for (int i = 0; i < mFilterOutput.size(); i += 188) {
464 if (mPesSizeLeft == 0) {
465 uint32_t prefix = (mFilterOutput[i + 4] << 16) | (mFilterOutput[i + 5] << 8) |
466 mFilterOutput[i + 6];
467 if (DEBUG_FILTER) {
468 ALOGD("[Filter] prefix %d", prefix);
469 }
470 if (prefix == 0x000001) {
471 // TODO handle mulptiple Pes filters
472 mPesSizeLeft = (mFilterOutput[i + 8] << 8) | mFilterOutput[i + 9];
473 mPesSizeLeft += 6;
474 if (DEBUG_FILTER) {
475 ALOGD("[Filter] pes data length %d", mPesSizeLeft);
476 }
477 } else {
478 continue;
479 }
480 }
481
482 int endPoint = min(184, mPesSizeLeft);
483 // append data and check size
484 vector<uint8_t>::const_iterator first = mFilterOutput.begin() + i + 4;
485 vector<uint8_t>::const_iterator last = mFilterOutput.begin() + i + 4 + endPoint;
486 mPesOutput.insert(mPesOutput.end(), first, last);
487 // size does not match then continue
488 mPesSizeLeft -= endPoint;
489 if (DEBUG_FILTER) {
490 ALOGD("[Filter] pes data left %d", mPesSizeLeft);
491 }
492 if (mPesSizeLeft > 0 || mAvBufferCopyCount++ < 10) {
493 continue;
494 }
495
496 int av_fd = createAvIonFd(mPesOutput.size());
497 if (av_fd == -1) {
498 return Result::UNKNOWN_ERROR;
499 }
500 // copy the filtered data to the buffer
501 uint8_t* avBuffer = getIonBuffer(av_fd, mPesOutput.size());
502 if (avBuffer == NULL) {
503 return Result::UNKNOWN_ERROR;
504 }
505 memcpy(avBuffer, mPesOutput.data(), mPesOutput.size() * sizeof(uint8_t));
506
507 native_handle_t* nativeHandle = createNativeHandle(av_fd);
508 if (nativeHandle == NULL) {
509 return Result::UNKNOWN_ERROR;
510 }
511 hidl_handle handle;
512 handle.setTo(nativeHandle, /*shouldOwn=*/true);
513
514 // Create a dataId and add a <dataId, av_fd> pair into the dataId2Avfd map
515 uint64_t dataId = mLastUsedDataId++ /*createdUID*/;
516 mDataId2Avfd[dataId] = dup(av_fd);
517
518 // Create mediaEvent and send callback
519 DemuxFilterMediaEvent mediaEvent;
520 mediaEvent = {
521 .avMemory = std::move(handle),
522 .dataLength = static_cast<uint32_t>(mPesOutput.size()),
523 .avDataId = dataId,
524 };
525 int size = mFilterEvent.events.size();
526 mFilterEvent.events.resize(size + 1);
527 mFilterEvent.events[size].media(mediaEvent);
528
529 // Clear and log
530 mPesOutput.clear();
531 mAvBufferCopyCount = 0;
532 ::close(av_fd);
533 if (DEBUG_FILTER) {
534 ALOGD("[Filter] assembled av data length %d", mediaEvent.dataLength);
535 }
536 }
537
538 mFilterOutput.clear();
539
540 return Result::SUCCESS;
541 }
542
startRecordFilterHandler()543 Result Filter::startRecordFilterHandler() {
544 std::lock_guard<std::mutex> lock(mRecordFilterOutputLock);
545 if (mRecordFilterOutput.empty()) {
546 return Result::SUCCESS;
547 }
548
549 if (mDvr == nullptr || !mDvr->writeRecordFMQ(mRecordFilterOutput)) {
550 ALOGD("[Filter] dvr fails to write into record FMQ.");
551 return Result::UNKNOWN_ERROR;
552 }
553
554 mRecordFilterOutput.clear();
555 return Result::SUCCESS;
556 }
557
startPcrFilterHandler()558 Result Filter::startPcrFilterHandler() {
559 // TODO handle starting PCR filter
560 return Result::SUCCESS;
561 }
562
startTemiFilterHandler()563 Result Filter::startTemiFilterHandler() {
564 // TODO handle starting TEMI filter
565 return Result::SUCCESS;
566 }
567
writeSectionsAndCreateEvent(vector<uint8_t> data)568 bool Filter::writeSectionsAndCreateEvent(vector<uint8_t> data) {
569 // TODO check how many sections has been read
570 ALOGD("[Filter] section handler");
571 std::lock_guard<std::mutex> lock(mFilterEventLock);
572 if (!writeDataToFilterMQ(data)) {
573 return false;
574 }
575 int size = mFilterEvent.events.size();
576 mFilterEvent.events.resize(size + 1);
577 DemuxFilterSectionEvent secEvent;
578 secEvent = {
579 // temp dump meta data
580 .tableId = 0,
581 .version = 1,
582 .sectionNum = 1,
583 .dataLength = static_cast<uint16_t>(data.size()),
584 };
585 mFilterEvent.events[size].section(secEvent);
586 return true;
587 }
588
writeDataToFilterMQ(const std::vector<uint8_t> & data)589 bool Filter::writeDataToFilterMQ(const std::vector<uint8_t>& data) {
590 std::lock_guard<std::mutex> lock(mWriteLock);
591 if (mFilterMQ->write(data.data(), data.size())) {
592 return true;
593 }
594 return false;
595 }
596
attachFilterToRecord(const sp<Dvr> dvr)597 void Filter::attachFilterToRecord(const sp<Dvr> dvr) {
598 mDvr = dvr;
599 }
600
detachFilterFromRecord()601 void Filter::detachFilterFromRecord() {
602 mDvr = nullptr;
603 }
604
createAvIonFd(int size)605 int Filter::createAvIonFd(int size) {
606 // Create an ion fd and allocate an av fd mapped to a buffer to it.
607 int ion_fd = ion_open();
608 if (ion_fd == -1) {
609 ALOGE("[Filter] Failed to open ion fd %d", errno);
610 return -1;
611 }
612 int av_fd = -1;
613 ion_alloc_fd(dup(ion_fd), size, 0 /*align*/, ION_HEAP_SYSTEM_MASK, 0 /*flags*/, &av_fd);
614 if (av_fd == -1) {
615 ALOGE("[Filter] Failed to create av fd %d", errno);
616 return -1;
617 }
618 return av_fd;
619 }
620
getIonBuffer(int fd,int size)621 uint8_t* Filter::getIonBuffer(int fd, int size) {
622 uint8_t* avBuf = static_cast<uint8_t*>(
623 mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0 /*offset*/));
624 if (avBuf == MAP_FAILED) {
625 ALOGE("[Filter] fail to allocate buffer %d", errno);
626 return NULL;
627 }
628 return avBuf;
629 }
630
createNativeHandle(int fd)631 native_handle_t* Filter::createNativeHandle(int fd) {
632 // Create a native handle to pass the av fd via the callback event.
633 native_handle_t* nativeHandle = native_handle_create(/*numFd*/ 1, 0);
634 if (nativeHandle == NULL) {
635 ALOGE("[Filter] Failed to create native_handle %d", errno);
636 return NULL;
637 }
638 nativeHandle->data[0] = dup(fd);
639 return nativeHandle;
640 }
641 } // namespace implementation
642 } // namespace V1_0
643 } // namespace tuner
644 } // namespace tv
645 } // namespace hardware
646 } // namespace android
647