30#include "XrdOuc/XrdOucJson.hh"
39#include <unordered_map>
48const int BLOCK_WRITE_MAX_ATTEMPTS = 4;
54const char *File::m_traceID =
"File";
58File::File(
const std::string& path,
long long iOffset,
long long iFileSize) :
62 m_cfi(
Cache::TheOne().GetTrace(),
Cache::TheOne().is_prefetch_enabled()),
65 m_file_size(iFileSize),
66 m_current_io(m_io_set.end()),
70 m_detach_time_logged(false),
76 m_prefetch_state(kOff),
78 m_prefetch_read_cnt(0),
79 m_prefetch_hit_cnt(0),
106 m_info_file->Close();
108 m_info_file =
nullptr;
114 m_data_file->Close();
116 m_data_file =
nullptr;
119 if (m_resmon_token >= 0)
124 if (m_stats.m_BytesWritten > 0 && ! m_in_shutdown) {
127 if (sr == 0 && s.st_blocks != m_st_blocks) {
130 m_st_blocks = s.st_blocks;
138 TRACEF(
Debug,
"Close() finished, prefetch score = " << m_prefetch_score);
145 File *file =
new File(path, offset, fileSize);
146 if ( ! file->Open(inputIO))
171 m_in_shutdown =
true;
173 if (m_prefetch_state != kStopped && m_prefetch_state != kComplete)
175 m_prefetch_state = kStopped;
176 cache()->DeRegisterPrefetchFile(
this);
179 report_and_merge_delta_stats();
186void File::check_delta_stats()
191 report_and_merge_delta_stats();
194void File::report_and_merge_delta_stats()
198 m_data_file->
Fstat(&s);
201 long long max_st_blocks_to_report = (m_file_size & 0xfff) ? ((m_file_size >> 12) + 1) << 3
203 long long st_blocks_to_report = std::min((
long long) s.st_blocks, max_st_blocks_to_report);
205 m_st_blocks = st_blocks_to_report;
207 m_stats.
AddUp(m_delta_stats);
208 m_delta_stats.
Reset();
215 TRACEF(Dump,
"BlockRemovedFromWriteQ() block = " << (
void*) b <<
" idx= " << b->
m_offset/m_block_size);
223 TRACEF(Dump,
"BlocksRemovedFromWriteQ() n_blocks = " << blocks.size());
227 for (std::list<Block*>::iterator i = blocks.begin(); i != blocks.end(); ++i)
239 insert_remote_location(loc);
255 IoSet_i mi = m_io_set.find(io);
257 if (mi != m_io_set.end())
262 ", active_reads " << n_active_reads <<
263 ", active_prefetches " << io->m_active_prefetches <<
264 ", allow_prefetching " << io->m_allow_prefetching <<
265 ", ios_in_detach " << m_ios_in_detach);
267 "\tio_map.size() " << m_io_set.size() <<
268 ", block_map.size() " << m_block_map.size() <<
", file");
270 insert_remote_location(loc);
272 io->m_allow_prefetching =
false;
273 io->m_in_detach =
true;
276 if (m_prefetch_state == kOn || m_prefetch_state == kHold)
278 if ( ! select_current_io_or_disable_prefetching(
false) )
280 TRACEF(
Debug,
"ioActive stopping prefetching after io " << io <<
" retreat.");
287 bool io_active_result;
289 if (n_active_reads > 0)
291 io_active_result =
true;
293 else if (m_io_set.size() - m_ios_in_detach == 1)
295 io_active_result = ! m_block_map.empty();
299 io_active_result = io->m_active_prefetches > 0;
302 if ( ! io_active_result)
307 TRACEF(
Info,
"ioActive for io " << io <<
" returning " << io_active_result <<
", file");
309 return io_active_result;
313 TRACEF(
Error,
"ioActive io " << io <<
" not found in IoSet. This should not happen.");
324 m_detach_time_logged =
false;
333 if ( ! m_in_shutdown)
335 if ( ! m_writes_during_sync.empty() || m_non_flushed_cnt > 0 || ! m_detach_time_logged)
337 report_and_merge_delta_stats();
338 m_cfi.WriteIOStatDetach(m_stats);
339 m_detach_time_logged =
true;
341 TRACEF(
Debug,
"FinalizeSyncBeforeExit requesting sync to write detach stats");
345 TRACEF(
Debug,
"FinalizeSyncBeforeExit sync not required");
357 time_t now = time(0);
362 IoSet_i mi = m_io_set.find(io);
364 if (mi == m_io_set.end())
367 io->m_attach_time = now;
368 m_delta_stats.IoAttach();
370 insert_remote_location(loc);
372 if (m_prefetch_state == kStopped)
374 m_prefetch_state = kOn;
375 cache()->RegisterPrefetchFile(
this);
380 TRACEF(
Error,
"AddIO() io = " << (
void*)io <<
" already registered.");
383 m_state_cond.UnLock();
394 time_t now = time(0);
398 IoSet_i mi = m_io_set.find(io);
400 if (mi != m_io_set.end())
402 if (mi == m_current_io)
407 m_delta_stats.IoDetach(now - io->m_attach_time);
411 if (m_io_set.empty() && m_prefetch_state != kStopped && m_prefetch_state != kComplete)
413 TRACEF(
Error,
"RemoveIO() io = " << (
void*)io <<
" Prefetching is not stopped/complete -- it should be by now.");
414 m_prefetch_state = kStopped;
415 cache()->DeRegisterPrefetchFile(
this);
420 TRACEF(
Error,
"RemoveIO() io = " << (
void*)io <<
" is NOT registered.");
423 m_state_cond.UnLock();
432 static const char *tpfx =
"Open() ";
434 TRACEF(Dump, tpfx <<
"entered");
445 struct stat data_stat, info_stat;
449 bool data_existed = (myOss.
Stat(m_filename.c_str(), &data_stat) ==
XrdOssOK);
450 bool info_existed = (myOss.
Stat(ifn.c_str(), &info_stat) ==
XrdOssOK);
453 char size_str[32]; sprintf(size_str,
"%lld", m_file_size);
454 myEnv.
Put(
"oss.asize", size_str);
466 m_data_file = myOss.
newFile(myUser);
467 if ((res = m_data_file->
Open(m_filename.c_str(), O_RDWR, 0600, myEnv)) !=
XrdOssOK)
471 delete m_data_file; m_data_file = 0;
475 myEnv.
Put(
"oss.asize",
"64k");
481 m_data_file->Close();
delete m_data_file; m_data_file = 0;
485 m_info_file = myOss.
newFile(myUser);
486 if ((res = m_info_file->Open(ifn.c_str(), O_RDWR, 0600, myEnv)) !=
XrdOssOK)
490 delete m_info_file; m_info_file = 0;
491 m_data_file->Close();
delete m_data_file; m_data_file = 0;
495 bool initialize_info_file =
true;
497 if (info_existed && m_cfi.Read(m_info_file, ifn.c_str()))
499 TRACEF(
Debug, tpfx <<
"Reading existing info file. (data_existed=" << data_existed <<
500 ", data_size_stat=" << (data_existed ? data_stat.st_size : -1ll) <<
501 ", data_size_from_last_block=" << m_cfi.GetExpectedDataFileSize() <<
502 ", block_size=" << (m_cfi.GetBufferSize() >> 10) <<
"k)");
505 if (data_existed && data_stat.st_size >= m_cfi.GetExpectedDataFileSize())
507 initialize_info_file =
false;
509 TRACEF(Warning, tpfx <<
"Basic sanity checks on data file failed, resetting info file, truncating data file.");
510 m_cfi.ResetAllAccessStats();
511 m_data_file->Ftruncate(0);
518 if ( ! initialize_info_file && m_cfi.GetCkSumState() != conf.
get_cs_Chk())
523 TRACEF(Info, tpfx <<
"Cksum state of file insufficient, uvkeep test failed, resetting info file, truncating data file.");
524 initialize_info_file =
true;
525 m_cfi.ResetAllAccessStats();
526 m_data_file->Ftruncate(0);
540 parse_pfc_url_args(inputIO, pfc_blocksize, pfc_prefetch);
543 if (initialize_info_file)
545 m_cfi.SetBufferSizeFileSizeAndCreationTime(pfc_blocksize, m_file_size);
547 m_cfi.ResetNoCkSumTime();
548 m_cfi.Write(m_info_file, ifn.c_str());
549 m_info_file->Fsync();
550 cache()->WriteFileSizeXAttr(m_info_file->getFD(), m_file_size);
552 if (cache()->RefConfiguration().m_httpcc)
554 std::string responseFctl;
558 std::string cc_str = responseFctl;
559 nlohmann::json cc_json = nlohmann::json::parse(cc_str);
560 if (cc_json.contains(
"max-age"))
562 time_t ma = cc_json[
"max-age"];
564 cc_json[
"expire"] = ma;
565 cc_str = cc_json.dump();
567 TRACE(
Error,
"GetFile() XrdCl::File::Fcntl value " << cc_str);
568 cache()->WriteCacheControlXAttr(m_info_file->getFD(),
nullptr, cc_str);
572 TRACE(
Error,
"GetFile() XrdCl::File::Fcntl query XrdCl::QueryCode::FInfo failed " << inputIO->
Path());
576 TRACEF(
Debug, tpfx <<
"Creating new file info, data size = " << m_file_size <<
577 " num blocks = " << m_cfi.GetNBlocks() <<
578 " block size = " << pfc_blocksize);
582 if (futimens(m_info_file->getFD(), NULL)) {
586 TRACEF(Info, tpfx <<
"URL CGI pfc.blocksize ignored for an already existing file");
590 m_cfi.WriteIOStatAttach();
592 m_block_size = m_cfi.GetBufferSize();
593 m_num_blocks = m_cfi.GetNBlocks();
594 m_prefetch_state = (m_cfi.IsComplete()) ? kComplete : kStopped;
595 m_prefetch_max_blocks_in_flight = pfc_prefetch;
597 TRACEF(
Debug, tpfx <<
"pfc.prefetch set to " << pfc_prefetch <<
" via CGI parameter");
599 m_data_file->Fstat(&data_stat);
600 m_st_blocks = data_stat.st_blocks;
603 constexpr long long MB = 1024 * 1024;
604 m_resmon_report_threshold = std::min(std::max(10 * MB, m_file_size / 20), 500 * MB);
608 m_state_cond.UnLock();
613void File::parse_pfc_url_args(XrdOucCacheIO* inputIO,
long long &pfc_blocksize,
int &pfc_prefetch)
const
617 XrdCl::URL url(inputIO->
Path());
618 auto const & urlp = url.GetParams();
620 auto extract = [&](
const std::string &key, std::string &value) ->
bool {
621 auto it = urlp.find(key);
622 if (it != urlp.end()) {
634 const char *tpfx =
"File::Open::urlcgi pfc.blocksize ";
636 if (
Cache::TheOne().blocksize_str2value(tpfx, val.c_str(), bsize,
639 pfc_blocksize = bsize;
641 TRACEF(
Error, tpfx <<
"Error processing the parameter.");
646 const char *tpfx =
"File::Open::urlcgi pfc.prefetch ";
648 if (
Cache::TheOne().prefetch_str2value(tpfx, val.c_str(), pref,
653 TRACEF(
Error, tpfx <<
"Error processing the parameter.");
671 if ((res = m_data_file->Fstat(&sbuff)))
return res;
673 sbuff.st_size = m_file_size;
675 bool is_cached = cache()->DecideIfConsideredCached(m_file_size, sbuff.st_blocks * 512ll);
686bool File::overlap(
int blk,
695 const long long beg = blk * blk_size;
696 const long long end = beg + blk_size;
697 const long long req_end = req_off + req_size;
699 if (req_off < end && req_end > beg)
701 const long long ovlp_beg = std::max(beg, req_off);
702 const long long ovlp_end = std::min(end, req_end);
704 off = ovlp_beg - req_off;
705 blk_off = ovlp_beg - beg;
706 size = (int) (ovlp_end - ovlp_beg);
708 assert(size <= blk_size);
719Block* File::PrepareBlockRequest(
int i,
IO *io,
void *req_id,
bool prefetch)
727 const long long off = i * m_block_size;
728 const int last_block = m_num_blocks - 1;
729 const bool cs_net = cache()->RefConfiguration().is_cschk_net();
731 int blk_size, req_size;
732 if (i == last_block) {
733 blk_size = req_size = m_file_size - off;
734 if (cs_net && req_size & 0xFFF) req_size = (req_size & ~0xFFF) + 0x1000;
736 blk_size = req_size = m_block_size;
740 char *buf = cache()->RequestRAM(req_size);
744 b =
new (std::nothrow) Block(
this, io, req_id, buf, off, blk_size, req_size, prefetch, cs_net);
752 if (m_prefetch_state == kOn && (
int) m_block_map.size() >= m_prefetch_max_blocks_in_flight)
754 m_prefetch_state = kHold;
755 cache()->DeRegisterPrefetchFile(
this);
760 TRACEF(Dump,
"PrepareBlockRequest() " << i <<
" prefetch " << prefetch <<
", allocation failed.");
767void File::ProcessBlockRequest(
Block *b)
775 snprintf(buf, 256,
"idx=%lld, block=%p, prefetch=%d, off=%lld, req_size=%d, buff=%p, resp_handler=%p ",
776 b->
get_offset()/m_block_size, (
void*)b, b->m_prefetch, b->get_offset(), b->get_req_size(), (
void*)b->get_buff(), (
void*)brh);
777 TRACEF(Dump,
"ProcessBlockRequest() " << buf);
793 for (
BlockList_i bi = blks.begin(); bi != blks.end(); ++bi)
795 ProcessBlockRequest(*bi);
801void File::RequestBlocksDirect(
IO *io,
ReadRequest *read_req, std::vector<XrdOucIOVec>& ioVec,
int expected_size)
803 int n_chunks = ioVec.size();
806 TRACEF(DumpXL,
"RequestBlocksDirect() issuing ReadV for n_chunks = " << n_chunks <<
807 ", total_size = " << expected_size <<
", n_vec_reads = " << n_vec_reads);
817 io->
GetInput()->
ReadV( *handler, ioVec.data() + pos, n_chunks);
822int File::ReadBlocksFromDisk(std::vector<XrdOucIOVec>& ioVec,
int expected_size)
824 TRACEF(DumpXL,
"ReadBlocksFromDisk() issuing ReadV for n_chunks = " << (
int) ioVec.size() <<
", total_size = " << expected_size);
826 long long rs = m_data_file->ReadV(ioVec.data(), (
int) ioVec.size());
830 TRACEF(
Error,
"ReadBlocksFromDisk neg retval = " << rs);
834 if (rs != expected_size)
836 TRACEF(
Error,
"ReadBlocksFromDisk incomplete size = " << rs);
855 if (m_in_shutdown || io->m_in_detach)
857 m_state_cond.UnLock();
858 return m_in_shutdown ? -ENOENT : -EBADF;
863 if (m_cfi.IsComplete())
865 m_state_cond.UnLock();
866 int ret = m_data_file->Read(iUserBuff, iUserOff, iUserSize);
869 m_delta_stats.AddBytesHit(ret);
875 XrdOucIOVec readV( { iUserOff, iUserSize, 0, iUserBuff } );
877 return ReadOpusCoalescere(io, &readV, 1, rh,
"Read() ");
884 TRACEF(Dump,
"ReadV() for " << readVnum <<
" chunks.");
888 if (m_in_shutdown || io->m_in_detach)
890 m_state_cond.UnLock();
891 return m_in_shutdown ? -ENOENT : -EBADF;
896 if (m_cfi.IsComplete())
898 m_state_cond.UnLock();
899 int ret = m_data_file->ReadV(
const_cast<XrdOucIOVec*
>(readV), readVnum);
902 m_delta_stats.AddBytesHit(ret);
908 return ReadOpusCoalescere(io, readV, readVnum, rh,
"ReadV() ");
913int File::ReadOpusCoalescere(
IO *io,
const XrdOucIOVec *readV,
int readVnum,
925 int prefetch_cnt = 0;
930 std::unordered_map<Block*, std::vector<ChunkRequest>> blks_ready;
932 std::vector<XrdOucIOVec> iovec_disk;
933 std::vector<XrdOucIOVec> iovec_direct;
934 int iovec_disk_total = 0;
935 int iovec_direct_total = 0;
937 for (
int iov_idx = 0; iov_idx < readVnum; ++iov_idx)
944 const int idx_first = iUserOff / m_block_size;
945 const int idx_last = (iUserOff + iUserSize - 1) / m_block_size;
947 TRACEF(DumpXL, tpfx <<
"sid: " <<
Xrd::hex1 << rh->
m_seq_id <<
" idx_first: " << idx_first <<
" idx_last: " << idx_last);
949 enum LastBlock_e { LB_other, LB_disk, LB_direct };
951 LastBlock_e lbe = LB_other;
953 for (
int block_idx = idx_first; block_idx <= idx_last; ++block_idx)
956 BlockMap_i bi = m_block_map.find(block_idx);
963 overlap(block_idx, m_block_size, iUserOff, iUserSize, off, blk_off, size);
966 if (bi != m_block_map.end())
968 inc_ref_count(bi->second);
969 TRACEF(Dump, tpfx << (
void*) iUserBuff <<
" inc_ref_count for existing block " << bi->second <<
" idx = " << block_idx);
971 if (bi->second->is_finished())
975 assert(bi->second->is_ok());
977 blks_ready[bi->second].emplace_back(
ChunkRequest(
nullptr, iUserBuff + off, blk_off, size) );
979 if (bi->second->m_prefetch)
985 read_req =
new ReadRequest(io, rh);
990 bi->second->m_chunk_reqs.emplace_back( ChunkRequest(read_req, iUserBuff + off, blk_off, size) );
997 else if (m_cfi.TestBitWritten(offsetIdx(block_idx)))
999 TRACEF(DumpXL, tpfx <<
"read from disk " << (
void*)iUserBuff <<
" idx = " << block_idx);
1002 iovec_disk.back().size += size;
1004 iovec_disk.push_back( { block_idx * m_block_size + blk_off, size, 0, iUserBuff + off } );
1005 iovec_disk_total += size;
1007 if (m_cfi.TestBitPrefetch(offsetIdx(block_idx)))
1016 read_req =
new ReadRequest(io, rh);
1019 Block *b = PrepareBlockRequest(block_idx, io, read_req,
false);
1022 TRACEF(Dump, tpfx <<
"inc_ref_count new " << (
void*)iUserBuff <<
" idx = " << block_idx);
1024 blks_to_request.push_back(b);
1026 b->
m_chunk_reqs.emplace_back(ChunkRequest(read_req, iUserBuff + off, blk_off, size));
1033 TRACEF(DumpXL, tpfx <<
"direct block " << block_idx <<
", blk_off " << blk_off <<
", size " << size);
1035 iovec_direct_total += size;
1042 iovec_direct.back().size += size;
1044 long long in_offset = block_idx * m_block_size + blk_off;
1045 char *out_pos = iUserBuff + off;
1052 iovec_direct.push_back( { in_offset, size, 0, out_pos } );
1061 inc_prefetch_hit_cnt(prefetch_cnt);
1063 m_state_cond.UnLock();
1066 if ( ! blks_to_request.empty())
1068 ProcessBlockRequests(blks_to_request);
1069 blks_to_request.clear();
1073 if ( ! iovec_direct.empty())
1075 RequestBlocksDirect(io, read_req, iovec_direct, iovec_direct_total);
1077 TRACEF(Dump, tpfx <<
"direct read requests sent out, n_chunks = " << (
int) iovec_direct.size() <<
", total_size = " << iovec_direct_total);
1082 long long bytes_read = 0;
1086 if ( ! blks_ready.empty())
1088 for (
auto &bvi : blks_ready)
1090 for (
auto &cr : bvi.second)
1092 TRACEF(DumpXL, tpfx <<
"ub=" << (
void*)cr.m_buf <<
" from pre-finished block " << bvi.first->m_offset/m_block_size <<
" size " << cr.m_size);
1093 memcpy(cr.m_buf, bvi.first->m_buff + cr.m_off, cr.m_size);
1094 bytes_read += cr.m_size;
1100 if ( ! iovec_disk.empty())
1102 int rc = ReadBlocksFromDisk(iovec_disk, iovec_disk_total);
1103 TRACEF(DumpXL, tpfx <<
"from disk finished size = " << rc);
1118 m_state_cond.Lock();
1120 for (
auto &bvi : blks_ready)
1121 dec_ref_count(bvi.first, (
int) bvi.second.size());
1134 m_delta_stats.AddReadStats(read_req->
m_stats);
1135 check_delta_stats();
1136 m_state_cond.UnLock();
1144 m_state_cond.UnLock();
1145 return -EWOULDBLOCK;
1150 m_delta_stats.m_BytesHit += bytes_read;
1151 check_delta_stats();
1152 m_state_cond.UnLock();
1156 return error_cond ? error_cond : bytes_read;
1168 long long offset = b->
m_offset - m_offset;
1172 if (m_cfi.IsCkSumCache())
1176 retval = m_data_file->pgWrite(b->
get_buff(), offset, size, 0, 0);
1178 retval = m_data_file->Write(b->
get_buff(), offset, size);
1183 TRACEF(
Error,
"WriteToDisk() write error " << retval);
1185 TRACEF(
Error,
"WriteToDisk() incomplete block write ret=" << retval <<
" (should be " << size <<
")");
1195 const int blk_idx = (b->
m_offset - m_offset) / m_block_size;
1198 TRACEF(Dump,
"WriteToDisk() success set bit for block " << b->
m_offset <<
" size=" << size);
1200 bool schedule_sync =
false;
1204 m_cfi.SetBitWritten(blk_idx);
1208 m_cfi.SetBitPrefetch(blk_idx);
1212 m_cfi.ResetCkSumNet();
1219 m_writes_during_sync.push_back(blk_idx);
1223 m_cfi.SetBitSynced(blk_idx);
1224 ++m_non_flushed_cnt;
1225 if ((m_cfi.IsComplete() || m_non_flushed_cnt >=
Cache::GetInstance().RefConfiguration().m_flushCnt) &&
1228 schedule_sync =
true;
1230 m_non_flushed_cnt = 0;
1236 if (!schedule_sync) {
1243 cache()->ScheduleFileSync(
this);
1255 int ret = m_data_file->Fsync();
1256 bool errorp =
false;
1262 report_and_merge_delta_stats();
1263 loc_stats = m_stats;
1265 m_cfi.WriteIOStat(loc_stats);
1266 m_cfi.Write(m_info_file, m_filename.c_str());
1267 int cret = m_info_file->Fsync();
1270 TRACEF(
Error,
"Sync cinfo file sync error " << cret);
1276 TRACEF(
Error,
"Sync data file sync error " << ret <<
", cinfo file has not been updated");
1282 TRACEF(
Error,
"Sync failed, unlinking local files and initiating shutdown of File object");
1289 m_writes_during_sync.clear();
1295 int written_while_in_sync;
1296 bool resync =
false;
1299 for (std::vector<int>::iterator i = m_writes_during_sync.begin(); i != m_writes_during_sync.end(); ++i)
1301 m_cfi.SetBitSynced(*i);
1303 written_while_in_sync = m_non_flushed_cnt = (int) m_writes_during_sync.size();
1304 m_writes_during_sync.clear();
1308 if (written_while_in_sync > 0 && m_cfi.IsComplete() && ! m_in_shutdown)
1313 TRACEF(Dump,
"Sync "<< written_while_in_sync <<
" blocks written during sync." << (resync ?
" File is now complete - resyncing." :
""));
1324void File::free_block(
Block* b)
1327 int i = b->
m_offset / m_block_size;
1328 TRACEF(Dump,
"free_block block " << b <<
" idx = " << i);
1329 size_t ret = m_block_map.erase(i);
1333 TRACEF(
Error,
"free_block did not erase " << i <<
" from map");
1341 if (m_prefetch_state == kHold && (
int) m_block_map.size() < m_prefetch_max_blocks_in_flight)
1343 m_prefetch_state = kOn;
1344 cache()->RegisterPrefetchFile(
this);
1350bool File::select_current_io_or_disable_prefetching(
bool skip_current)
1354 int io_size = (int) m_io_set.size();
1359 io_ok = (*m_io_set.begin())->m_allow_prefetching;
1362 m_current_io = m_io_set.begin();
1365 else if (io_size > 1)
1367 IoSet_i mi = m_current_io;
1368 if (skip_current && mi != m_io_set.end()) ++mi;
1370 for (
int i = 0; i < io_size; ++i)
1372 if (mi == m_io_set.end()) mi = m_io_set.begin();
1374 if ((*mi)->m_allow_prefetching)
1386 m_current_io = m_io_set.end();
1387 m_prefetch_state = kStopped;
1388 cache()->DeRegisterPrefetchFile(
this);
1396void File::ProcessDirectReadFinished(
ReadRequest *rreq,
int bytes_read,
int error_cond)
1402 TRACEF(
Error,
"Read(), direct read finished with error " << -error_cond <<
" " <<
XrdSysE2T(-error_cond));
1404 m_state_cond.Lock();
1417 m_state_cond.UnLock();
1420 FinalizeReadRequest(rreq);
1447 TRACEF(Dump,
"ProcessBlockSuccess() ub=" << (
void*)creq.
m_buf <<
" from finished block " << b->
m_offset/m_block_size <<
" size " << creq.
m_size);
1450 m_state_cond.Lock();
1455 rreq->m_stats.m_BytesMissed += creq.
m_size;
1457 rreq->m_stats.m_BytesHit += creq.
m_size;
1459 --rreq->m_n_chunk_reqs;
1462 inc_prefetch_hit_cnt(1);
1466 bool rreq_complete = rreq->is_complete();
1468 m_state_cond.UnLock();
1471 FinalizeReadRequest(rreq);
1479 XrdSysCondVarHelper _lck(m_state_cond);
1480 m_delta_stats.AddReadStats(rreq->
m_stats);
1481 check_delta_stats();
1488void File::ProcessBlockResponse(
Block *b,
int res)
1490 static const char* tpfx =
"ProcessBlockResponse ";
1492 TRACEF(Dump, tpfx <<
"block=" << b <<
", idx=" << b->
m_offset/m_block_size <<
", off=" << b->
m_offset <<
", res=" << res);
1494 if (res >= 0 && res != b->
get_size())
1498 TRACEF(
Error, tpfx <<
"Wrong number of bytes received, assuming remote/local file size mismatch, unlinking local files and initiating shutdown of File object");
1502 m_state_cond.Lock();
1508 IoSet_i mi = m_io_set.find(io);
1509 if (mi != m_io_set.end())
1511 --io->m_active_prefetches;
1514 if (res < 0 && io->m_allow_prefetching)
1516 TRACEF(
Debug, tpfx <<
"after failed prefetch on io " << io <<
" disabling prefetching on this io.");
1517 io->m_allow_prefetching =
false;
1520 if (m_prefetch_state == kOn || m_prefetch_state == kHold)
1522 if ( ! select_current_io_or_disable_prefetching(
false) )
1524 TRACEF(
Debug, tpfx <<
"stopping prefetching after io " << b->
get_io() <<
" marked as bad.");
1530 if (b->
m_refcnt == 0 && (res < 0 || m_in_shutdown))
1533 m_state_cond.UnLock();
1547 TRACEF(Dump, tpfx <<
"inc_ref_count idx=" << b->
m_offset/m_block_size);
1548 if ( ! m_in_shutdown)
1554 cache()->AddWriteTask(b,
true);
1561 m_state_cond.UnLock();
1563 for (
auto &creq : creqs_to_notify)
1565 ProcessBlockSuccess(b, creq);
1574 <<
", io=" << b->
get_io() <<
", error=" << res);
1579 <<
", io=" << b->
get_io() <<
" incomplete, got " << res <<
" expected " << b->
get_size());
1580#if defined(__APPLE__) || defined(__GNU__) || (defined(__FreeBSD_kernel__) && defined(__GLIBC__)) || defined(__FreeBSD__)
1591 std::list<ReadRequest*> rreqs_to_complete;
1600 ProcessBlockError(b, rreq);
1603 rreqs_to_complete.push_back(rreq);
1608 creqs_to_keep.push_back(creq);
1612 bool reissue =
false;
1613 if ( ! creqs_to_keep.empty())
1615 ReadRequest *rreq = creqs_to_keep.front().m_read_req;
1617 TRACEF(
Debug,
"ProcessBlockResponse() requested block " << (
void*)b <<
" failed with another io " <<
1618 b->
get_io() <<
" - reissuing request with my io " << rreq->
m_io);
1625 m_state_cond.UnLock();
1627 for (
auto rreq : rreqs_to_complete)
1628 FinalizeReadRequest(rreq);
1631 ProcessBlockRequest(b);
1639 return m_filename.c_str();
1644int File::offsetIdx(
int iIdx)
const
1646 return iIdx - m_offset/m_block_size;
1660 TRACEF(DumpXL,
"Prefetch() entering.");
1664 if (m_prefetch_state != kOn)
1669 if ( ! select_current_io_or_disable_prefetching(
true) )
1671 TRACEF(
Error,
"Prefetch no available IO object found, prefetching stopped. This should not happen, i.e., prefetching should be stopped before.");
1676 for (
int f = 0; f < m_num_blocks; ++f)
1678 if ( ! m_cfi.TestBitWritten(f))
1680 int f_act = f + m_offset / m_block_size;
1682 BlockMap_i bi = m_block_map.find(f_act);
1683 if (bi == m_block_map.end())
1685 Block *b = PrepareBlockRequest(f_act, *m_current_io,
nullptr,
true);
1688 TRACEF(Dump,
"Prefetch take block " << f_act);
1692 inc_prefetch_read_cnt(1);
1697 TRACEF(Warning,
"Prefetch allocation failed for block " << f_act);
1706 TRACEF(
Debug,
"Prefetch file is complete, stopping prefetch.");
1707 m_prefetch_state = kComplete;
1708 cache()->DeRegisterPrefetchFile(
this);
1712 (*m_current_io)->m_active_prefetches += (int) blks.size();
1716 if ( ! blks.empty())
1718 ProcessBlockRequests(blks);
1727 return m_prefetch_score;
1740void File::insert_remote_location(
const std::string &loc)
1744 size_t p = loc.find_first_of(
'@');
1745 m_remote_locations.insert(&loc[p != std::string::npos ? p + 1 : 0]);
1752 if ( ! m_remote_locations.empty())
1756 for (std::set<std::string>::iterator i = m_remote_locations.begin(); i != m_remote_locations.end(); ++i, ++nl)
1760 s.reserve(2 + sl + 2*nl + nl - 1 + 1);
1763 for (std::set<std::string>::iterator i = m_remote_locations.begin(); i != m_remote_locations.end(); ++i, ++j)
1765 s +=
'"'; s += *i; s +=
'"';
1766 if (j < nl) s +=
',';
#define ERRNO_AND_ERRSTR(err_code)
#define TRACEF_INT(act, x)
const char * XrdSysE2T(int errcode)
virtual int Fstat(struct stat *buf)
virtual int Open(const char *path, int Oflag, mode_t Mode, XrdOucEnv &env)
virtual int Create(const char *tid, const char *path, mode_t mode, XrdOucEnv &env, int opts=0)=0
virtual XrdOssDF * newFile(const char *tident)=0
virtual int Stat(const char *path, struct stat *buff, int opts=0, XrdOucEnv *envP=0)=0
virtual void Done(int result)=0
virtual int Fcntl(XrdOucCacheOp::Code opc, const std::string &args, std::string &resp)
virtual const char * Path()=0
virtual int pgRead(char *buff, long long offs, int rdlen, std::vector< uint32_t > &csvec, uint64_t opts=0, int *csfix=0)
virtual int ReadV(const XrdOucIOVec *readV, int rnum)
void Put(const char *varname, const char *value)
void Done(int result) override
int * ptr_n_cksum_errors()
vCkSum_t & ref_cksum_vec()
long long get_offset() const
vChunkRequest_t m_chunk_reqs
void * get_req_id() const
bool req_cksum_net() const
void reset_error_and_set_io(IO *io, void *rid)
Attaches/creates and detaches/deletes cache-io objects for disk based cache.
XrdSysError * GetLog() const
const Configuration & RefConfiguration() const
Reference XrdPfc configuration.
static ResourceMonitor & ResMon()
static Cache & GetInstance()
Singleton access.
XrdSysTrace * GetTrace() const
int UnlinkFile(const std::string &f_name, bool fail_if_open)
Remove cinfo and data files from cache.
static const Cache & TheOne()
void Done(int result) override
bool FinalizeSyncBeforeExit()
Returns true if any of blocks need sync. Called from Cache::dec_ref_cnt on zero ref cnt.
const char * lPath() const
Log path.
int ReadV(IO *io, const XrdOucIOVec *readV, int readVnum, ReadReqRH *rh)
Vector read.
void WriteBlockToDisk(Block *b)
float GetPrefetchScore() const
friend class BlockResponseHandler
std::string GetRemoteLocations() const
int Fstat(struct stat &sbuff)
static File * FileOpen(const std::string &path, long long offset, long long fileSize, XrdOucCacheIO *inputIO)
Static constructor that also does Open. Returns null ptr if Open fails.
void RequestSyncOfDetachStats()
Flags that detach stats should be written out in final sync. Called from CacheIO upon Detach.
void BlocksRemovedFromWriteQ(std::list< Block * > &)
Handle removal of a set of blocks from Cache's write queue.
friend class DirectResponseHandler
void Sync()
Sync file cache inf o and output data with disk.
XrdSysTrace * GetTrace() const
int Read(IO *io, char *buff, long long offset, int size, ReadReqRH *rh)
Normal read.
void ioUpdated(IO *io)
Notification from IO that it has been updated (remote open).
long long initiate_emergency_shutdown()
void BlockRemovedFromWriteQ(Block *)
Handle removal of a block from Cache's write queue.
XrdSysError * GetLog() const
bool ioActive(IO *io)
Initiate close. Return true if still IO active. Used in XrdPosixXrootd::Close().
Base cache-io class that implements some XrdOucCacheIO abstract methods.
bool register_incomplete_read()
XrdOucCacheIO * GetInput()
bool register_block_error(int res)
RAtomic_int m_active_read_reqs
number of active read requests
const char * GetLocation()
Status of cached file. Can be read from and written into a binary file.
static const char * s_infoExtension
void CrossCheckIfScanIsInProgress(const std::string &lfn, XrdSysCondVar &cond)
int register_file_open(const std::string &filename, time_t open_timestamp, bool existing_file)
void register_file_purge(DirState *target, long long size_in_st_blocks)
void register_file_update_stats(int token_id, const Stats &stats)
void register_file_close(int token_id, time_t close_timestamp, const Stats &full_stats)
Statistics of cache utilisation by a File object.
long long m_StBlocksAdded
number of 512-byte blocks the file has grown by
long long m_BytesBypassed
number of bytes served directly through XrdCl
void AddUp(const Stats &s)
long long BytesReadAndWritten() const
long long m_BytesHit
number of bytes served from disk
std::list< Block * > BlockList_t
std::vector< ChunkRequest > vChunkRequest_t
std::list< Block * >::iterator BlockList_i
static const int maxRVdsz
static const int maxRvecsz
Contains parameters configurable from the xrootd config file.
long long m_cgi_max_bufferSize
max buffer size allowed in pfc.blocksize
int m_cgi_min_prefetch_max_blocks
min prefetch block count allowed in pfc.prefetch
bool does_cschk_have_missing_bits(CkSumCheck_e cks_on_file) const
bool m_cgi_prefetch_allowed
allow cgi setting of prefetch
CkSumCheck_e get_cs_Chk() const
int m_prefetch_max_blocks
default maximum number of blocks to prefetch per file
bool should_uvkeep_purge(time_t delta) const
std::string m_data_space
oss space for data files
long long m_bufferSize
cache block size, default 128 kB
long long m_cgi_min_bufferSize
min buffer size allowed in pfc.blocksize
std::string m_meta_space
oss space for metadata files (cinfo)
int m_cgi_max_prefetch_max_blocks
max prefetch block count allowed in pfc.prefetch
std::string m_username
username passed to oss plugin
bool m_cgi_blocksize_allowed
allow cgi setting of blocksize
void update_error_cond(int ec)