Lines Matching refs:self
30 def _check_sample(self, msg):
32 self.assertIsInstance(msg, email.message.Message)
33 self.assertIsInstance(msg, mailbox.Message)
35 self.assertIn(value, msg.get_all(key))
36 self.assertTrue(msg.is_multipart())
37 self.assertEqual(len(msg.get_payload()), len(_sample_payloads))
40 self.assertIsInstance(part, email.message.Message)
41 self.assertNotIsInstance(part, mailbox.Message)
42 self.assertEqual(part.get_payload(), payload)
44 def _delete_recursively(self, target):
59 def setUp(self):
60 self._path = os_helper.TESTFN
61 self._delete_recursively(self._path)
62 self._box = self._factory(self._path)
64 def tearDown(self):
65 self._box.close()
66 self._delete_recursively(self._path)
68 def test_add(self):
71 keys.append(self._box.add(self._template % 0))
72 self.assertEqual(len(self._box), 1)
73 keys.append(self._box.add(mailbox.Message(_sample_message)))
74 self.assertEqual(len(self._box), 2)
75 keys.append(self._box.add(email.message_from_string(_sample_message)))
76 self.assertEqual(len(self._box), 3)
77 keys.append(self._box.add(io.BytesIO(_bytes_sample_message)))
78 self.assertEqual(len(self._box), 4)
79 keys.append(self._box.add(_sample_message))
80 self.assertEqual(len(self._box), 5)
81 keys.append(self._box.add(_bytes_sample_message))
82 self.assertEqual(len(self._box), 6)
83 with self.assertWarns(DeprecationWarning):
84 keys.append(self._box.add(
86 self.assertEqual(len(self._box), 7)
87 self.assertEqual(self._box.get_string(keys[0]), self._template % 0)
89 self._check_sample(self._box[keys[i]])
98 def test_add_invalid_8bit_bytes_header(self):
99 key = self._box.add(self._nonascii_msg.encode('latin-1'))
100 self.assertEqual(len(self._box), 1)
101 self.assertEqual(self._box.get_bytes(key),
102 self._nonascii_msg.encode('latin-1'))
104 def test_invalid_nonascii_header_as_string(self):
105 subj = self._nonascii_msg.splitlines()[1]
106 key = self._box.add(subj.encode('latin-1'))
107 self.assertEqual(self._box.get_string(key),
111 def test_add_nonascii_string_header_raises(self):
112 with self.assertRaisesRegex(ValueError, "ASCII-only"):
113 self._box.add(self._nonascii_msg)
114 self._box.flush()
115 self.assertEqual(len(self._box), 0)
116 self.assertMailboxEmpty()
118 def test_add_that_raises_leaves_mailbox_empty(self):
121 support.patch(self, email.generator.BytesGenerator, 'flatten', raiser)
122 with self.assertRaises(Exception):
123 self._box.add(email.message_from_string("From: Alphöso"))
124 self.assertEqual(len(self._box), 0)
125 self._box.close()
126 self.assertMailboxEmpty()
140 def test_add_8bit_body(self):
141 key = self._box.add(self._non_latin_bin_msg)
142 self.assertEqual(self._box.get_bytes(key),
143 self._non_latin_bin_msg)
144 with self._box.get_file(key) as f:
145 self.assertEqual(f.read(),
146 self._non_latin_bin_msg.replace(b'\n',
148 self.assertEqual(self._box[key].get_payload(),
151 def test_add_binary_file(self):
155 key = self._box.add(f)
156 self.assertEqual(self._box.get_bytes(key).split(b'\n'),
159 def test_add_binary_nonascii_file(self):
161 f.write(self._non_latin_bin_msg)
163 key = self._box.add(f)
164 self.assertEqual(self._box.get_bytes(key).split(b'\n'),
165 self._non_latin_bin_msg.split(b'\n'))
167 def test_add_text_file_warns(self):
171 with self.assertWarns(DeprecationWarning):
172 key = self._box.add(f)
173 self.assertEqual(self._box.get_bytes(key).split(b'\n'),
176 def test_add_StringIO_warns(self):
177 with self.assertWarns(DeprecationWarning):
178 key = self._box.add(io.StringIO(self._template % "0"))
179 self.assertEqual(self._box.get_string(key), self._template % "0")
181 def test_add_nonascii_StringIO_raises(self):
182 with self.assertWarns(DeprecationWarning):
183 with self.assertRaisesRegex(ValueError, "ASCII-only"):
184 self._box.add(io.StringIO(self._nonascii_msg))
185 self.assertEqual(len(self._box), 0)
186 self._box.close()
187 self.assertMailboxEmpty()
189 def test_remove(self):
191 self._test_remove_or_delitem(self._box.remove)
193 def test_delitem(self):
195 self._test_remove_or_delitem(self._box.__delitem__)
197 def _test_remove_or_delitem(self, method):
199 key0 = self._box.add(self._template % 0)
200 key1 = self._box.add(self._template % 1)
201 self.assertEqual(len(self._box), 2)
203 self.assertEqual(len(self._box), 1)
204 self.assertRaises(KeyError, lambda: self._box[key0])
205 self.assertRaises(KeyError, lambda: method(key0))
206 self.assertEqual(self._box.get_string(key1), self._template % 1)
207 key2 = self._box.add(self._template % 2)
208 self.assertEqual(len(self._box), 2)
210 self.assertEqual(len(self._box), 1)
211 self.assertRaises(KeyError, lambda: self._box[key2])
212 self.assertRaises(KeyError, lambda: method(key2))
213 self.assertEqual(self._box.get_string(key1), self._template % 1)
215 self.assertEqual(len(self._box), 0)
216 self.assertRaises(KeyError, lambda: self._box[key1])
217 self.assertRaises(KeyError, lambda: method(key1))
219 def test_discard(self, repetitions=10):
221 key0 = self._box.add(self._template % 0)
222 key1 = self._box.add(self._template % 1)
223 self.assertEqual(len(self._box), 2)
224 self._box.discard(key0)
225 self.assertEqual(len(self._box), 1)
226 self.assertRaises(KeyError, lambda: self._box[key0])
227 self._box.discard(key0)
228 self.assertEqual(len(self._box), 1)
229 self.assertRaises(KeyError, lambda: self._box[key0])
231 def test_get(self):
233 key0 = self._box.add(self._template % 0)
234 msg = self._box.get(key0)
235 self.assertEqual(msg['from'], 'foo')
236 self.assertEqual(msg.get_payload(), '0\n')
237 self.assertIsNone(self._box.get('foo'))
238 self.assertIs(self._box.get('foo', False), False)
239 self._box.close()
240 self._box = self._factory(self._path)
241 key1 = self._box.add(self._template % 1)
242 msg = self._box.get(key1)
243 self.assertEqual(msg['from'], 'foo')
244 self.assertEqual(msg.get_payload(), '1\n')
246 def test_getitem(self):
248 key0 = self._box.add(self._template % 0)
249 msg = self._box[key0]
250 self.assertEqual(msg['from'], 'foo')
251 self.assertEqual(msg.get_payload(), '0\n')
252 self.assertRaises(KeyError, lambda: self._box['foo'])
253 self._box.discard(key0)
254 self.assertRaises(KeyError, lambda: self._box[key0])
256 def test_get_message(self):
258 key0 = self._box.add(self._template % 0)
259 key1 = self._box.add(_sample_message)
260 msg0 = self._box.get_message(key0)
261 self.assertIsInstance(msg0, mailbox.Message)
262 self.assertEqual(msg0['from'], 'foo')
263 self.assertEqual(msg0.get_payload(), '0\n')
264 self._check_sample(self._box.get_message(key1))
266 def test_get_bytes(self):
268 key0 = self._box.add(self._template % 0)
269 key1 = self._box.add(_sample_message)
270 self.assertEqual(self._box.get_bytes(key0),
271 (self._template % 0).encode('ascii'))
272 self.assertEqual(self._box.get_bytes(key1), _bytes_sample_message)
274 def test_get_string(self):
276 key0 = self._box.add(self._template % 0)
277 key1 = self._box.add(_sample_message)
278 self.assertEqual(self._box.get_string(key0), self._template % 0)
279 self.assertEqual(self._box.get_string(key1).split('\n'),
282 def test_get_file(self):
284 key0 = self._box.add(self._template % 0)
285 key1 = self._box.add(_sample_message)
286 with self._box.get_file(key0) as file:
288 with self._box.get_file(key1) as file:
290 self.assertEqual(data0.decode('ascii').replace(os.linesep, '\n'),
291 self._template % 0)
292 self.assertEqual(data1.decode('ascii').replace(os.linesep, '\n'),
295 def test_get_file_can_be_closed_twice(self):
297 key = self._box.add(_sample_message)
298 f = self._box.get_file(key)
302 def test_iterkeys(self):
304 self._check_iteration(self._box.iterkeys, do_keys=True, do_values=False)
306 def test_keys(self):
308 self._check_iteration(self._box.keys, do_keys=True, do_values=False)
310 def test_itervalues(self):
312 self._check_iteration(self._box.itervalues, do_keys=False,
315 def test_iter(self):
317 self._check_iteration(self._box.__iter__, do_keys=False,
320 def test_values(self):
322 self._check_iteration(self._box.values, do_keys=False, do_values=True)
324 def test_iteritems(self):
326 self._check_iteration(self._box.iteritems, do_keys=True,
329 def test_items(self):
331 self._check_iteration(self._box.items, do_keys=True, do_values=True)
333 def _check_iteration(self, method, do_keys, do_values, repetitions=10):
335 self.fail("Not empty")
338 keys.append(self._box.add(self._template % i))
339 values.append(self._template % i)
350 self.assertEqual(len(keys), len(returned_keys))
351 self.assertEqual(set(keys), set(returned_keys))
355 self.assertEqual(value['from'], 'foo')
356 self.assertLess(int(value.get_payload()), repetitions)
358 self.assertEqual(len(values), count)
360 def test_contains(self):
362 self.assertNotIn('foo', self._box)
363 key0 = self._box.add(self._template % 0)
364 self.assertIn(key0, self._box)
365 self.assertNotIn('foo', self._box)
366 key1 = self._box.add(self._template % 1)
367 self.assertIn(key1, self._box)
368 self.assertIn(key0, self._box)
369 self.assertNotIn('foo', self._box)
370 self._box.remove(key0)
371 self.assertNotIn(key0, self._box)
372 self.assertIn(key1, self._box)
373 self.assertNotIn('foo', self._box)
374 self._box.remove(key1)
375 self.assertNotIn(key1, self._box)
376 self.assertNotIn(key0, self._box)
377 self.assertNotIn('foo', self._box)
379 def test_len(self, repetitions=10):
383 self.assertEqual(len(self._box), i)
384 keys.append(self._box.add(self._template % i))
385 self.assertEqual(len(self._box), i + 1)
387 self.assertEqual(len(self._box), repetitions - i)
388 self._box.remove(keys[i])
389 self.assertEqual(len(self._box), repetitions - i - 1)
391 def test_set_item(self):
393 key0 = self._box.add(self._template % 'original 0')
394 self.assertEqual(self._box.get_string(key0),
395 self._template % 'original 0')
396 key1 = self._box.add(self._template % 'original 1')
397 self.assertEqual(self._box.get_string(key1),
398 self._template % 'original 1')
399 self._box[key0] = self._template % 'changed 0'
400 self.assertEqual(self._box.get_string(key0),
401 self._template % 'changed 0')
402 self._box[key1] = self._template % 'changed 1'
403 self.assertEqual(self._box.get_string(key1),
404 self._template % 'changed 1')
405 self._box[key0] = _sample_message
406 self._check_sample(self._box[key0])
407 self._box[key1] = self._box[key0]
408 self._check_sample(self._box[key1])
409 self._box[key0] = self._template % 'original 0'
410 self.assertEqual(self._box.get_string(key0),
411 self._template % 'original 0')
412 self._check_sample(self._box[key1])
413 self.assertRaises(KeyError,
414 lambda: self._box.__setitem__('foo', 'bar'))
415 self.assertRaises(KeyError, lambda: self._box['foo'])
416 self.assertEqual(len(self._box), 2)
418 def test_clear(self, iterations=10):
422 self._box.add(self._template % i)
424 self.assertEqual(self._box.get_string(key), self._template % i)
425 self._box.clear()
426 self.assertEqual(len(self._box), 0)
428 self.assertRaises(KeyError, lambda: self._box.get_string(key))
430 def test_pop(self):
432 key0 = self._box.add(self._template % 0)
433 self.assertIn(key0, self._box)
434 key1 = self._box.add(self._template % 1)
435 self.assertIn(key1, self._box)
436 self.assertEqual(self._box.pop(key0).get_payload(), '0\n')
437 self.assertNotIn(key0, self._box)
438 self.assertIn(key1, self._box)
439 key2 = self._box.add(self._template % 2)
440 self.assertIn(key2, self._box)
441 self.assertEqual(self._box.pop(key2).get_payload(), '2\n')
442 self.assertNotIn(key2, self._box)
443 self.assertIn(key1, self._box)
444 self.assertEqual(self._box.pop(key1).get_payload(), '1\n')
445 self.assertNotIn(key1, self._box)
446 self.assertEqual(len(self._box), 0)
448 def test_popitem(self, iterations=10):
452 keys.append(self._box.add(self._template % i))
455 key, msg = self._box.popitem()
456 self.assertIn(key, keys)
457 self.assertNotIn(key, seen)
459 self.assertEqual(int(msg.get_payload()), keys.index(key))
460 self.assertEqual(len(self._box), 0)
462 self.assertRaises(KeyError, lambda: self._box[key])
464 def test_update(self):
466 key0 = self._box.add(self._template % 'original 0')
467 key1 = self._box.add(self._template % 'original 1')
468 key2 = self._box.add(self._template % 'original 2')
469 self._box.update({key0: self._template % 'changed 0',
471 self.assertEqual(len(self._box), 3)
472 self.assertEqual(self._box.get_string(key0),
473 self._template % 'changed 0')
474 self.assertEqual(self._box.get_string(key1),
475 self._template % 'original 1')
476 self._check_sample(self._box[key2])
477 self._box.update([(key2, self._template % 'changed 2'),
478 (key1, self._template % 'changed 1'),
479 (key0, self._template % 'original 0')])
480 self.assertEqual(len(self._box), 3)
481 self.assertEqual(self._box.get_string(key0),
482 self._template % 'original 0')
483 self.assertEqual(self._box.get_string(key1),
484 self._template % 'changed 1')
485 self.assertEqual(self._box.get_string(key2),
486 self._template % 'changed 2')
487 self.assertRaises(KeyError,
488 lambda: self._box.update({'foo': 'bar',
489 key0: self._template % "changed 0"}))
490 self.assertEqual(len(self._box), 3)
491 self.assertEqual(self._box.get_string(key0),
492 self._template % "changed 0")
493 self.assertEqual(self._box.get_string(key1),
494 self._template % "changed 1")
495 self.assertEqual(self._box.get_string(key2),
496 self._template % "changed 2")
498 def test_flush(self):
500 self._test_flush_or_close(self._box.flush, True)
502 def test_popitem_and_flush_twice(self):
504 self._box.add(self._template % 0)
505 self._box.add(self._template % 1)
506 self._box.flush()
508 self._box.popitem()
509 self._box.flush()
510 self._box.popitem()
511 self._box.flush()
513 def test_lock_unlock(self):
515 self.assertFalse(os.path.exists(self._get_lock_path()))
516 self._box.lock()
517 self.assertTrue(os.path.exists(self._get_lock_path()))
518 self._box.unlock()
519 self.assertFalse(os.path.exists(self._get_lock_path()))
521 def test_close(self):
523 self._test_flush_or_close(self._box.close, False)
525 def _test_flush_or_close(self, method, should_call_close):
526 contents = [self._template % i for i in range(3)]
527 self._box.add(contents[0])
528 self._box.add(contents[1])
529 self._box.add(contents[2])
530 oldbox = self._box
533 self._box.close()
534 self._box = self._factory(self._path)
535 keys = self._box.keys()
536 self.assertEqual(len(keys), 3)
538 self.assertIn(self._box.get_string(key), contents)
541 def test_dump_message(self):
546 self._box._dump_message(input, output)
547 self.assertEqual(output.getvalue(),
550 self.assertRaises(TypeError,
551 lambda: self._box._dump_message(None, output))
553 def _get_lock_path(self):
555 return self._path + '.lock'
560 def test_notimplemented(self):
563 self.assertRaises(NotImplementedError, lambda: box.add(''))
564 self.assertRaises(NotImplementedError, lambda: box.remove(''))
565 self.assertRaises(NotImplementedError, lambda: box.__delitem__(''))
566 self.assertRaises(NotImplementedError, lambda: box.discard(''))
567 self.assertRaises(NotImplementedError, lambda: box.__setitem__('', ''))
568 self.assertRaises(NotImplementedError, lambda: box.iterkeys())
569 self.assertRaises(NotImplementedError, lambda: box.keys())
570 self.assertRaises(NotImplementedError, lambda: box.itervalues().__next__())
571 self.assertRaises(NotImplementedError, lambda: box.__iter__().__next__())
572 self.assertRaises(NotImplementedError, lambda: box.values())
573 self.assertRaises(NotImplementedError, lambda: box.iteritems().__next__())
574 self.assertRaises(NotImplementedError, lambda: box.items())
575 self.assertRaises(NotImplementedError, lambda: box.get(''))
576 self.assertRaises(NotImplementedError, lambda: box.__getitem__(''))
577 self.assertRaises(NotImplementedError, lambda: box.get_message(''))
578 self.assertRaises(NotImplementedError, lambda: box.get_string(''))
579 self.assertRaises(NotImplementedError, lambda: box.get_bytes(''))
580 self.assertRaises(NotImplementedError, lambda: box.get_file(''))
581 self.assertRaises(NotImplementedError, lambda: '' in box)
582 self.assertRaises(NotImplementedError, lambda: box.__contains__(''))
583 self.assertRaises(NotImplementedError, lambda: box.__len__())
584 self.assertRaises(NotImplementedError, lambda: box.clear())
585 self.assertRaises(NotImplementedError, lambda: box.pop(''))
586 self.assertRaises(NotImplementedError, lambda: box.popitem())
587 self.assertRaises(NotImplementedError, lambda: box.update((('', ''),)))
588 self.assertRaises(NotImplementedError, lambda: box.flush())
589 self.assertRaises(NotImplementedError, lambda: box.lock())
590 self.assertRaises(NotImplementedError, lambda: box.unlock())
591 self.assertRaises(NotImplementedError, lambda: box.close())
596 _factory = lambda self, path, factory=None: mailbox.Maildir(path, factory)
598 def setUp(self):
599 TestMailbox.setUp(self)
601 self._box.colon = '!'
603 def assertMailboxEmpty(self):
604 self.assertEqual(os.listdir(os.path.join(self._path, 'tmp')), [])
606 def test_add_MM(self):
608 msg = mailbox.MaildirMessage(self._template % 0)
611 key = self._box.add(msg)
612 self.assertTrue(os.path.exists(os.path.join(self._path, 'cur', '%s%sfoo' %
613 (key, self._box.colon))))
615 def test_get_MM(self):
617 msg = mailbox.MaildirMessage(self._template % 0)
620 key = self._box.add(msg)
621 msg_returned = self._box.get_message(key)
622 self.assertIsInstance(msg_returned, mailbox.MaildirMessage)
623 self.assertEqual(msg_returned.get_subdir(), 'cur')
624 self.assertEqual(msg_returned.get_flags(), 'FR')
626 def test_set_MM(self):
628 msg0 = mailbox.MaildirMessage(self._template % 0)
630 key = self._box.add(msg0)
631 msg_returned = self._box.get_message(key)
632 self.assertEqual(msg_returned.get_subdir(), 'new')
633 self.assertEqual(msg_returned.get_flags(), 'PT')
634 msg1 = mailbox.MaildirMessage(self._template % 1)
635 self._box[key] = msg1
636 msg_returned = self._box.get_message(key)
637 self.assertEqual(msg_returned.get_subdir(), 'new')
638 self.assertEqual(msg_returned.get_flags(), '')
639 self.assertEqual(msg_returned.get_payload(), '1\n')
640 msg2 = mailbox.MaildirMessage(self._template % 2)
642 self._box[key] = msg2
643 self._box[key] = self._template % 3
644 msg_returned = self._box.get_message(key)
645 self.assertEqual(msg_returned.get_subdir(), 'new')
646 self.assertEqual(msg_returned.get_flags(), 'S')
647 self.assertEqual(msg_returned.get_payload(), '3\n')
649 def test_consistent_factory(self):
651 msg = mailbox.MaildirMessage(self._template % 0)
654 key = self._box.add(msg)
659 box = mailbox.Maildir(self._path, factory=FakeMessage)
660 box.colon = self._box.colon
662 self.assertIsInstance(msg2, FakeMessage)
664 def test_initialize_new(self):
666 self.tearDown()
667 self._box = mailbox.Maildir(self._path)
668 self._check_basics()
669 self._delete_recursively(self._path)
670 self._box = self._factory(self._path, factory=None)
671 self._check_basics()
673 def test_initialize_existing(self):
675 self.tearDown()
677 os.mkdir(os.path.normpath(os.path.join(self._path, subdir)))
678 self._box = mailbox.Maildir(self._path)
679 self._check_basics()
681 def _check_basics(self, factory=None):
683 self.assertEqual(self._box._path, os.path.abspath(self._path))
684 self.assertEqual(self._box._factory, factory)
686 path = os.path.join(self._path, subdir)
688 self.assertTrue(stat.S_ISDIR(mode), "Not a directory: '%s'" % path)
690 def test_list_folders(self):
692 self._box.add_folder('one')
693 self._box.add_folder('two')
694 self._box.add_folder('three')
695 self.assertEqual(len(self._box.list_folders()), 3)
696 self.assertEqual(set(self._box.list_folders()),
699 def test_get_folder(self):
701 self._box.add_folder('foo.bar')
702 folder0 = self._box.get_folder('foo.bar')
703 folder0.add(self._template % 'bar')
704 self.assertTrue(os.path.isdir(os.path.join(self._path, '.foo.bar')))
705 folder1 = self._box.get_folder('foo.bar')
706 self.assertEqual(folder1.get_string(folder1.keys()[0]),
707 self._template % 'bar')
709 def test_add_and_remove_folders(self):
711 self._box.add_folder('one')
712 self._box.add_folder('two')
713 self.assertEqual(len(self._box.list_folders()), 2)
714 self.assertEqual(set(self._box.list_folders()), set(('one', 'two')))
715 self._box.remove_folder('one')
716 self.assertEqual(len(self._box.list_folders()), 1)
717 self.assertEqual(set(self._box.list_folders()), set(('two',)))
718 self._box.add_folder('three')
719 self.assertEqual(len(self._box.list_folders()), 2)
720 self.assertEqual(set(self._box.list_folders()), set(('two', 'three')))
721 self._box.remove_folder('three')
722 self.assertEqual(len(self._box.list_folders()), 1)
723 self.assertEqual(set(self._box.list_folders()), set(('two',)))
724 self._box.remove_folder('two')
725 self.assertEqual(len(self._box.list_folders()), 0)
726 self.assertEqual(self._box.list_folders(), [])
728 def test_clean(self):
730 foo_path = os.path.join(self._path, 'tmp', 'foo')
731 bar_path = os.path.join(self._path, 'tmp', 'bar')
736 self._box.clean()
737 self.assertTrue(os.path.exists(foo_path))
738 self.assertTrue(os.path.exists(bar_path))
742 self._box.clean()
743 self.assertFalse(os.path.exists(foo_path))
744 self.assertTrue(os.path.exists(bar_path))
746 def test_create_tmp(self, repetitions=10):
758 tmp_file = self._box._create_tmp()
760 self.assertEqual(head, os.path.abspath(os.path.join(self._path,
764 self.assertIsNotNone(match, "Invalid file name: '%s'" % tail)
767 self.assertGreaterEqual(int(groups[0]), int(previous_groups[0]),
771 self.assertGreaterEqual(int(groups[1]), int(previous_groups[1]),
774 self.assertEqual(int(groups[2]), pid,
777 self.assertEqual(int(groups[3]), int(previous_groups[3]) + 1,
780 self.assertEqual(groups[4], hostname,
786 self.assertEqual(tmp_file.read(), _bytes_sample_message)
788 file_count = len(os.listdir(os.path.join(self._path, "tmp")))
789 self.assertEqual(file_count, repetitions,
793 def test_refresh(self):
795 self.assertEqual(self._box._toc, {})
796 key0 = self._box.add(self._template % 0)
797 key1 = self._box.add(self._template % 1)
798 self.assertEqual(self._box._toc, {})
799 self._box._refresh()
800 self.assertEqual(self._box._toc, {key0: os.path.join('new', key0),
802 key2 = self._box.add(self._template % 2)
803 self.assertEqual(self._box._toc, {key0: os.path.join('new', key0),
805 self._box._refresh()
806 self.assertEqual(self._box._toc, {key0: os.path.join('new', key0),
810 def test_refresh_after_safety_period(self):
814 key0 = self._box.add(self._template % 0)
815 key1 = self._box.add(self._template % 1)
817 self._box = self._factory(self._path)
818 self.assertEqual(self._box._toc, {})
824 self._box._skewfactor = -3
826 self._box._refresh()
827 self.assertEqual(sorted(self._box._toc.keys()), sorted([key0, key1]))
829 def test_lookup(self):
831 self.assertRaises(KeyError, lambda: self._box._lookup('foo'))
832 key0 = self._box.add(self._template % 0)
833 self.assertEqual(self._box._lookup(key0), os.path.join('new', key0))
834 os.remove(os.path.join(self._path, 'new', key0))
835 self.assertEqual(self._box._toc, {key0: os.path.join('new', key0)})
838 self._box.flush()
839 self.assertRaises(KeyError, lambda: self._box._lookup(key0))
840 self.assertEqual(self._box._toc, {})
842 def test_lock_unlock(self):
844 self._box.lock()
845 self._box.unlock()
847 def test_folder (self):
852 box = self._factory(self._path, factory=dummy_factory)
854 self.assertIs(folder._factory, dummy_factory)
857 self.assertIs(folder1_alias._factory, dummy_factory)
859 def test_directory_in_folder (self):
863 self._box.add(mailbox.Message(_sample_message))
866 os.mkdir(os.path.join(self._path, 'cur', 'stray-dir'))
869 for msg in self._box:
873 def test_file_permissions(self):
875 msg = mailbox.MaildirMessage(self._template % 0)
878 key = self._box.add(msg)
881 path = os.path.join(self._path, self._box._lookup(key))
883 self.assertFalse(mode & 0o111)
886 def test_folder_file_perms(self):
891 subfolder = self._box.add_folder('subfolder')
898 self.assertFalse((perms & 0o111)) # Execute bits should all be off.
900 def test_reread(self):
902 self._box._refresh()
907 os.utime(os.path.join(self._box._path, subdir),
919 self._box._skewfactor = -3
924 orig_toc = self._box._toc
926 return self._box._toc is not orig_toc
928 self._box._refresh()
929 self.assertFalse(refreshed())
934 filename = os.path.join(self._path, 'cur', 'stray-file')
937 self._box._refresh()
938 self.assertTrue(refreshed())
944 def test_add_doesnt_rewrite(self):
951 inode_before = os.stat(self._path).st_ino
953 self._box.add(self._template % 0)
954 self._box.flush()
956 inode_after = os.stat(self._path).st_ino
957 self.assertEqual(inode_before, inode_after)
960 self._box.close()
961 self._box = self._factory(self._path)
962 self.assertEqual(len(self._box), 1)
964 def test_permissions_after_flush(self):
970 mode = os.stat(self._path).st_mode | 0o666
971 os.chmod(self._path, mode)
973 self._box.add(self._template % 0)
974 i = self._box.add(self._template % 1)
976 self._box.remove(i)
977 self._box.flush()
979 self.assertEqual(os.stat(self._path).st_mode, mode)
984 def tearDown(self):
986 self._box.close()
987 self._delete_recursively(self._path)
988 for lock_remnant in glob.glob(glob.escape(self._path) + '.*'):
991 def assertMailboxEmpty(self):
992 with open(self._path, 'rb') as f:
993 self.assertEqual(f.readlines(), [])
995 def test_get_bytes_from(self):
998 key0 = self._box.add(unixfrom + self._template % 0)
999 key1 = self._box.add(unixfrom + _sample_message)
1000 self.assertEqual(self._box.get_bytes(key0, from_=False),
1001 (self._template % 0).encode('ascii'))
1002 self.assertEqual(self._box.get_bytes(key1, from_=False),
1004 self.assertEqual(self._box.get_bytes(key0, from_=True),
1005 (unixfrom + self._template % 0).encode('ascii'))
1006 self.assertEqual(self._box.get_bytes(key1, from_=True),
1009 def test_get_string_from(self):
1012 key0 = self._box.add(unixfrom + self._template % 0)
1013 key1 = self._box.add(unixfrom + _sample_message)
1014 self.assertEqual(self._box.get_string(key0, from_=False),
1015 self._template % 0)
1016 self.assertEqual(self._box.get_string(key1, from_=False).split('\n'),
1018 self.assertEqual(self._box.get_string(key0, from_=True),
1019 unixfrom + self._template % 0)
1020 self.assertEqual(self._box.get_string(key1, from_=True).split('\n'),
1023 def test_add_from_string(self):
1025 key = self._box.add('From foo@bar blah\nFrom: foo\n\n0\n')
1026 self.assertEqual(self._box[key].get_from(), 'foo@bar blah')
1027 self.assertEqual(self._box[key].get_payload(), '0\n')
1029 def test_add_from_bytes(self):
1031 key = self._box.add(b'From foo@bar blah\nFrom: foo\n\n0\n')
1032 self.assertEqual(self._box[key].get_from(), 'foo@bar blah')
1033 self.assertEqual(self._box[key].get_payload(), '0\n')
1035 def test_add_mbox_or_mmdf_message(self):
1039 key = self._box.add(msg)
1041 def test_open_close_open(self):
1043 values = [self._template % i for i in range(3)]
1045 self._box.add(value)
1046 self._box.close()
1047 mtime = os.path.getmtime(self._path)
1048 self._box = self._factory(self._path)
1049 self.assertEqual(len(self._box), 3)
1050 for key in self._box.iterkeys():
1051 self.assertIn(self._box.get_string(key), values)
1052 self._box.close()
1053 self.assertEqual(mtime, os.path.getmtime(self._path))
1055 def test_add_and_close(self):
1057 self._box.add(_sample_message)
1059 self._box.add(self._template % i)
1060 self._box.add(_sample_message)
1061 self._box._file.flush()
1062 self._box._file.seek(0)
1063 contents = self._box._file.read()
1064 self._box.close()
1065 with open(self._path, 'rb') as f:
1066 self.assertEqual(contents, f.read())
1067 self._box = self._factory(self._path)
1071 def test_lock_conflict(self):
1075 self.addCleanup(c.close)
1076 self.addCleanup(p.close)
1083 self._box.lock()
1088 self._box.unlock()
1095 self.assertRaises(mailbox.ExternalClashError,
1096 self._box.lock)
1103 self._box.lock()
1104 self._box.unlock()
1106 def test_relock(self):
1110 key1 = self._box.add(msg)
1111 self._box.flush()
1112 self._box.close()
1114 self._box = self._factory(self._path)
1115 self._box.lock()
1116 key2 = self._box.add(msg)
1117 self._box.flush()
1118 self.assertTrue(self._box._locked)
1119 self._box.close()
1124 _factory = lambda self, path, factory=None: mailbox.mbox(path, factory)
1127 def test_file_perms(self):
1133 self._box.close()
1134 os.unlink(self._path)
1135 self._box = mailbox.mbox(self._path, create=True)
1136 self._box.add('')
1137 self._box.close()
1141 st = os.stat(self._path)
1143 self.assertFalse((perms & 0o111)) # Execute bits should all be off.
1145 def test_terminating_newline(self):
1149 i = self._box.add(message)
1152 message = self._box.get(i)
1153 self.assertEqual(message.get_payload(), 'No newline at the end\n')
1155 def test_message_separator(self):
1157 self._box.add('From: foo\n\n0') # No newline at the end
1158 with open(self._path, encoding='utf-8') as f:
1160 self.assertEqual(data[-3:], '0\n\n')
1162 self._box.add('From: foo\n\n0\n') # Newline at the end
1163 with open(self._path, encoding='utf-8') as f:
1165 self.assertEqual(data[-3:], '0\n\n')
1170 _factory = lambda self, path, factory=None: mailbox.MMDF(path, factory)
1175 _factory = lambda self, path, factory=None: mailbox.MH(path, factory)
1177 def assertMailboxEmpty(self):
1178 self.assertEqual(os.listdir(self._path), ['.mh_sequences'])
1180 def test_list_folders(self):
1182 self._box.add_folder('one')
1183 self._box.add_folder('two')
1184 self._box.add_folder('three')
1185 self.assertEqual(len(self._box.list_folders()), 3)
1186 self.assertEqual(set(self._box.list_folders()),
1189 def test_get_folder(self):
1193 self._box = self._factory(self._path, dummy_factory)
1195 new_folder = self._box.add_folder('foo.bar')
1196 folder0 = self._box.get_folder('foo.bar')
1197 folder0.add(self._template % 'bar')
1198 self.assertTrue(os.path.isdir(os.path.join(self._path, 'foo.bar')))
1199 folder1 = self._box.get_folder('foo.bar')
1200 self.assertEqual(folder1.get_string(folder1.keys()[0]),
1201 self._template % 'bar')
1205 self.assertIs(new_folder._factory, self._box._factory)
1206 self.assertIs(folder0._factory, self._box._factory)
1208 def test_add_and_remove_folders(self):
1210 self._box.add_folder('one')
1211 self._box.add_folder('two')
1212 self.assertEqual(len(self._box.list_folders()), 2)
1213 self.assertEqual(set(self._box.list_folders()), set(('one', 'two')))
1214 self._box.remove_folder('one')
1215 self.assertEqual(len(self._box.list_folders()), 1)
1216 self.assertEqual(set(self._box.list_folders()), set(('two',)))
1217 self._box.add_folder('three')
1218 self.assertEqual(len(self._box.list_folders()), 2)
1219 self.assertEqual(set(self._box.list_folders()), set(('two', 'three')))
1220 self._box.remove_folder('three')
1221 self.assertEqual(len(self._box.list_folders()), 1)
1222 self.assertEqual(set(self._box.list_folders()), set(('two',)))
1223 self._box.remove_folder('two')
1224 self.assertEqual(len(self._box.list_folders()), 0)
1225 self.assertEqual(self._box.list_folders(), [])
1227 def test_sequences(self):
1229 self.assertEqual(self._box.get_sequences(), {})
1230 msg0 = mailbox.MHMessage(self._template % 0)
1232 key0 = self._box.add(msg0)
1233 self.assertEqual(self._box.get_sequences(), {'foo':[key0]})
1234 msg1 = mailbox.MHMessage(self._template % 1)
1236 key1 = self._box.add(msg1)
1237 self.assertEqual(self._box.get_sequences(),
1240 self._box[key0] = msg0
1241 self.assertEqual(self._box.get_sequences(),
1244 self._box.remove(key1)
1245 self.assertEqual(self._box.get_sequences(), {'flagged':[key0]})
1247 def test_issue2625(self):
1248 msg0 = mailbox.MHMessage(self._template % 0)
1250 key0 = self._box.add(msg0)
1251 refmsg0 = self._box.get_message(key0)
1253 def test_issue7627(self):
1254 msg0 = mailbox.MHMessage(self._template % 0)
1255 key0 = self._box.add(msg0)
1256 self._box.lock()
1257 self._box.remove(key0)
1258 self._box.unlock()
1260 def test_pack(self):
1262 msg0 = mailbox.MHMessage(self._template % 0)
1263 msg1 = mailbox.MHMessage(self._template % 1)
1264 msg2 = mailbox.MHMessage(self._template % 2)
1265 msg3 = mailbox.MHMessage(self._template % 3)
1270 key0 = self._box.add(msg0)
1271 key1 = self._box.add(msg1)
1272 key2 = self._box.add(msg2)
1273 key3 = self._box.add(msg3)
1274 self.assertEqual(self._box.get_sequences(),
1277 self._box.remove(key2)
1278 self.assertEqual(self._box.get_sequences(),
1281 self._box.pack()
1282 self.assertEqual(self._box.keys(), [1, 2, 3])
1286 self.assertEqual(self._box.get_sequences(),
1290 key0 = self._box.add(msg1)
1291 key1 = self._box.add(msg1)
1292 key2 = self._box.add(msg1)
1293 key3 = self._box.add(msg1)
1295 self._box.remove(key0)
1296 self._box.remove(key2)
1297 self._box.lock()
1298 self._box.pack()
1299 self._box.unlock()
1300 self.assertEqual(self._box.get_sequences(),
1304 def _get_lock_path(self):
1305 return os.path.join(self._path, '.mh_sequences.lock')
1310 _factory = lambda self, path, factory=None: mailbox.Babyl(path, factory)
1312 def assertMailboxEmpty(self):
1313 with open(self._path, 'rb') as f:
1314 self.assertEqual(f.readlines(), [])
1316 def tearDown(self):
1318 self._box.close()
1319 self._delete_recursively(self._path)
1320 for lock_remnant in glob.glob(glob.escape(self._path) + '.*'):
1323 def test_labels(self):
1325 self.assertEqual(self._box.get_labels(), [])
1326 msg0 = mailbox.BabylMessage(self._template % 0)
1328 key0 = self._box.add(msg0)
1329 self.assertEqual(self._box.get_labels(), ['foo'])
1330 msg1 = mailbox.BabylMessage(self._template % 1)
1332 key1 = self._box.add(msg1)
1333 self.assertEqual(set(self._box.get_labels()), set(['foo', 'bar']))
1335 self._box[key0] = msg0
1336 self.assertEqual(set(self._box.get_labels()),
1338 self._box.remove(key1)
1339 self.assertEqual(set(self._box.get_labels()), set(['blah']))
1344 def __init__(self):
1345 self.closed = False
1347 def close(self):
1348 self.closed = True
1353 def __init__(self):
1354 mailbox.Mailbox.__init__(self, '', lambda file: None)
1355 self.files = [FakeFileLikeObject() for i in range(10)]
1357 def get_file(self, key):
1358 return self.files[key]
1363 def test_closing_fd(self):
1366 self.assertFalse(box.files[i].closed)
1370 self.assertTrue(box.files[i].closed)
1377 def setUp(self):
1378 self._path = os_helper.TESTFN
1380 def tearDown(self):
1381 self._delete_recursively(self._path)
1383 def test_initialize_with_eMM(self):
1386 msg = self._factory(eMM)
1387 self._post_initialize_hook(msg)
1388 self._check_sample(msg)
1390 def test_initialize_with_string(self):
1392 msg = self._factory(_sample_message)
1393 self._post_initialize_hook(msg)
1394 self._check_sample(msg)
1396 def test_initialize_with_file(self):
1398 with open(self._path, 'w+', encoding='utf-8') as f:
1401 msg = self._factory(f)
1402 self._post_initialize_hook(msg)
1403 self._check_sample(msg)
1405 def test_initialize_with_binary_file(self):
1407 with open(self._path, 'wb+') as f:
1410 msg = self._factory(f)
1411 self._post_initialize_hook(msg)
1412 self._check_sample(msg)
1414 def test_initialize_with_nothing(self):
1416 msg = self._factory()
1417 self._post_initialize_hook(msg)
1418 self.assertIsInstance(msg, email.message.Message)
1419 self.assertIsInstance(msg, mailbox.Message)
1420 self.assertIsInstance(msg, self._factory)
1421 self.assertEqual(msg.keys(), [])
1422 self.assertFalse(msg.is_multipart())
1423 self.assertIsNone(msg.get_payload())
1425 def test_initialize_incorrectly(self):
1427 self.assertRaises(TypeError, lambda: self._factory(object()))
1429 def test_all_eMM_attributes_exist(self):
1432 msg = self._factory(_sample_message)
1434 self.assertIn(attr, msg.__dict__,
1437 def test_become_message(self):
1440 msg = self._factory()
1442 self._check_sample(msg)
1444 def test_explain_to(self):
1445 # Copy self's format-specific data to other message formats.
1447 msg = self._factory()
1448 for class_ in self.all_mailbox_types:
1452 self.assertRaises(TypeError, lambda: msg._explain_to(other_msg))
1454 def _post_initialize_hook(self, msg):
1463 def _post_initialize_hook(self, msg):
1464 self.assertEqual(msg._subdir, 'new')
1465 self.assertEqual(msg._info, '')
1467 def test_subdir(self):
1470 self.assertEqual(msg.get_subdir(), 'new')
1472 self.assertEqual(msg.get_subdir(), 'cur')
1474 self.assertEqual(msg.get_subdir(), 'new')
1475 self.assertRaises(ValueError, lambda: msg.set_subdir('tmp'))
1476 self.assertEqual(msg.get_subdir(), 'new')
1478 self.assertEqual(msg.get_subdir(), 'new')
1479 self._check_sample(msg)
1481 def test_flags(self):
1484 self.assertEqual(msg.get_flags(), '')
1485 self.assertEqual(msg.get_subdir(), 'new')
1487 self.assertEqual(msg.get_subdir(), 'new')
1488 self.assertEqual(msg.get_flags(), 'F')
1490 self.assertEqual(msg.get_flags(), 'DPST')
1492 self.assertEqual(msg.get_flags(), 'DFPST')
1494 self.assertEqual(msg.get_flags(), 'FS')
1495 self.assertEqual(msg.get_subdir(), 'new')
1496 self._check_sample(msg)
1498 def test_date(self):
1501 self.assertLess(abs(msg.get_date() - time.time()), 60)
1503 self.assertEqual(msg.get_date(), 0.0)
1505 def test_info(self):
1508 self.assertEqual(msg.get_info(), '')
1510 self.assertEqual(msg.get_info(), '1,foo=bar')
1511 self.assertRaises(TypeError, lambda: msg.set_info(None))
1512 self._check_sample(msg)
1514 def test_info_and_flags(self):
1517 self.assertEqual(msg.get_info(), '')
1519 self.assertEqual(msg.get_flags(), 'FS')
1520 self.assertEqual(msg.get_info(), '2,FS')
1522 self.assertEqual(msg.get_flags(), '')
1523 self.assertEqual(msg.get_info(), '1,')
1525 self.assertEqual(msg.get_flags(), '')
1526 self.assertEqual(msg.get_info(), '1,')
1528 self.assertEqual(msg.get_flags(), 'D')
1529 self.assertEqual(msg.get_info(), '2,D')
1530 self._check_sample(msg)
1537 def _post_initialize_hook(self, msg):
1538 self._check_from(msg)
1540 def test_initialize_with_unixfrom(self):
1545 self.assertEqual(msg.get_from(), 'foo@bar blah', msg.get_from())
1547 def test_from(self):
1550 self._check_from(msg)
1552 self.assertEqual(msg.get_from(), 'foo bar')
1554 self._check_from(msg, 'foo@bar')
1556 self._check_from(msg, 'blah@temp')
1558 def test_flags(self):
1561 self.assertEqual(msg.get_flags(), '')
1563 self.assertEqual(msg.get_flags(), 'F')
1565 self.assertEqual(msg.get_flags(), 'RODX')
1567 self.assertEqual(msg.get_flags(), 'RODFAX')
1569 self.assertEqual(msg.get_flags(), 'RO')
1570 self._check_sample(msg)
1572 def _check_from(self, msg, sender=None):
1576 self.assertIsNotNone(re.match(
1590 def _post_initialize_hook(self, msg):
1591 self.assertEqual(msg._sequences, [])
1593 def test_sequences(self):
1596 self.assertEqual(msg.get_sequences(), [])
1598 self.assertEqual(msg.get_sequences(), ['foobar'])
1600 self.assertEqual(msg.get_sequences(), [])
1602 self.assertEqual(msg.get_sequences(), ['unseen'])
1604 self.assertEqual(msg.get_sequences(), ['unseen', 'flagged'])
1606 self.assertEqual(msg.get_sequences(), ['unseen', 'flagged'])
1608 self.assertEqual(msg.get_sequences(), ['flagged'])
1610 self.assertEqual(msg.get_sequences(), ['flagged', 'foobar'])
1612 self.assertEqual(msg.get_sequences(), ['flagged', 'foobar'])
1614 self.assertEqual(msg.get_sequences(), ['foobar', 'replied'])
1621 def _post_initialize_hook(self, msg):
1622 self.assertEqual(msg._labels, [])
1624 def test_labels(self):
1627 self.assertEqual(msg.get_labels(), [])
1629 self.assertEqual(msg.get_labels(), ['foobar'])
1631 self.assertEqual(msg.get_labels(), [])
1633 self.assertEqual(msg.get_labels(), ['filed'])
1635 self.assertEqual(msg.get_labels(), ['filed', 'resent'])
1637 self.assertEqual(msg.get_labels(), ['filed', 'resent'])
1639 self.assertEqual(msg.get_labels(), ['resent'])
1641 self.assertEqual(msg.get_labels(), ['resent', 'foobar'])
1643 self.assertEqual(msg.get_labels(), ['resent', 'foobar'])
1645 self.assertEqual(msg.get_labels(), ['foobar', 'answered'])
1647 def test_visible(self):
1651 self.assertEqual(visible.keys(), [])
1652 self.assertIsNone(visible.get_payload())
1655 self.assertEqual(msg.get_visible().keys(), [])
1658 self.assertEqual(visible.keys(), ['User-Agent', 'X-Whatever'])
1659 self.assertEqual(visible['User-Agent'], 'FooBar 1.0')
1660 self.assertEqual(visible['X-Whatever'], 'Blah')
1661 self.assertIsNone(visible.get_payload())
1663 self.assertEqual(visible.keys(), ['User-Agent', 'X-Whatever'])
1664 self.assertIsNone(visible.get_payload())
1666 self.assertEqual(visible.keys(), ['User-Agent', 'Date', 'From', 'To',
1669 self.assertEqual(visible[header], msg[header])
1679 def test_plain_to_x(self):
1681 for class_ in self.all_mailbox_types:
1684 self._check_sample(msg)
1686 def test_x_to_plain(self):
1688 for class_ in self.all_mailbox_types:
1691 self._check_sample(msg_plain)
1693 def test_x_from_bytes(self):
1695 for class_ in self.all_mailbox_types:
1697 self._check_sample(msg)
1699 def test_x_to_invalid(self):
1701 for class_ in self.all_mailbox_types:
1702 self.assertRaises(TypeError, lambda: class_(False))
1704 def test_type_specific_attributes_removed_on_conversion(self):
1706 for class_ in self.all_mailbox_types}
1707 for class1 in self.all_mailbox_types:
1708 for class2 in self.all_mailbox_types:
1716 self.assertNotIn(attr, target.__dict__,
1719 def test_maildir_to_maildir(self):
1726 self._check_sample(msg)
1727 self.assertEqual(msg.get_flags(), 'DFPRST')
1728 self.assertEqual(msg.get_subdir(), 'cur')
1729 self.assertEqual(msg.get_date(), date)
1731 def test_maildir_to_mboxmmdf(self):
1741 self.assertEqual(msg.get_flags(), result)
1742 self.assertEqual(msg.get_from(), 'MAILER-DAEMON %s' %
1745 self.assertEqual(class_(msg_maildir).get_flags(), 'RODFA')
1747 def test_maildir_to_mh(self):
1755 self.assertEqual(mailbox.MHMessage(msg_maildir).get_sequences(),
1758 def test_maildir_to_babyl(self):
1767 self.assertEqual(mailbox.BabylMessage(msg_maildir).get_labels(),
1770 def test_mboxmmdf_to_maildir(self):
1780 self.assertEqual(msg.get_flags(), result)
1781 self.assertEqual(msg.get_date(), 0.0)
1783 self.assertEqual(mailbox.MaildirMessage(msg_mboxMMDF).get_subdir(),
1786 def test_mboxmmdf_to_mboxmmdf(self):
1794 self.assertEqual(msg2.get_flags(), 'RODFA')
1795 self.assertEqual(msg2.get_from(), 'foo@bar')
1797 def test_mboxmmdf_to_mh(self):
1807 self.assertEqual(mailbox.MHMessage(msg_mboxMMDF).get_sequences(),
1810 def test_mboxmmdf_to_babyl(self):
1820 self.assertEqual(mailbox.BabylMessage(msg).get_labels(), result)
1822 def test_mh_to_maildir(self):
1828 self.assertEqual(mailbox.MaildirMessage(msg).get_flags(), result)
1829 self.assertEqual(mailbox.MaildirMessage(msg).get_subdir(), 'cur')
1834 self.assertEqual(mailbox.MaildirMessage(msg).get_flags(), 'FR')
1835 self.assertEqual(mailbox.MaildirMessage(msg).get_subdir(), 'cur')
1837 def test_mh_to_mboxmmdf(self):
1844 self.assertEqual(class_(msg).get_flags(), result)
1850 self.assertEqual(class_(msg).get_flags(), 'OFA')
1852 def test_mh_to_mh(self):
1858 self.assertEqual(mailbox.MHMessage(msg).get_sequences(),
1861 def test_mh_to_babyl(self):
1868 self.assertEqual(mailbox.BabylMessage(msg).get_labels(), result)
1873 self.assertEqual(mailbox.BabylMessage(msg).get_labels(),
1876 def test_babyl_to_maildir(self):
1884 self.assertEqual(mailbox.MaildirMessage(msg).get_flags(), result)
1885 self.assertEqual(mailbox.MaildirMessage(msg).get_subdir(), 'cur')
1890 self.assertEqual(mailbox.MaildirMessage(msg).get_flags(), 'PRT')
1891 self.assertEqual(mailbox.MaildirMessage(msg).get_subdir(), 'cur')
1893 def test_babyl_to_mboxmmdf(self):
1902 self.assertEqual(class_(msg).get_flags(), result)
1908 self.assertEqual(class_(msg).get_flags(), 'ODA')
1910 def test_babyl_to_mh(self):
1918 self.assertEqual(mailbox.MHMessage(msg).get_sequences(), result)
1923 self.assertEqual(mailbox.MHMessage(msg).get_sequences(),
1926 def test_babyl_to_babyl(self):
1934 self.assertEqual(msg2.get_labels(), ['unseen', 'deleted', 'filed',
1937 self.assertEqual(msg.get_visible().keys(), msg2.get_visible().keys())
1939 self.assertEqual(msg.get_visible()[key], msg2.get_visible()[key])
1944 def _test_read(self, proxy):
1947 self.assertEqual(proxy.read(), b'bar')
1949 self.assertEqual(proxy.read(), b'ar')
1951 self.assertEqual(proxy.read(2), b'ba')
1953 self.assertEqual(proxy.read(-1), b'ar')
1955 self.assertEqual(proxy.read(1000), b'r')
1957 def _test_readline(self, proxy):
1961 self.assertEqual(proxy.readline(), b'foo' + linesep)
1962 self.assertEqual(proxy.readline(), b'bar' + linesep)
1963 self.assertEqual(proxy.readline(), b'fred' + linesep)
1964 self.assertEqual(proxy.readline(), b'bob')
1966 self.assertEqual(proxy.readline(), b'o' + linesep)
1968 self.assertEqual(proxy.readline(), b'fred' + linesep)
1970 self.assertEqual(proxy.readline(2), b'fr')
1971 self.assertEqual(proxy.readline(-10), b'ed' + linesep)
1973 def _test_readlines(self, proxy):
1977 self.assertEqual(proxy.readlines(), [b'foo' + linesep,
1981 self.assertEqual(proxy.readlines(2), [b'foo' + linesep])
1983 self.assertEqual(proxy.readlines(4 + len(linesep)),
1986 self.assertEqual(proxy.readlines(1000), [linesep, b'bar' + linesep,
1989 def _test_iteration(self, proxy):
1994 self.assertEqual(next(iterator), b'foo' + linesep)
1995 self.assertEqual(next(iterator), b'bar' + linesep)
1996 self.assertEqual(next(iterator), b'fred' + linesep)
1997 self.assertEqual(next(iterator), b'bob')
1998 self.assertRaises(StopIteration, next, iterator)
2000 def _test_seek_and_tell(self, proxy):
2004 self.assertEqual(proxy.tell(), 3)
2005 self.assertEqual(proxy.read(len(linesep)), linesep)
2007 self.assertEqual(proxy.read(1 + len(linesep)), b'r' + linesep)
2009 self.assertEqual(proxy.read(3), b'bar')
2011 self.assertEqual(proxy.read(), b'o' + linesep + b'bar' + linesep)
2013 self.assertFalse(proxy.read())
2015 def _test_close(self, proxy):
2017 self.assertFalse(proxy.closed)
2019 self.assertTrue(proxy.closed)
2022 self.assertTrue(proxy.closed)
2027 def setUp(self):
2028 self._path = os_helper.TESTFN
2029 self._file = open(self._path, 'wb+')
2031 def tearDown(self):
2032 self._file.close()
2033 self._delete_recursively(self._path)
2035 def test_initialize(self):
2037 self._file.write(b'foo')
2038 pos = self._file.tell()
2039 proxy0 = mailbox._ProxyFile(self._file)
2040 self.assertEqual(proxy0.tell(), pos)
2041 self.assertEqual(self._file.tell(), pos)
2042 proxy1 = mailbox._ProxyFile(self._file, 0)
2043 self.assertEqual(proxy1.tell(), 0)
2044 self.assertEqual(self._file.tell(), pos)
2046 def test_read(self):
2047 self._file.write(b'bar')
2048 self._test_read(mailbox._ProxyFile(self._file))
2050 def test_readline(self):
2051 self._file.write(bytes('foo%sbar%sfred%sbob' % (os.linesep, os.linesep,
2053 self._test_readline(mailbox._ProxyFile(self._file))
2055 def test_readlines(self):
2056 self._file.write(bytes('foo%sbar%sfred%sbob' % (os.linesep, os.linesep,
2058 self._test_readlines(mailbox._ProxyFile(self._file))
2060 def test_iteration(self):
2061 self._file.write(bytes('foo%sbar%sfred%sbob' % (os.linesep, os.linesep,
2063 self._test_iteration(mailbox._ProxyFile(self._file))
2065 def test_seek_and_tell(self):
2066 self._file.write(bytes('foo%sbar%s' % (os.linesep, os.linesep), 'ascii'))
2067 self._test_seek_and_tell(mailbox._ProxyFile(self._file))
2069 def test_close(self):
2070 self._file.write(bytes('foo%sbar%s' % (os.linesep, os.linesep), 'ascii'))
2071 self._test_close(mailbox._ProxyFile(self._file))
2076 def setUp(self):
2077 self._path = os_helper.TESTFN
2078 self._file = open(self._path, 'wb+')
2080 def tearDown(self):
2081 self._file.close()
2082 self._delete_recursively(self._path)
2084 def test_initialize(self):
2086 self._file.write(bytes('foo' + os.linesep + 'bar', 'ascii'))
2087 pos = self._file.tell()
2088 proxy = mailbox._PartialFile(self._file, 2, 5)
2089 self.assertEqual(proxy.tell(), 0)
2090 self.assertEqual(self._file.tell(), pos)
2092 def test_read(self):
2093 self._file.write(bytes('***bar***', 'ascii'))
2094 self._test_read(mailbox._PartialFile(self._file, 3, 6))
2096 def test_readline(self):
2097 self._file.write(bytes('!!!!!foo%sbar%sfred%sbob!!!!!' %
2099 self._test_readline(mailbox._PartialFile(self._file, 5,
2102 def test_readlines(self):
2103 self._file.write(bytes('foo%sbar%sfred%sbob?????' %
2105 self._test_readlines(mailbox._PartialFile(self._file, 0,
2108 def test_iteration(self):
2109 self._file.write(bytes('____foo%sbar%sfred%sbob####' %
2111 self._test_iteration(mailbox._PartialFile(self._file, 4,
2114 def test_seek_and_tell(self):
2115 self._file.write(bytes('(((foo%sbar%s$$$' % (os.linesep, os.linesep), 'ascii'))
2116 self._test_seek_and_tell(mailbox._PartialFile(self._file, 3,
2119 def test_close(self):
2120 self._file.write(bytes('&foo%sbar%s^' % (os.linesep, os.linesep), 'ascii'))
2121 self._test_close(mailbox._PartialFile(self._file, 1,
2138 def setUp(self):
2140 self._dir = os_helper.TESTFN
2141 if os.path.isdir(self._dir):
2142 os_helper.rmtree(self._dir)
2143 elif os.path.isfile(self._dir):
2144 os_helper.unlink(self._dir)
2145 os.mkdir(self._dir)
2146 os.mkdir(os.path.join(self._dir, "cur"))
2147 os.mkdir(os.path.join(self._dir, "tmp"))
2148 os.mkdir(os.path.join(self._dir, "new"))
2149 self._counter = 1
2150 self._msgfiles = []
2152 def tearDown(self):
2153 list(map(os.unlink, self._msgfiles))
2154 os_helper.rmdir(os.path.join(self._dir, "cur"))
2155 os_helper.rmdir(os.path.join(self._dir, "tmp"))
2156 os_helper.rmdir(os.path.join(self._dir, "new"))
2157 os_helper.rmdir(self._dir)
2159 def createMessage(self, dir, mbox=False):
2161 pid = self._counter
2162 self._counter += 1
2164 tmpname = os.path.join(self._dir, "tmp", filename)
2165 newname = os.path.join(self._dir, dir, filename)
2167 self._msgfiles.append(tmpname)
2176 self._msgfiles.append(newname)
2179 def test_empty_maildir(self):
2183 self.mbox = mailbox.Maildir(os_helper.TESTFN)
2184 #self.assertTrue(hasattr(self.mbox, "boxes"))
2185 #self.assertEqual(len(self.mbox.boxes), 0)
2186 self.assertIsNone(self.mbox.next())
2187 self.assertIsNone(self.mbox.next())
2189 def test_nonempty_maildir_cur(self):
2190 self.createMessage("cur")
2191 self.mbox = mailbox.Maildir(os_helper.TESTFN)
2192 #self.assertEqual(len(self.mbox.boxes), 1)
2193 self.assertIsNotNone(self.mbox.next())
2194 self.assertIsNone(self.mbox.next())
2195 self.assertIsNone(self.mbox.next())
2197 def test_nonempty_maildir_new(self):
2198 self.createMessage("new")
2199 self.mbox = mailbox.Maildir(os_helper.TESTFN)
2200 #self.assertEqual(len(self.mbox.boxes), 1)
2201 self.assertIsNotNone(self.mbox.next())
2202 self.assertIsNone(self.mbox.next())
2203 self.assertIsNone(self.mbox.next())
2205 def test_nonempty_maildir_both(self):
2206 self.createMessage("cur")
2207 self.createMessage("new")
2208 self.mbox = mailbox.Maildir(os_helper.TESTFN)
2209 #self.assertEqual(len(self.mbox.boxes), 2)
2210 self.assertIsNotNone(self.mbox.next())
2211 self.assertIsNotNone(self.mbox.next())
2212 self.assertIsNone(self.mbox.next())
2213 self.assertIsNone(self.mbox.next())
2303 def test__all__(self):
2304 support.check__all__(self, mailbox,