22#include <sys/statvfs.h>
31#include "XrdOuc/XrdOucJson.hh"
54Cache *Cache::m_instance =
nullptr;
81 const char *config_filename,
82 const char *parameters,
86 err.
Say(
"++++++ Proxy file cache initialization started.");
97 if (! instance.
Config(config_filename, parameters, env))
99 err.
Say(
"Config Proxy file cache initialization failed.");
102 err.
Say(
"++++++ Proxy file cache initialization completed.");
109 for (
int wti = 0; wti < instance.
RefConfiguration().m_wqueue_threads; ++wti)
121 env->
PutPtr(
"XrdFSCtl_PC*", pfcFSctl);
131 assert (m_instance == 0);
132 m_instance =
new Cache(logger, env);
143 if (! m_decisionpoints.empty())
146 std::string filename = url.
GetPath();
147 std::vector<Decision*>::const_iterator it;
148 for (it = m_decisionpoints.begin(); it != m_decisionpoints.end(); ++it)
152 if (! d->
Decide(filename, *m_oss))
165 m_log(logger,
"XrdPfc_"),
171 m_prefetch_condVar(0),
172 m_prefetch_enabled(false),
174 m_RAM_write_queue(0),
185 const char* tpfx =
"Attach() ";
210 TRACE(
Error, tpfx <<
"Failed opening local file, falling back to remote access " << io->
Path());
218 ((loc && loc[0] != 0) ? loc :
"<deferred open>"));
238 m_writeQ.condVar.Lock();
240 m_writeQ.queue.push_back(b);
242 m_writeQ.queue.push_front(b);
244 m_writeQ.condVar.Signal();
245 m_writeQ.condVar.UnLock();
250 std::list<Block*> removed_blocks;
251 long long sum_size = 0;
253 m_writeQ.condVar.Lock();
254 std::list<Block*>::iterator i = m_writeQ.queue.begin();
255 while (i != m_writeQ.queue.end())
257 if ((*i)->m_file == file)
259 TRACE(Dump,
"Remove entries for " << (
void*)(*i) <<
" path " << file->
lPath());
260 std::list<Block*>::iterator j = i++;
261 removed_blocks.push_back(*j);
262 sum_size += (*j)->get_size();
263 m_writeQ.queue.erase(j);
271 m_writeQ.condVar.UnLock();
275 m_RAM_write_queue -= sum_size;
283 std::vector<Block*> blks_to_write(m_configuration.m_wqueue_blocks);
287 m_writeQ.condVar.Lock();
288 while (m_writeQ.size == 0)
290 m_writeQ.condVar.Wait();
296 int n_pushed = std::min(m_writeQ.size, m_configuration.m_wqueue_blocks);
297 long long sum_size = 0;
299 for (
int bi = 0; bi < n_pushed; ++bi)
301 Block* block = m_writeQ.queue.front();
302 m_writeQ.queue.pop_front();
303 m_writeQ.writes_between_purges += block->
get_size();
306 blks_to_write[bi] = block;
308 TRACE(Dump,
"ProcessWriteTasks for block " << (
void*)(block) <<
" path " << block->
m_file->
lPath());
310 m_writeQ.size -= n_pushed;
312 m_writeQ.condVar.UnLock();
316 m_RAM_write_queue -= sum_size;
319 for (
int bi = 0; bi < n_pushed; ++bi)
321 Block* block = blks_to_write[bi];
332 long long ret = m_writeQ.writes_between_purges;
333 m_writeQ.writes_between_purges = 0;
341 static const size_t s_block_align = sysconf(_SC_PAGESIZE);
343 bool std_size = (size == m_configuration.m_bufferSize);
347 long long total = m_RAM_used + size;
349 if (total <= m_configuration.m_RamAbsAvailable)
352 if (std_size && m_RAM_std_size > 0)
354 char *buf = m_RAM_std_blocks.back();
355 m_RAM_std_blocks.pop_back();
358 m_RAM_mutex.UnLock();
364 m_RAM_mutex.UnLock();
366 if (posix_memalign((
void**) &buf, s_block_align, (
size_t) size))
375 m_RAM_mutex.UnLock();
381 bool std_size = (size == m_configuration.m_bufferSize);
387 if (std_size && m_RAM_std_size < m_configuration.m_RamKeepStdBlocks)
389 m_RAM_std_blocks.push_back(buf);
401 TRACE(
Debug,
"GetFile " << path <<
", io " << io);
410 it = m_active.find(path);
414 if (it == m_active.end())
416 it = m_active.insert(std::make_pair(path, (
File*) 0)).first;
422 it->second->AddIO(io);
423 inc_ref_cnt(it->second,
false,
true);
430 m_active_cond.Wait();
440 int res = io->
Fstat(st);
443 TRACE(
Error,
"GetFile, could not get valid stat");
444 }
else if (res > 0) {
446 TRACE(
Error,
"GetFile, stat returned positive value, this should NOT happen here");
448 filesize = st.st_size;
464 inc_ref_cnt(file,
false,
true);
474 m_active_cond.Broadcast();
491 dec_ref_cnt(f,
true);
498class DiskSyncer :
public XrdJob
505 DiskSyncer(
File *f,
bool high_debug,
const char *desc =
"") :
508 m_high_debug(high_debug)
520class CommandExecutor :
public XrdJob
523 std::string m_command_url;
526 CommandExecutor(
const std::string& command,
const char *desc =
"") :
528 m_command_url(command)
542void Cache::schedule_file_sync(
File* f,
bool ref_cnt_already_set,
bool high_debug)
544 DiskSyncer* ds =
new DiskSyncer(f, high_debug);
546 if ( ! ref_cnt_already_set) inc_ref_cnt(f,
true, high_debug);
553 dec_ref_cnt(f, high_debug);
556void Cache::inc_ref_cnt(
File* f,
bool lock,
bool high_debug)
562 if (lock) m_active_cond.
Lock();
564 if (lock) m_active_cond.
UnLock();
569void Cache::dec_ref_cnt(
File* f,
bool high_debug)
577 bool emergency_close =
false;
592 <<
" -- closing and deleting File object without further ado");
593 emergency_close =
true;
624 schedule_file_sync(f,
true,
true);
629 bool finished_p =
false;
632 XrdSysCondVarHelper lock(&m_active_cond);
635 TRACE_INT(tlvl,
"dec_ref_cnt " << f->
GetLocalPath() <<
", cnt after sync_check and dec_ref_cnt = " << cnt);
648 XrdSysCondVarHelper lock(&m_active_cond);
649 m_active.erase(act_it);
650 m_active_cond.Broadcast();
659 int len = snprintf(buf, 4096,
"{\"event\":\"file_close\","
660 "\"lfn\":\"%s\",\"size\":%lld,\"blk_size\":%d,\"n_blks\":%d,\"n_blks_done\":%d,"
661 "\"access_cnt\":%lu,\"attach_t\":%lld,\"detach_t\":%lld,\"remotes\":%s,"
662 "\"b_hit\":%lld,\"b_miss\":%lld,\"b_bypass\":%lld,"
663 "\"b_todisk\":%lld,\"b_prefetch\":%lld,\"n_cks_errs\":%d}",
674 suc = m_gstream->Insert(buf, len + 1);
678 TRACE(
Error,
"Failed g-stream insertion of file_close record, len=" << len);
690 return m_active.find(path) != m_active.end() ||
691 m_purge_delay_set.find(path) != m_purge_delay_set.end();
697 m_purge_delay_set.clear();
708 if ( ! m_prefetch_enabled)
713 m_prefetch_condVar.Lock();
714 m_prefetchList.push_back(file);
715 m_prefetch_condVar.Signal();
716 m_prefetch_condVar.UnLock();
724 if ( ! m_prefetch_enabled)
729 m_prefetch_condVar.Lock();
730 for (PrefetchList::iterator it = m_prefetchList.begin(); it != m_prefetchList.end(); ++it)
734 m_prefetchList.erase(it);
738 m_prefetch_condVar.UnLock();
744 m_prefetch_condVar.Lock();
745 while (m_prefetchList.empty())
747 m_prefetch_condVar.Wait();
752 size_t l = m_prefetchList.size();
753 int idx = rand() % l;
754 File* f = m_prefetchList[idx];
756 m_prefetch_condVar.UnLock();
763 const long long limit_RAM = m_configuration.m_RamAbsAvailable * 7 / 10;
768 bool doPrefetch = (m_RAM_used < limit_RAM);
769 m_RAM_mutex.UnLock();
806 static const mode_t groupReadable = S_IRUSR | S_IWUSR | S_IRGRP;
807 static const mode_t worldReadable = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH;
808 static const char *lfpReason[] = {
"ForAccess",
"ForInfo",
"ForPath" };
810 TRACE(
Debug,
"LocalFilePath '" << curl <<
"', why=" << lfpReason[why]);
812 if (buff && blen > 0) buff[0] = 0;
815 std::string f_name = url.
GetPath();
820 int ret = m_oss->Lfn2Pfn(f_name.c_str(), buff, blen);
821 TRACE(
Info,
"LocalFilePath '" << curl <<
"', why=" << lfpReason[why] <<
" -> " << ret);
827 m_purge_delay_set.insert(f_name);
830 struct stat sbuff, sbuff2;
831 if (m_oss->Stat(f_name.c_str(), &sbuff) ==
XrdOssOK &&
832 m_oss->Stat(i_name.c_str(), &sbuff2) ==
XrdOssOK)
834 if (S_ISDIR(sbuff.st_mode))
836 TRACE(
Info,
"LocalFilePath '" << curl <<
"', why=" << lfpReason[why] <<
" -> EISDIR");
841 bool read_ok =
false;
842 bool is_complete =
false;
854 m_active_cond.Lock();
856 bool is_active = m_active.find(f_name) != m_active.end();
858 if (is_active) m_active_cond.UnLock();
860 XrdOssDF* infoFile = m_oss->newFile(m_configuration.m_username.c_str());
862 int res = infoFile->
Open(i_name.c_str(), O_RDWR, 0600, myEnv);
865 Info info(m_trace, 0);
866 if (info.
Read(infoFile, i_name.c_str()))
873 if ( ! is_active && is_complete && why ==
ForAccess)
876 info.
Write(infoFile, i_name.c_str());
883 if ( ! is_active) m_active_cond.UnLock();
887 if ((is_complete || why ==
ForInfo) && buff != 0)
889 int res2 = m_oss->Lfn2Pfn(f_name.c_str(), buff, blen);
896 {mode_t mode = (forall ? worldReadable : groupReadable);
897 if (((sbuff.st_mode & worldReadable) != mode)
898 && (m_oss->Chmod(f_name.c_str(), mode) !=
XrdOssOK))
899 {is_complete =
false;
905 TRACE(
Info,
"LocalFilePath '" << curl <<
"', why=" << lfpReason[why] <<
906 (is_complete ?
" -> FILE_COMPLETE_IN_CACHE" :
" -> EREMOTE"));
908 return is_complete ? 0 : -EREMOTE;
913 TRACE(
Info,
"LocalFilePath '" << curl <<
"', why=" << lfpReason[why] <<
" -> ENOENT");
924 int res =
XrdSysXAttrActive->Set(
"pfc.cache-control", cc.c_str(), cc.size(), path, cinfo_fd, 0);
926 TRACE(
Error,
"WritecacheControlXAttr error setting xattr " << res);
937 int res =
XrdSysXAttrActive->Set(
"pfc.fsize", &file_size,
sizeof(
long long), 0, cinfo_fd, 0);
939 TRACE(
Debug,
"WriteFileSizeXAttr error setting xattr " << res);
953 m_oss->Lfn2Pfn(cinfo_fname.c_str(), pfn, 4096);
954 long long fsize = -1ll;
956 if (res ==
sizeof(
long long))
962 TRACE(
Debug,
"DetermineFullFileSize error getting xattr " << res);
966 XrdOssDF *infoFile = m_oss->newFile(m_configuration.m_username.c_str());
969 int res = infoFile->
Open(cinfo_fname.c_str(), O_RDONLY, 0600, env);
973 Info info(m_trace, 0);
974 if ( ! info.
Read(infoFile, cinfo_fname.c_str())) {
994 m_oss->Lfn2Pfn(cinfo_fname.c_str(), pfn, 4096);
1000 std::string tmp(cc, res);
1019 ival = std::string(cc, res);
1033 if (file_size == 0 || bytes_on_disk >= file_size)
1036 double frac_on_disk = (double) bytes_on_disk / file_size;
1038 if (file_size <= m_configuration.m_onlyIfCachedMinSize)
1040 if (frac_on_disk >= m_configuration.m_onlyIfCachedMinFrac)
1045 if (bytes_on_disk >= m_configuration.m_onlyIfCachedMinSize &&
1046 frac_on_disk >= m_configuration.m_onlyIfCachedMinFrac)
1066 static const char* tpfx =
"ConsiderCached ";
1071 std::string f_name = url.
GetPath();
1073 File *file =
nullptr;
1076 auto it = m_active.find(f_name);
1077 if (it != m_active.end()) {
1084 inc_ref_cnt(file,
false,
false);
1090 int res = file->
Fstat(sbuff);
1091 dec_ref_cnt(file,
false);
1095 return sbuff.st_atime > 0 ? 0 : -EREMOTE;
1099 int res = m_oss->Stat(f_name.c_str(), &sbuff);
1104 if (S_ISDIR(sbuff.st_mode))
1111 if (file_size < 0) {
1112 TRACE(
Debug, tpfx << curl <<
" -> " << file_size);
1113 return (
int) file_size;
1117 return is_cached ? 0 : -EREMOTE;
1133 std::string f_name = url.
GetPath();
1137 if ((oflags & O_ACCMODE) != O_RDONLY)
1143 TRACE(Warning,
"Prepare write access requested on file " << f_name <<
". Denying access.");
1148 if (m_configuration.m_allow_xrdpfc_command && strncmp(
"/xrdpfc_command/", f_name.c_str(), 16) == 0)
1152 CommandExecutor *ce =
new CommandExecutor(f_name,
"CommandExecutor");
1162 m_purge_delay_set.insert(f_name);
1166 if (m_oss->Stat(i_name.c_str(), &sbuff) ==
XrdOssOK)
1169 if (m_configuration.m_httpcc && !is_http_cache_valid(f_name, i_name, url))
1171 TRACE(
Info,
"Http cache not valid " << f_name);
1176 TRACE(Dump,
"Prepare defer open " << f_name);
1195 const char *tpfx =
"Stat ";
1198 std::string f_name = url.
GetPath();
1200 File *file =
nullptr;
1203 auto it = m_active.find(f_name);
1204 if (it != m_active.end()) {
1210 inc_ref_cnt(file,
false,
false);
1215 int res = file->
Fstat(sbuff);
1216 dec_ref_cnt(file,
false);
1217 TRACE(
Debug, tpfx <<
"from active file " << curl <<
" -> " << res);
1221 int res = m_oss->Stat(f_name.c_str(), &sbuff);
1226 if (S_ISDIR(sbuff.st_mode))
1233 if (file_size < 0) {
1234 TRACE(
Debug, tpfx << curl <<
" -> " << file_size);
1237 sbuff.st_size = file_size;
1242 TRACE(
Debug, tpfx <<
"from disk " << curl <<
" -> " << res);
1257 std::string f_name = url.
GetPath();
1266 static const char* trc_pfx =
"UnlinkFile ";
1269 long long st_blocks_to_purge = 0;
1273 it = m_active.find(f_name);
1275 if (it != m_active.end())
1279 TRACE(
Info, trc_pfx << f_name <<
", file currently open and force not requested - denying request");
1285 if (it->second == 0)
1287 TRACE(
Info, trc_pfx << f_name <<
", an operation on this file is ongoing - denying request");
1297 it = m_active.insert(std::make_pair(f_name, (
File*) 0)).first;
1305 if (m_oss->Stat(f_name.c_str(), &f_stat) ==
XrdOssOK)
1306 st_blocks_to_purge = f_stat.st_blocks;
1312 int f_ret = m_oss->Unlink(f_name.c_str());
1313 int i_ret = m_oss->Unlink(i_name.c_str());
1315 if (st_blocks_to_purge)
1316 m_res_mon->register_file_purge(f_name, st_blocks_to_purge);
1318 TRACE(
Debug, trc_pfx << f_name <<
", f_ret=" << f_ret <<
", i_ret=" << i_ret);
1323 m_active_cond.Broadcast();
1326 return std::min(f_ret, i_ret);
1334bool Cache::is_http_cache_valid(
const std::string& f_name,
const std::string& i_name,
XrdCl::URL& url)
1342 bool ccIsValid =
true;
1343 using namespace nlohmann;
1344 json cc_json = json::parse(icc);
1346 bool mustRevalidate = cc_json.contains(
"revalidate") && (cc_json[
"revalidate"] ==
true);
1347 bool hasExpired =
false;
1348 if (cc_json.contains(
"expire"))
1350 time_t current_time;
1351 current_time = time(NULL);
1352 if (current_time > cc_json[
"expire"])
1356 if (cc_json.contains(
"ETag") && (mustRevalidate || hasExpired))
1361 std::string response;
1362 std::string fsctlarg = url.
GetURL();
1368 std::string etag = response.substr(0, response.find(
'\0'));
1369 std::string jval = cc_json[
"ETag"].get<std::string>();
1370 ccIsValid = (etag == jval);
1372 TRACE(
Info,
"Prepare " << i_name <<
", ETag valid res: " << ccIsValid);
1375 if (cc_json.contains(
"max-age"))
1377 time_t ma = cc_json[
"max-age"];
1378 cc_json[
"expire"] = ma + time(NULL);
1380 m_oss->
Lfn2Pfn(i_name.c_str(), pfn, 4096);
1387 TRACE(
Error,
"Prepare() XrdCl::FileSystem::Query failed " << f_name.c_str());
int DoIt(int argpnt, int argc, char **argv, bool singleshot)
std::string obfuscateAuth(const std::string &input)
#define TRACE_PC(act, pre_code, x)
#define TRACE_INT(act, x)
void * ProcessWriteTaskThread(void *)
XrdSysXAttr * XrdSysXAttrActive
void * ResourceMonitorThread(void *)
void * PrefetchThread(void *)
XrdOucCache * XrdOucGetCache(XrdSysLogger *logger, const char *config_filename, const char *parameters, XrdOucEnv *env)
const std::string & GetPath() const
Get the path.
std::string GetURL() const
Get the URL.
void SetParam(const std::string &name, const std::string &value)
Set a single param.
virtual int Close(long long *retsz=0)=0
virtual int Open(const char *path, int Oflag, mode_t Mode, XrdOucEnv &env)
virtual int Lfn2Pfn(const char *Path, char *buff, int blen)
virtual int Fstat(struct stat &sbuff)
virtual const char * Path()=0
virtual const char * Location(bool refresh=false)
static const int optRW
File is read/write (o/w read/only).
XrdOucCache(const char *ctype)
void * GetPtr(const char *varname)
void PutPtr(const char *varname, void *value)
Attaches/creates and detaches/deletes cache-io objects for disk based cache.
long long DetermineFullFileSize(const std::string &cinfo_fname)
void FileSyncDone(File *, bool high_debug)
File * GetFile(const std::string &, IO *, long long off=0, long long filesize=0)
static const Configuration & Conf()
virtual int LocalFilePath(const char *url, char *buff=0, int blen=0, LFP_Reason why=ForAccess, bool forall=false)
virtual int Stat(const char *url, struct stat &sbuff)
const Configuration & RefConfiguration() const
Reference XrdPfc configuration.
static ResourceMonitor & ResMon()
bool IsFileActiveOrPurgeProtected(const std::string &) const
void ClearPurgeProtectedSet()
void ReleaseRAM(char *buf, long long size)
virtual int ConsiderCached(const char *url)
static Cache & GetInstance()
Singleton access.
bool Config(const char *config_filename, const char *parameters, XrdOucEnv *env)
Parse configuration file.
int GetCacheControlXAttr(const std::string &cinfo_fname, std::string &res) const
void DeRegisterPrefetchFile(File *)
void ExecuteCommandUrl(const std::string &command_url)
void RegisterPrefetchFile(File *)
void WriteFileSizeXAttr(int cinfo_fd, long long file_size)
void ReleaseFile(File *, IO *)
void AddWriteTask(Block *b, bool from_read)
Add downloaded block in write queue.
Cache(XrdSysLogger *logger, XrdOucEnv *env)
Constructor.
bool Decide(XrdOucCacheIO *)
Makes decision if the original XrdOucCacheIO should be cached.
int UnlinkFile(const std::string &f_name, bool fail_if_open)
Remove cinfo and data files from cache.
static XrdScheduler * schedP
File * GetNextFileToPrefetch()
long long WritesSinceLastCall()
void ProcessWriteTasks()
Separate task which writes blocks from ram to disk.
virtual int Unlink(const char *url)
void WriteCacheControlXAttr(int cinfo_fd, const char *path, const std::string &cc)
void RemoveWriteQEntriesFor(File *f)
Remove blocks from write queue which belong to given prefetch. This method is used at the time of Fil...
virtual XrdOucCacheIO * Attach(XrdOucCacheIO *, int Options=0)
static const Cache & TheOne()
char * RequestRAM(long long size)
virtual int Prepare(const char *url, int oflags, mode_t mode)
bool DecideIfConsideredCached(long long file_size, long long bytes_on_disk)
static Cache & CreateInstance(XrdSysLogger *logger, XrdOucEnv *env)
Singleton creation.
bool is_prefetch_enabled() const
Base class for selecting which files should be cached.
virtual bool Decide(const std::string &, XrdOss &) const =0
bool FinalizeSyncBeforeExit()
Returns true if any of blocks need sync. Called from Cache::dec_ref_cnt on zero ref cnt.
const char * lPath() const
Log path.
void WriteBlockToDisk(Block *b)
std::string GetRemoteLocations() const
size_t GetAccessCnt() const
int Fstat(struct stat &sbuff)
static File * FileOpen(const std::string &path, long long offset, long long fileSize, XrdOucCacheIO *inputIO)
Static constructor that also does Open. Returns null ptr if Open fails.
long long GetPrefetchedBytes() const
int GetNDownloadedBlocks() const
const Info::AStat * GetLastAccessStats() const
void BlocksRemovedFromWriteQ(std::list< Block * > &)
Handle removal of a set of blocks from Cache's write queue.
const Stats & RefStats() const
void Sync()
Sync file cache inf o and output data with disk.
long long initiate_emergency_shutdown()
long long GetFileSize() const
const std::string & GetLocalPath() const
bool is_in_emergency_shutdown()
Downloads original file into multiple files, chunked into blocks. Only blocks that are asked for are ...
Downloads original file into a single file on local disk. Handles read requests as they come along.
bool HasFile() const
Check if File was opened successfully.
Base cache-io class that implements some XrdOucCacheIO abstract methods.
XrdOucCacheIO * GetInput()
Status of cached file. Can be read from and written into a binary file.
static const char * s_infoExtension
void WriteIOStatSingle(long long bytes_disk)
Write single open/close time for given bytes read from disk.
bool Write(XrdOssDF *fp, const char *dname, const char *fname=0)
bool IsComplete() const
Get complete status.
long long GetFileSize() const
Get file size.
bool Read(XrdOssDF *fp, const char *dname, const char *fname=0)
Read content of cinfo file into this object.
void main_thread_function()
int m_NCksumErrors
number of checksum errors while getting data from remote
long long m_BytesWritten
number of bytes written to disk
void Say(const char *text1, const char *text2=0, const char *txt3=0, const char *text4=0, const char *text5=0, const char *txt6=0)
static int Run(pthread_t *, void *(*proc)(void *), void *arg, int opts=0, const char *desc=0)
static void Wait(int milliseconds)
Contains parameters configurable from the xrootd config file.
bool m_qfsredir
redirect file system query to the origin
long long BytesHit
read from cache
long long BytesBypassed
read from remote and dropped
time_t DetachTime
close time
long long BytesMissed
read from remote and cached
time_t AttachTime
open time