Merge branch 'pr40': preserve file perms in directories
This commit is contained in:
commit
54b85035a2
|
@ -223,7 +223,7 @@ class TwistedReceiver:
|
|||
destname = os.path.basename(destname)
|
||||
if self.args.output_file:
|
||||
destname = self.args.output_file # override
|
||||
abs_destname = os.path.join(self.args.cwd, destname)
|
||||
abs_destname = os.path.abspath( os.path.join(self.args.cwd, destname) )
|
||||
|
||||
# get confirmation from the user before writing to the local directory
|
||||
if os.path.exists(abs_destname):
|
||||
|
@ -284,15 +284,31 @@ class TwistedReceiver:
|
|||
self._msg(u"Received file written to %s" %
|
||||
os.path.basename(self.abs_destname))
|
||||
|
||||
def _extract_file(self, zf, info, extract_dir):
|
||||
"""
|
||||
the zipfile module does not restore file permissions
|
||||
so we'll do it manually
|
||||
"""
|
||||
out_path = os.path.join( extract_dir, info.filename )
|
||||
out_path = os.path.abspath( out_path )
|
||||
if not out_path.startswith( extract_dir ):
|
||||
raise ValueError( "malicious zipfile, %s outside of extract_dir %s"
|
||||
% (info.filename, extract_dir) )
|
||||
|
||||
zf.extract( info.filename, path=extract_dir )
|
||||
|
||||
# not sure why zipfiles store the perms 16 bits away but they do
|
||||
perm = info.external_attr >> 16
|
||||
os.chmod( out_path, perm )
|
||||
|
||||
def _write_directory(self, f):
|
||||
|
||||
self._msg(u"Unpacking zipfile..")
|
||||
with self.args.timing.add("unpack zip"):
|
||||
with zipfile.ZipFile(f, "r", zipfile.ZIP_DEFLATED) as zf:
|
||||
zf.extractall(path=self.abs_destname)
|
||||
# extractall() appears to offer some protection against
|
||||
# malicious pathnames. For example, "/tmp/oops" and
|
||||
# "../tmp/oops" both do the same thing as the (safe)
|
||||
# "tmp/oops".
|
||||
for info in zf.infolist():
|
||||
self._extract_file( zf, info, self.abs_destname )
|
||||
|
||||
self._msg(u"Received files written to %s/" %
|
||||
os.path.basename(self.abs_destname))
|
||||
f.close()
|
||||
|
|
|
@ -197,7 +197,6 @@ class Sender:
|
|||
# We're sending a directory. Create a zipfile in a tempdir and
|
||||
# send that.
|
||||
fd_to_send = tempfile.SpooledTemporaryFile()
|
||||
# TODO: I think ZIP_DEFLATED means compressed.. check it
|
||||
num_files = 0
|
||||
num_bytes = 0
|
||||
tostrip = len(what.split(os.sep))
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
from __future__ import print_function
|
||||
import os, sys, re, io, zipfile, six
|
||||
import os, sys, re, io, zipfile, six, stat
|
||||
import mock
|
||||
from twisted.trial import unittest
|
||||
from twisted.python import procutils, log
|
||||
from twisted.internet.utils import getProcessOutputAndValue
|
||||
|
@ -287,9 +288,14 @@ class PregeneratedCode(ServerBase, ScriptsBase, unittest.TestCase):
|
|||
os.mkdir(os.path.join(send_dir, "middle"))
|
||||
source_dir = os.path.join(send_dir, "middle", send_dirname)
|
||||
os.mkdir(source_dir)
|
||||
modes = {}
|
||||
for i in range(5):
|
||||
with open(os.path.join(source_dir, str(i)), "w") as f:
|
||||
path = os.path.join(source_dir, str(i))
|
||||
with open(path, "w") as f:
|
||||
f.write(message(i))
|
||||
if i == 3:
|
||||
os.chmod(path, 0o755)
|
||||
modes[i] = stat.S_IMODE(os.stat(path).st_mode)
|
||||
send_dirname_arg = os.path.join("middle", send_dirname)
|
||||
if addslash:
|
||||
send_dirname_arg += os.sep
|
||||
|
@ -412,6 +418,8 @@ class PregeneratedCode(ServerBase, ScriptsBase, unittest.TestCase):
|
|||
fn = os.path.join(receive_dir, receive_dirname, str(i))
|
||||
with open(fn, "r") as f:
|
||||
self.failUnlessEqual(f.read(), message(i))
|
||||
self.failUnlessEqual(modes[i],
|
||||
stat.S_IMODE(os.stat(fn).st_mode))
|
||||
|
||||
def test_text(self):
|
||||
return self._do_test()
|
||||
|
@ -613,3 +621,44 @@ class Cleanup(ServerBase, unittest.TestCase):
|
|||
self.assertEqual(len(cids), 0)
|
||||
self.flushLoggedErrors(WrongPasswordError)
|
||||
|
||||
class ExtractFile(unittest.TestCase):
|
||||
def test_filenames(self):
|
||||
args = mock.Mock()
|
||||
args.relay_url = u""
|
||||
ef = cmd_receive.TwistedReceiver(args)._extract_file
|
||||
extract_dir = os.path.abspath(self.mktemp())
|
||||
|
||||
zf = mock.Mock()
|
||||
zi = mock.Mock()
|
||||
zi.filename = "ok"
|
||||
zi.external_attr = 5 << 16
|
||||
expected = os.path.join(extract_dir, "ok")
|
||||
with mock.patch.object(cmd_receive.os, "chmod") as chmod:
|
||||
ef(zf, zi, extract_dir)
|
||||
self.assertEqual(zf.extract.mock_calls,
|
||||
[mock.call(zi.filename, path=extract_dir)])
|
||||
self.assertEqual(chmod.mock_calls, [mock.call(expected, 5)])
|
||||
|
||||
zf = mock.Mock()
|
||||
zi = mock.Mock()
|
||||
zi.filename = "../haha"
|
||||
e = self.assertRaises(ValueError, ef, zf, zi, extract_dir)
|
||||
self.assertIn("malicious zipfile", str(e))
|
||||
|
||||
zf = mock.Mock()
|
||||
zi = mock.Mock()
|
||||
zi.filename = "haha//root" # abspath squashes this, hopefully zipfile
|
||||
# does too
|
||||
zi.external_attr = 5 << 16
|
||||
expected = os.path.join(extract_dir, "haha/root")
|
||||
with mock.patch.object(cmd_receive.os, "chmod") as chmod:
|
||||
ef(zf, zi, extract_dir)
|
||||
self.assertEqual(zf.extract.mock_calls,
|
||||
[mock.call(zi.filename, path=extract_dir)])
|
||||
self.assertEqual(chmod.mock_calls, [mock.call(expected, 5)])
|
||||
|
||||
zf = mock.Mock()
|
||||
zi = mock.Mock()
|
||||
zi.filename = "/etc/passwd"
|
||||
e = self.assertRaises(ValueError, ef, zf, zi, extract_dir)
|
||||
self.assertIn("malicious zipfile", str(e))
|
||||
|
|
Loading…
Reference in New Issue
Block a user