Mercurial > repo
view share/lua/5.2/luarocks/fs/lua.lua @ 12518:2d8fe55c6e65 draft default tip
<int-e> learn The password of the month is release incident pilot.
author | HackEso <hackeso@esolangs.org> |
---|---|
date | Sun, 03 Nov 2024 00:31:02 +0000 |
parents | d137f631bad5 |
children |
line wrap: on
line source
--- Native Lua implementation of filesystem and platform abstractions, -- using LuaFileSystem, LZLib, MD5 and LuaCurl. module("luarocks.fs.lua", package.seeall) local fs = require("luarocks.fs") local cfg = require("luarocks.cfg") local dir = require("luarocks.dir") local util = require("luarocks.util") local path = require("luarocks.path") local socket_ok, zip_ok, unzip_ok, lfs_ok, md5_ok, posix_ok, _ local http, ftp, lrzip, luazip, lfs, md5, posix if cfg.fs_use_modules then socket_ok, http = pcall(require, "socket.http") _, ftp = pcall(require, "socket.ftp") zip_ok, lrzip = pcall(require, "luarocks.tools.zip") unzip_ok, luazip = pcall(require, "zip"); _G.zip = nil lfs_ok, lfs = pcall(require, "lfs") md5_ok, md5 = pcall(require, "md5") posix_ok, posix = pcall(require, "posix") end local patch = require("luarocks.tools.patch") local dir_stack = {} math.randomseed(os.time()) dir_separator = "/" --- Quote argument for shell processing. -- Adds single quotes and escapes. -- @param arg string: Unquoted argument. -- @return string: Quoted argument. function Q(arg) assert(type(arg) == "string") -- FIXME Unix-specific return "'" .. arg:gsub("\\", "\\\\"):gsub("'", "'\\''") .. "'" end --- Test is file/dir is writable. -- Warning: testing if a file/dir is writable does not guarantee -- that it will remain writable and therefore it is no replacement -- for checking the result of subsequent operations. -- @param file string: filename to test -- @return boolean: true if file exists, false otherwise. function is_writable(file) assert(file) file = dir.normalize(file) local result if fs.is_dir(file) then local file2 = dir.path(file, '.tmpluarockstestwritable') local fh = io.open(file2, 'wb') result = fh ~= nil if fh then fh:close() end os.remove(file2) else local fh = io.open(file, 'r+b') result = fh ~= nil if fh then fh:close() end end return result end --- Create a temporary directory. -- @param name string: name pattern to use for avoiding conflicts -- when creating temporary directory. -- @return string or nil: name of temporary directory or nil on failure. function make_temp_dir(name) assert(type(name) == "string") name = dir.normalize(name) local temp_dir = (os.getenv("TMP") or "/tmp") .. "/luarocks_" .. name:gsub(dir.separator, "_") .. "-" .. tostring(math.floor(math.random() * 10000)) if fs.make_dir(temp_dir) then return temp_dir else return nil end end --- Run the given command, quoting its arguments. -- The command is executed in the current directory in the dir stack. -- @param command string: The command to be executed. No quoting/escaping -- is applied. -- @param ... Strings containing additional arguments, which are quoted. -- @return boolean: true if command succeeds (status code 0), false -- otherwise. function execute(command, ...) assert(type(command) == "string") for _, arg in ipairs({...}) do assert(type(arg) == "string") command = command .. " " .. fs.Q(arg) end return fs.execute_string(command) end --- Check the MD5 checksum for a file. -- @param file string: The file to be checked. -- @param md5sum string: The string with the expected MD5 checksum. -- @return boolean: true if the MD5 checksum for 'file' equals 'md5sum', false if not -- or if it could not perform the check for any reason. function check_md5(file, md5sum) file = dir.normalize(file) local computed = fs.get_md5(file) if not computed then return false end if computed:match("^"..md5sum) then return true else return false end end --------------------------------------------------------------------- -- LuaFileSystem functions --------------------------------------------------------------------- if lfs_ok then --- Run the given command. -- The command is executed in the current directory in the dir stack. -- @param cmd string: No quoting/escaping is applied to the command. -- @return boolean: true if command succeeds (status code 0), false -- otherwise. function execute_string(cmd) local code = os.execute(cmd) if code == 0 or code == true then return true else return false end end --- Obtain current directory. -- Uses the module's internal dir stack. -- @return string: the absolute pathname of the current directory. function current_dir() return lfs.currentdir() end --- Change the current directory. -- Uses the module's internal dir stack. This does not have exact -- semantics of chdir, as it does not handle errors the same way, -- but works well for our purposes for now. -- @param d string: The directory to switch to. function change_dir(d) table.insert(dir_stack, lfs.currentdir()) d = dir.normalize(d) lfs.chdir(d) end --- Change directory to root. -- Allows leaving a directory (e.g. for deleting it) in -- a crossplatform way. function change_dir_to_root() table.insert(dir_stack, lfs.currentdir()) lfs.chdir("/") -- works on Windows too end --- Change working directory to the previous in the dir stack. -- @return true if a pop ocurred, false if the stack was empty. function pop_dir() local d = table.remove(dir_stack) if d then lfs.chdir(d) return true else return false end end --- Create a directory if it does not already exist. -- If any of the higher levels in the path name does not exist -- too, they are created as well. -- @param directory string: pathname of directory to create. -- @return boolean: true on success, false on failure. function make_dir(directory) assert(type(directory) == "string") directory = dir.normalize(directory) local path = nil if directory:sub(2, 2) == ":" then path = directory:sub(1, 2) directory = directory:sub(4) else if directory:match("^/") then path = "" end end for d in directory:gmatch("([^"..dir.separator.."]+)"..dir.separator.."*") do path = path and path .. dir.separator .. d or d local mode = lfs.attributes(path, "mode") if not mode then if not lfs.mkdir(path) then return false end elseif mode ~= "directory" then return false end end return true end --- Remove a directory if it is empty. -- Does not return errors (for example, if directory is not empty or -- if already does not exist) -- @param d string: pathname of directory to remove. function remove_dir_if_empty(d) assert(d) d = dir.normalize(d) lfs.rmdir(d) end --- Remove a directory if it is empty. -- Does not return errors (for example, if directory is not empty or -- if already does not exist) -- @param d string: pathname of directory to remove. function remove_dir_tree_if_empty(d) assert(d) d = dir.normalize(d) for i=1,10 do lfs.rmdir(d) d = dir.dir_name(d) end end --- Copy a file. -- @param src string: Pathname of source -- @param dest string: Pathname of destination -- @param perms string or nil: Permissions for destination file, -- or nil to use the source filename permissions -- @return boolean or (boolean, string): true on success, false on failure, -- plus an error message. function copy(src, dest, perms) assert(src and dest) src = dir.normalize(src) dest = dir.normalize(dest) local destmode = lfs.attributes(dest, "mode") if destmode == "directory" then dest = dir.path(dest, dir.base_name(src)) end if not perms then perms = fs.get_permissions(src) end local src_h, err = io.open(src, "rb") if not src_h then return nil, err end local dest_h, err = io.open(dest, "w+b") if not dest_h then src_h:close() return nil, err end while true do local block = src_h:read(8192) if not block then break end dest_h:write(block) end src_h:close() dest_h:close() fs.chmod(dest, perms) return true end --- Implementation function for recursive copy of directory contents. -- Assumes paths are normalized. -- @param src string: Pathname of source -- @param dest string: Pathname of destination -- @return boolean or (boolean, string): true on success, false on failure local function recursive_copy(src, dest) local srcmode = lfs.attributes(src, "mode") if srcmode == "file" then local ok = fs.copy(src, dest) if not ok then return false end elseif srcmode == "directory" then local subdir = dir.path(dest, dir.base_name(src)) fs.make_dir(subdir) for file in lfs.dir(src) do if file ~= "." and file ~= ".." then local ok = recursive_copy(dir.path(src, file), subdir) if not ok then return false end end end end return true end --- Recursively copy the contents of a directory. -- @param src string: Pathname of source -- @param dest string: Pathname of destination -- @return boolean or (boolean, string): true on success, false on failure, -- plus an error message. function copy_contents(src, dest) assert(src and dest) src = dir.normalize(src) dest = dir.normalize(dest) assert(lfs.attributes(src, "mode") == "directory") for file in lfs.dir(src) do if file ~= "." and file ~= ".." then local ok = recursive_copy(dir.path(src, file), dest) if not ok then return false, "Failed copying "..src.." to "..dest end end end return true end --- Implementation function for recursive removal of directories. -- Assumes paths are normalized. -- @param name string: Pathname of file -- @return boolean or (boolean, string): true on success, -- or nil and an error message on failure. local function recursive_delete(name) local mode = lfs.attributes(name, "mode") if mode == "file" then return os.remove(name) elseif mode == "directory" then for file in lfs.dir(name) do if file ~= "." and file ~= ".." then local ok, err = recursive_delete(dir.path(name, file)) if not ok then return nil, err end end end local ok, err = lfs.rmdir(name) if not ok then return nil, err end end return true end --- Delete a file or a directory and all its contents. -- @param name string: Pathname of source -- @return boolean: true on success, false on failure. function delete(name) name = dir.normalize(name) return recursive_delete(name) or false end --- List the contents of a directory. -- @param at string or nil: directory to list (will be the current -- directory if none is given). -- @return table: an array of strings with the filenames representing -- the contents of a directory. function list_dir(at) assert(type(at) == "string" or not at) if not at then at = fs.current_dir() end at = dir.normalize(at) if not fs.is_dir(at) then return {} end local result = {} for file in lfs.dir(at) do if file ~= "." and file ~= ".." then table.insert(result, file) end end return result end --- Implementation function for recursive find. -- Assumes paths are normalized. -- @param cwd string: Current working directory in recursion. -- @param prefix string: Auxiliary prefix string to form pathname. -- @param result table: Array of strings where results are collected. local function recursive_find(cwd, prefix, result) for file in lfs.dir(cwd) do if file ~= "." and file ~= ".." then local item = prefix .. file table.insert(result, item) local pathname = dir.path(cwd, file) if lfs.attributes(pathname, "mode") == "directory" then recursive_find(pathname, item..dir_separator, result) end end end end --- Recursively scan the contents of a directory. -- @param at string or nil: directory to scan (will be the current -- directory if none is given). -- @return table: an array of strings with the filenames representing -- the contents of a directory. function find(at) assert(type(at) == "string" or not at) if not at then at = fs.current_dir() end at = dir.normalize(at) if not fs.is_dir(at) then return {} end local result = {} recursive_find(at, "", result) return result end --- Test for existance of a file. -- @param file string: filename to test -- @return boolean: true if file exists, false otherwise. function exists(file) assert(file) file = dir.normalize(file) return type(lfs.attributes(file)) == "table" end --- Test is pathname is a directory. -- @param file string: pathname to test -- @return boolean: true if it is a directory, false otherwise. function is_dir(file) assert(file) file = dir.normalize(file) return lfs.attributes(file, "mode") == "directory" end --- Test is pathname is a regular file. -- @param file string: pathname to test -- @return boolean: true if it is a file, false otherwise. function is_file(file) assert(file) file = dir.normalize(file) return lfs.attributes(file, "mode") == "file" end function set_time(file, time) file = dir.normalize(file) return lfs.touch(file, time) end end --------------------------------------------------------------------- -- LuaZip functions --------------------------------------------------------------------- if zip_ok then function zip(zipfile, ...) return lrzip.zip(zipfile, ...) end end if unzip_ok then --- Uncompress files from a .zip archive. -- @param zipfile string: pathname of .zip archive to be extracted. -- @return boolean: true on success, false on failure. function unzip(zipfile) local zipfile, err = luazip.open(zipfile) if not zipfile then return nil, err end local files = zipfile:files() local file = files() repeat if file.filename:sub(#file.filename) == "/" then fs.make_dir(dir.path(fs.current_dir(), file.filename)) else local rf, err = zipfile:open(file.filename) if not rf then zipfile:close(); return nil, err end local contents = rf:read("*a") rf:close() local wf, err = io.open(dir.path(fs.current_dir(), file.filename), "wb") if not wf then zipfile:close(); return nil, err end wf:write(contents) wf:close() end file = files() until not file zipfile:close() return true end end --------------------------------------------------------------------- -- LuaSocket functions --------------------------------------------------------------------- if socket_ok then local ltn12 = require("ltn12") local luasec_ok, https = pcall(require, "ssl.https") local redirect_protocols = { http = http, https = luasec_ok and https, } local function http_request(url, http, loop_control) local result = {} local proxy = cfg.proxy if type(proxy) ~= "string" then proxy = nil end -- LuaSocket's http.request crashes when given URLs missing the scheme part. if proxy and not proxy:find("://") then proxy = "http://" .. proxy end local res, status, headers, err = http.request { url = url, proxy = proxy, redirect = false, sink = ltn12.sink.table(result), headers = { ["user-agent"] = cfg.user_agent.." via LuaSocket" }, } if not res then return nil, status elseif status == 301 or status == 302 then local location = headers.location if location then local protocol, rest = dir.split_url(location) if redirect_protocols[protocol] then if not loop_control then loop_control = {} elseif loop_control[location] then return nil, "Redirection loop -- broken URL?" end loop_control[url] = true return http_request(location, redirect_protocols[protocol], loop_control) else return nil, "URL redirected to unsupported protocol - install luasec to get HTTPS support." end end return nil, err elseif status ~= 200 then return nil, err else return table.concat(result) end end --- Download a remote file. -- @param url string: URL to be fetched. -- @param filename string or nil: this function attempts to detect the -- resulting local filename of the remote file as the basename of the URL; -- if that is not correct (due to a redirection, for example), the local -- filename can be given explicitly as this second argument. -- @return boolean: true on success, false on failure. function download(url, filename) assert(type(url) == "string") assert(type(filename) == "string" or not filename) filename = dir.path(fs.current_dir(), filename or dir.base_name(url)) local content, err if util.starts_with(url, "http:") then content, err = http_request(url, http) elseif util.starts_with(url, "ftp:") then content, err = ftp.get(url) elseif util.starts_with(url, "https:") then if luasec_ok then content, err = http_request(url, https) else err = "Unsupported protocol - install luasec to get HTTPS support." end else err = "Unsupported protocol" end if not content then return false, tostring(err) end local file = io.open(filename, "wb") if not file then return false end file:write(content) file:close() return true end end --------------------------------------------------------------------- -- MD5 functions --------------------------------------------------------------------- if md5_ok then --- Get the MD5 checksum for a file. -- @param file string: The file to be computed. -- @return string: The MD5 checksum function get_md5(file) file = fs.absolute_name(file) local file = io.open(file, "rb") if not file then return false end local computed = md5.sumhexa(file:read("*a")) file:close() return computed end end --------------------------------------------------------------------- -- POSIX functions --------------------------------------------------------------------- if posix_ok then local octal_to_rwx = { ["0"] = "---", ["1"] = "--x", ["2"] = "-w-", ["3"] = "-wx", ["4"] = "r--", ["5"] = "r-x", ["6"] = "rw-", ["7"] = "rwx", } function chmod(file, mode) -- LuaPosix (as of 5.1.15) does not support octal notation... if mode:sub(1,1) == "0" then local new_mode = {} for c in mode:sub(2):gmatch(".") do table.insert(new_mode, octal_to_rwx[c]) end mode = table.concat(new_mode) end local err = posix.chmod(file, mode) return err == 0 end function get_permissions(file) return posix.stat(file, "mode") end end --------------------------------------------------------------------- -- Other functions --------------------------------------------------------------------- --- Apply a patch. -- @param patchname string: The filename of the patch. -- @param patchdata string or nil: The actual patch as a string. function apply_patch(patchname, patchdata) local p, all_ok = patch.read_patch(patchname, patchdata) if not all_ok then return nil, "Failed reading patch "..patchname end if p then return patch.apply_patch(p, 1) end end --- Move a file. -- @param src string: Pathname of source -- @param dest string: Pathname of destination -- @return boolean or (boolean, string): true on success, false on failure, -- plus an error message. function move(src, dest) assert(src and dest) if fs.exists(dest) and not fs.is_dir(dest) then return false, "File already exists: "..dest end local ok, err = fs.copy(src, dest) if not ok then return false, err end ok = fs.delete(src) if not ok then return false, "Failed move: could not delete "..src.." after copy." end return true end --- Check if user has write permissions for the command. -- Assumes the configuration variables under cfg have been previously set up. -- @param flags table: the flags table passed to run() drivers. -- @return boolean or (boolean, string): true on success, false on failure, -- plus an error message. function check_command_permissions(flags) local root_dir = path.root_dir(cfg.rocks_dir) local ok = true local err = "" for _, dir in ipairs { cfg.rocks_dir, root_dir } do if fs.exists(dir) and not fs.is_writable(dir) then ok = false err = "Your user does not have write permissions in " .. dir break end end local root_parent = dir.dir_name(root_dir) if ok and not fs.exists(root_dir) and not fs.is_writable(root_parent) then ok = false err = root_dir.." does not exist and your user does not have write permissions in " .. root_parent end if ok then return true else if flags["local"] then err = err .. " \n-- please check your permissions." else err = err .. " \n-- you may want to run as a privileged user or use your local tree with --local." end return nil, err end end