extract _extract_file, add test
This commit is contained in:
parent
e6f5b9cea4
commit
71512809a9
|
@ -284,29 +284,30 @@ class TwistedReceiver:
|
||||||
self._msg(u"Received file written to %s" %
|
self._msg(u"Received file written to %s" %
|
||||||
os.path.basename(self.abs_destname))
|
os.path.basename(self.abs_destname))
|
||||||
|
|
||||||
|
def _extract_file(self, zf, info, extract_dir):
|
||||||
|
"""
|
||||||
|
the zipfile module does not restore file permissions
|
||||||
|
so we'll do it manually
|
||||||
|
"""
|
||||||
|
out_path = os.path.join( extract_dir, info.filename )
|
||||||
|
out_path = os.path.abspath( out_path )
|
||||||
|
if not out_path.startswith( extract_dir ):
|
||||||
|
raise ValueError( "malicious zipfile, %s outside of extract_dir %s"
|
||||||
|
% (info.filename, extract_dir) )
|
||||||
|
|
||||||
|
zf.extract( info.filename, path=extract_dir )
|
||||||
|
|
||||||
|
# not sure why zipfiles store the perms 16 bits away but they do
|
||||||
|
perm = info.external_attr >> 16
|
||||||
|
os.chmod( out_path, perm )
|
||||||
|
|
||||||
def _write_directory(self, f):
|
def _write_directory(self, f):
|
||||||
def extract_file( zf, info, extract_dir ):
|
|
||||||
"""
|
|
||||||
the zipfile module does not restore file permissions
|
|
||||||
so we'll do it manually
|
|
||||||
"""
|
|
||||||
out_path = os.path.join( extract_dir, info.filename )
|
|
||||||
out_path = os.path.abspath( out_path )
|
|
||||||
if not out_path.startswith( extract_dir ):
|
|
||||||
raise ValueError( "malicious zipfile, %s outside of extract_dir %s"
|
|
||||||
% (info.filename, extract_dir) )
|
|
||||||
|
|
||||||
zf.extract( info.filename, path=extract_dir )
|
|
||||||
|
|
||||||
# not sure why zipfiles store the perms 16 bits away but they do
|
|
||||||
perm = info.external_attr >> 16
|
|
||||||
os.chmod( out_path, perm )
|
|
||||||
|
|
||||||
self._msg(u"Unpacking zipfile..")
|
self._msg(u"Unpacking zipfile..")
|
||||||
with self.args.timing.add("unpack zip"):
|
with self.args.timing.add("unpack zip"):
|
||||||
with zipfile.ZipFile(f, "r", zipfile.ZIP_DEFLATED) as zf:
|
with zipfile.ZipFile(f, "r", zipfile.ZIP_DEFLATED) as zf:
|
||||||
for info in zf.infolist():
|
for info in zf.infolist():
|
||||||
extract_file( zf, info, self.abs_destname )
|
self._extract_file( zf, info, self.abs_destname )
|
||||||
|
|
||||||
self._msg(u"Received files written to %s/" %
|
self._msg(u"Received files written to %s/" %
|
||||||
os.path.basename(self.abs_destname))
|
os.path.basename(self.abs_destname))
|
||||||
|
|
|
@ -1,5 +1,6 @@
|
||||||
from __future__ import print_function
|
from __future__ import print_function
|
||||||
import os, sys, re, io, zipfile, six
|
import os, sys, re, io, zipfile, six
|
||||||
|
import mock
|
||||||
from twisted.trial import unittest
|
from twisted.trial import unittest
|
||||||
from twisted.python import procutils, log
|
from twisted.python import procutils, log
|
||||||
from twisted.internet.utils import getProcessOutputAndValue
|
from twisted.internet.utils import getProcessOutputAndValue
|
||||||
|
@ -613,3 +614,44 @@ class Cleanup(ServerBase, unittest.TestCase):
|
||||||
self.assertEqual(len(cids), 0)
|
self.assertEqual(len(cids), 0)
|
||||||
self.flushLoggedErrors(WrongPasswordError)
|
self.flushLoggedErrors(WrongPasswordError)
|
||||||
|
|
||||||
|
class ExtractFile(unittest.TestCase):
|
||||||
|
def test_filenames(self):
|
||||||
|
args = mock.Mock()
|
||||||
|
args.relay_url = u""
|
||||||
|
ef = cmd_receive.TwistedReceiver(args)._extract_file
|
||||||
|
extract_dir = os.path.abspath(self.mktemp())
|
||||||
|
|
||||||
|
zf = mock.Mock()
|
||||||
|
zi = mock.Mock()
|
||||||
|
zi.filename = "ok"
|
||||||
|
zi.external_attr = 5 << 16
|
||||||
|
expected = os.path.join(extract_dir, "ok")
|
||||||
|
with mock.patch.object(cmd_receive.os, "chmod") as chmod:
|
||||||
|
ef(zf, zi, extract_dir)
|
||||||
|
self.assertEqual(zf.extract.mock_calls,
|
||||||
|
[mock.call(zi.filename, path=extract_dir)])
|
||||||
|
self.assertEqual(chmod.mock_calls, [mock.call(expected, 5)])
|
||||||
|
|
||||||
|
zf = mock.Mock()
|
||||||
|
zi = mock.Mock()
|
||||||
|
zi.filename = "../haha"
|
||||||
|
e = self.assertRaises(ValueError, ef, zf, zi, extract_dir)
|
||||||
|
self.assertIn("malicious zipfile", str(e))
|
||||||
|
|
||||||
|
zf = mock.Mock()
|
||||||
|
zi = mock.Mock()
|
||||||
|
zi.filename = "haha//root" # abspath squashes this, hopefully zipfile
|
||||||
|
# does too
|
||||||
|
zi.external_attr = 5 << 16
|
||||||
|
expected = os.path.join(extract_dir, "haha/root")
|
||||||
|
with mock.patch.object(cmd_receive.os, "chmod") as chmod:
|
||||||
|
ef(zf, zi, extract_dir)
|
||||||
|
self.assertEqual(zf.extract.mock_calls,
|
||||||
|
[mock.call(zi.filename, path=extract_dir)])
|
||||||
|
self.assertEqual(chmod.mock_calls, [mock.call(expected, 5)])
|
||||||
|
|
||||||
|
zf = mock.Mock()
|
||||||
|
zi = mock.Mock()
|
||||||
|
zi.filename = "/etc/passwd"
|
||||||
|
e = self.assertRaises(ValueError, ef, zf, zi, extract_dir)
|
||||||
|
self.assertIn("malicious zipfile", str(e))
|
||||||
|
|
Loading…
Reference in New Issue
Block a user