add last backwards-compatible version

This commit is contained in:
2021-12-14 00:33:46 -07:00
parent 68b10d413b
commit b0dd3f07f3
335 changed files with 4746 additions and 19627 deletions

View File

@@ -33,17 +33,15 @@ file::ptr fullFileBuffer::open(const char * path, abort_callback & abort, file::
file::ptr f;
if (hint.is_valid()) f = hint;
else filesystem::g_open_read(f, path, abort);
if (sizeMax != filesize_invalid) {
t_filesize fs = f->get_size(abort);
if (fs > sizeMax) return f;
t_filesize fs = f->get_size(abort);
if (fs < sizeMax) /*rejects size-unknown too*/ {
try {
service_ptr_t<reader_bigmem_mirror> r = new service_impl_t<reader_bigmem_mirror>();
r->init(f, abort);
f = r;
}
catch (std::bad_alloc) {}
}
try {
service_ptr_t<reader_bigmem_mirror> r = new service_impl_t<reader_bigmem_mirror>();
r->init(f, abort);
f = r;
}
catch (std::bad_alloc) {}
return f;
}
@@ -68,7 +66,7 @@ namespace {
};
struct readAheadInstance_t {
file::ptr m_file;
size_t m_readAhead, m_wakeUpThreschold;
size_t m_readAhead;
pfc::array_t<uint8_t> m_buffer;
size_t m_bufferBegin, m_bufferEnd;
@@ -78,7 +76,6 @@ namespace {
t_filesize m_seekto;
abort_callback_impl m_abort;
bool m_remote;
bool m_atEOF = false;
bool m_haveDynamicInfo;
std::list<dynInfoEntry_t> m_dynamicInfo;
@@ -107,7 +104,6 @@ namespace {
i->m_file = chain;
i->m_remote = chain->is_remote();
i->m_readAhead = readAhead;
i->m_wakeUpThreschold = readAhead * 3 / 4;
i->m_buffer.set_size_discard( readAhead * 2 );
i->m_bufferBegin = 0; i->m_bufferEnd = 0;
i->m_canWrite.set_state(true);
@@ -122,55 +118,41 @@ namespace {
}
}
fb2k::splitTask( [i] {
#ifdef PFC_SET_THREAD_DESCRIPTION
PFC_SET_THREAD_DESCRIPTION("Fb2k Read-Ahead Thread");
#endif
pfc::splitThread( [i] {
worker(*i);
} );
}
static void waitHelper( pfc::event & evt, abort_callback & aborter ) {
pfc::event::g_twoEventWait( evt.get_handle(), aborter.get_abort_event(), -1);
aborter.check();
}
t_size read(void * p_buffer,t_size p_bytes,abort_callback & p_abort) {
auto & i = * m_instance;
size_t done = 0;
bool initial = true;
while( done < p_bytes ) {
if ( !initial ) {
// Do not invoke waiting with common case read with lots of data in the buffer
pfc::event::g_twoEventWait( i.m_canRead.get_handle(), p_abort.get_abort_event(), -1);
}
p_abort.check();
waitHelper( i.m_canRead, p_abort );
pfc::mutexScope guard ( i.m_guard );
size_t got = i.m_bufferEnd - i.m_bufferBegin;
if (got == 0) {
i.m_error.rethrow();
if ( initial && ! i.m_atEOF ) {
initial = false; continue; // proceed to wait for more data
}
break; // EOF
}
size_t delta = pfc::min_t<size_t>( p_bytes - done, got );
const bool wakeUpBefore = got < i.m_wakeUpThreschold;
auto bufptr = i.m_buffer.get_ptr();
if ( p_buffer != nullptr ) memcpy( (uint8_t*) p_buffer + done, bufptr + i.m_bufferBegin, delta );
memcpy( (uint8_t*) p_buffer + done, bufptr + i.m_bufferBegin, delta );
done += delta;
i.m_bufferBegin += delta;
got -= delta;
m_position += delta;
if (!i.m_error.didFail() && !i.m_atEOF) {
if (!i.m_error.didFail()) {
if ( got == 0 ) i.m_canRead.set_state( false );
const bool wakeUpNow = got < i.m_wakeUpThreschold;
// Only set the event when *crossing* the boundary
// we will get a lot of wakeUpNow when nearing EOF
if ( wakeUpNow && ! wakeUpBefore ) i.m_canWrite.set_state( true );
if ( got < 3 * i.m_readAhead / 4 ) i.m_canWrite.set_state( true );
}
initial = false;
if ( i.m_atEOF ) break; // go no further
}
// FB2K_console_formatter() << "ReadAhead read: " << p_bytes << " => " << done;
return done;
}
t_filesize get_size(abort_callback & p_abort) {
@@ -187,16 +169,6 @@ namespace {
if (!m_canSeek) throw exception_io_object_not_seekable();
if ( m_stats.m_size != filesize_invalid && p_position > m_stats.m_size ) throw exception_io_seek_out_of_range();
auto posNow = get_position(p_abort);
if ( p_position >= posNow && p_position < posNow + m_instance->m_readAhead ) {
// FB2K_console_formatter() << "ReadAhead skip: " << posNow << " => " << p_position;
auto toSkip = p_position - posNow;
if ( toSkip > 0 ) read(nullptr, (size_t) toSkip, p_abort);
return;
}
// FB2K_console_formatter() << "ReadAhead seek: " << posNow << " => " << p_position;
seekInternal( p_position );
}
bool can_seek() {
@@ -220,23 +192,19 @@ namespace {
}
void reopen( abort_callback & p_abort ) {
if ( get_position( p_abort ) == 0 ) return;
p_abort.check();
seekInternal( seek_reopen );
}
bool get_static_info(class file_info & p_out) {
if ( ! m_haveStaticInfo ) return false;
mergeInfo(p_out, m_staticInfo);
p_out = m_staticInfo;
return true;
}
bool is_dynamic_info_enabled() {
return m_instance->m_haveDynamicInfo;
}
static void mergeInfo( file_info & out, const file_info & in ) {
out.copy_meta(in);
out.overwrite_info(in);
}
bool get_dynamic_info_v2(class file_info & out, t_filesize & outOffset) {
auto & i = * m_instance;
if ( ! i.m_haveDynamicInfo ) return false;
@@ -252,10 +220,8 @@ namespace {
if ( ptr == i.m_dynamicInfo.begin() ) return false;
auto iter = ptr; --iter;
mergeInfo(out, iter->m_info);
outOffset = iter->m_offset;
out = iter->m_info; outOffset = iter->m_offset;
i.m_dynamicInfo.erase( i.m_dynamicInfo.begin(), ptr );
return true;
}
private:
@@ -266,7 +232,6 @@ namespace {
i.m_bufferBegin = i.m_bufferEnd = 0;
i.m_canWrite.set_state(true);
i.m_seekto = p_position;
i.m_atEOF = false;
i.m_canRead.set_state(false);
m_position = ( p_position == seek_reopen ) ? 0 : p_position;
@@ -274,9 +239,8 @@ namespace {
static void worker( readAheadInstance_t & i ) {
ThreadUtils::CRethrow err;
err.exec( [&i] {
bool atEOF = false;
uint8_t* bufptr = i.m_buffer.get_ptr();
const size_t readAtOnceLimit = i.m_remote ? 256 : 4*1024;
const size_t readAtOnceLimit = i.m_remote ? 256 : 64*1024;
for ( ;; ) {
i.m_canWrite.wait_for(-1);
size_t readHowMuch = 0, readOffset = 0;
@@ -291,7 +255,6 @@ namespace {
}
i.m_seekto = filesize_invalid;
atEOF = false;
}
size_t got = i.m_bufferEnd - i.m_bufferBegin;
@@ -317,8 +280,6 @@ namespace {
if ( readHowMuch > 0 ) {
readHowMuch = i.m_file->read( bufptr + readOffset, readHowMuch, i.m_abort );
if ( readHowMuch == 0 ) atEOF = true;
if ( i.m_haveDynamicInfo ) {
file_dynamicinfo::ptr dyn;
if ( dyn &= i.m_file ) {
@@ -337,16 +298,14 @@ namespace {
{
pfc::mutexScope guard( i.m_guard );
i.m_abort.check();
if ( i.m_seekto != filesize_invalid ) {
// Seek request happened while we were reading - discard and continue
continue;
}
i.m_atEOF = atEOF;
i.m_canRead.set_state( true );
i.m_bufferEnd += readHowMuch;
size_t got = i.m_bufferEnd - i.m_bufferBegin;
if ( atEOF || got >= i.m_readAhead ) i.m_canWrite.set_state(false);
if ( got >= i.m_readAhead ) i.m_canWrite.set_state(false);
if ( dynInfoGot ) {
i.m_dynamicInfo.push_back( std::move(dynInfo) );