Header import, and dumporiginal extensions.
Add ability to dump headers or mimetypes for original messages.
Add ability to dump all messages, instead of just hashes listed.
Add new command cmd/headers to import all headers from messages in table
original. Utility can work incrementally or do fully table imports.
Moved 'fetch all original messages' code to db/util.go. Added conditional
version given table name that will only return original messages that do not
have matching hashes in the specified table.
This commit is contained in:
parent
61c61a1075
commit
2f6b083f47
4
TODO
Normal file
4
TODO
Normal file
@ -0,0 +1,4 @@
|
|||||||
|
- Add 'fast_header' table that is just the values we'd want to search on (i.e.
|
||||||
|
the headers we hash on, plus whatever is needed for jwz threading).
|
||||||
|
- Investigate stored procedures that update 'fast_header' automatically when
|
||||||
|
we we add bulk headers to table 'header'
|
||||||
@ -1,14 +1,107 @@
|
|||||||
package main
|
package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bytes"
|
||||||
"flag"
|
"flag"
|
||||||
"fmt"
|
"fmt"
|
||||||
"os"
|
"io"
|
||||||
|
"mime"
|
||||||
|
"net/mail"
|
||||||
|
"sort"
|
||||||
"xinu.tv/email/db"
|
"xinu.tv/email/db"
|
||||||
|
|
||||||
"github.com/golang/glog"
|
"github.com/golang/glog"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
dumpMime = flag.Bool("mime", false, "show multipart mime types for message")
|
||||||
|
headers = flag.Bool("headers", false, "show only headers")
|
||||||
|
)
|
||||||
|
|
||||||
|
func printMultipartMime(hash string, r io.Reader) error {
|
||||||
|
msg, err := mail.ReadMessage(r)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
mediatype, params, err := mime.ParseMediaType(msg.Header.Get("Content-Type"))
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
glog.Infof("%s mt: %s p: %v", hash, mediatype, params)
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func printHeaders(hash string, r io.Reader) error {
|
||||||
|
msg, err := mail.ReadMessage(r)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
hdrs := make([]string, 0, len(msg.Header))
|
||||||
|
for k := range msg.Header {
|
||||||
|
hdrs = append(hdrs, k)
|
||||||
|
}
|
||||||
|
fmt.Println(hash)
|
||||||
|
sort.Strings(hdrs)
|
||||||
|
for _, k := range hdrs {
|
||||||
|
vs := msg.Header[k]
|
||||||
|
fmt.Printf("%s:", k)
|
||||||
|
for _, v := range vs {
|
||||||
|
fmt.Printf(" %s", v)
|
||||||
|
}
|
||||||
|
fmt.Println()
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func printOneBlob(hash string, blob []byte) error {
|
||||||
|
r := bytes.NewReader(blob)
|
||||||
|
switch {
|
||||||
|
case *dumpMime:
|
||||||
|
if err := printMultipartMime(hash, r); err != nil {
|
||||||
|
// err != nil expected when mime type not found, so we log and
|
||||||
|
// squelch the error.
|
||||||
|
glog.Warningf("%s: %v", hash, err)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
case *headers:
|
||||||
|
if err := printHeaders(hash, r); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
fmt.Println(string(blob))
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type hashError struct {
|
||||||
|
hash string
|
||||||
|
error
|
||||||
|
}
|
||||||
|
|
||||||
|
func (he hashError) Error() string {
|
||||||
|
return fmt.Sprintf("%s: %v", he.hash, he.error)
|
||||||
|
}
|
||||||
|
|
||||||
|
func printAllBlobs(c *db.Conn) error {
|
||||||
|
oCh := make(chan db.Original)
|
||||||
|
errc := make(chan error)
|
||||||
|
donec := make(chan struct{})
|
||||||
|
defer close(donec)
|
||||||
|
go c.Originals(oCh, errc, donec)
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case o := <-oCh:
|
||||||
|
if err := printOneBlob(o.Hash, o.Blob); err != nil {
|
||||||
|
return hashError{hash: o.Hash, error: err}
|
||||||
|
}
|
||||||
|
case err := <-errc:
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
defer glog.Flush()
|
defer glog.Flush()
|
||||||
flag.Parse()
|
flag.Parse()
|
||||||
@ -19,8 +112,10 @@ func main() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if flag.NArg() == 0 {
|
if flag.NArg() == 0 {
|
||||||
fmt.Println("Must specify message hashes to print")
|
if err := printAllBlobs(c); err != nil {
|
||||||
os.Exit(1)
|
glog.Fatal(err)
|
||||||
|
}
|
||||||
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
var blob []byte
|
var blob []byte
|
||||||
@ -28,6 +123,8 @@ func main() {
|
|||||||
if err := c.OriginalBlobByHash(hash, &blob); err != nil {
|
if err := c.OriginalBlobByHash(hash, &blob); err != nil {
|
||||||
glog.Fatal(err)
|
glog.Fatal(err)
|
||||||
}
|
}
|
||||||
fmt.Println(string(blob))
|
if err := printOneBlob(hash, blob); err != nil {
|
||||||
|
glog.Fatal(err)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
63
cmd/headers/headers.go
Normal file
63
cmd/headers/headers.go
Normal file
@ -0,0 +1,63 @@
|
|||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"flag"
|
||||||
|
"io"
|
||||||
|
"net/mail"
|
||||||
|
"xinu.tv/email/db"
|
||||||
|
|
||||||
|
"github.com/golang/glog"
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
all = flag.Bool("all", false,
|
||||||
|
"extract headers from all messages. Default is to only extract from unprocessed messages.")
|
||||||
|
)
|
||||||
|
|
||||||
|
func insertHeaders(c *db.Conn, hash string, r io.Reader) error {
|
||||||
|
msg, err := mail.ReadMessage(r)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
glog.Infoln("Insert", hash)
|
||||||
|
return c.InsertHeaders(hash, msg.Header)
|
||||||
|
}
|
||||||
|
|
||||||
|
func insertAllHeaders(c *db.Conn, all bool) error {
|
||||||
|
oCh := make(chan db.Original)
|
||||||
|
errc := make(chan error)
|
||||||
|
donec := make(chan struct{})
|
||||||
|
defer close(donec)
|
||||||
|
if all {
|
||||||
|
go c.Originals(oCh, errc, donec)
|
||||||
|
} else {
|
||||||
|
go c.OriginalsNotInTable("header", oCh, errc, donec)
|
||||||
|
}
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case o := <-oCh:
|
||||||
|
r := bytes.NewReader(o.Blob)
|
||||||
|
if err := insertHeaders(c, o.Hash, r); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
case err := <-errc:
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func main() {
|
||||||
|
defer glog.Flush()
|
||||||
|
flag.Parse()
|
||||||
|
|
||||||
|
c, err := db.NewConn("")
|
||||||
|
if err != nil {
|
||||||
|
glog.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := insertAllHeaders(c, *all); err != nil {
|
||||||
|
glog.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
109
db/util.go
109
db/util.go
@ -4,7 +4,10 @@ import (
|
|||||||
"database/sql"
|
"database/sql"
|
||||||
"flag"
|
"flag"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"net/mail"
|
||||||
|
"unicode/utf8"
|
||||||
|
|
||||||
|
"github.com/golang/glog"
|
||||||
_ "github.com/lib/pq"
|
_ "github.com/lib/pq"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -29,6 +32,73 @@ func NewConn(dsn string) (*Conn, error) {
|
|||||||
return &Conn{DB: db}, err
|
return &Conn{DB: db}, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type Original struct {
|
||||||
|
UID int64
|
||||||
|
Hash string
|
||||||
|
HeaderSize int64
|
||||||
|
TotalSize int64
|
||||||
|
Blob []byte
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Conn) Originals(oCh chan<- Original, errc chan<- error, donec <-chan struct{}) {
|
||||||
|
query := `
|
||||||
|
SELECT
|
||||||
|
uid,
|
||||||
|
hash,
|
||||||
|
header_size,
|
||||||
|
total_size,
|
||||||
|
blob
|
||||||
|
FROM
|
||||||
|
original
|
||||||
|
`
|
||||||
|
c.originals(query, oCh, errc, donec)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Conn) OriginalsNotInTable(table string, oCh chan<- Original, errc chan<- error, donec <-chan struct{}) {
|
||||||
|
query := fmt.Sprintf(`
|
||||||
|
SELECT
|
||||||
|
original.uid,
|
||||||
|
original.hash,
|
||||||
|
original.header_size,
|
||||||
|
original.total_size,
|
||||||
|
original.blob
|
||||||
|
FROM
|
||||||
|
original
|
||||||
|
LEFT
|
||||||
|
JOIN %s ON original.hash = %s.hash
|
||||||
|
WHERE
|
||||||
|
%s.hash IS NULL;
|
||||||
|
`, table, table, table)
|
||||||
|
c.originals(query, oCh, errc, donec)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Conn) originals(query string, oCh chan<- Original, errc chan<- error, donec <-chan struct{}) {
|
||||||
|
|
||||||
|
defer close(oCh)
|
||||||
|
defer close(errc)
|
||||||
|
rows, err := c.Query(query)
|
||||||
|
if err != nil {
|
||||||
|
errc <- err
|
||||||
|
return
|
||||||
|
}
|
||||||
|
for rows.Next() {
|
||||||
|
var o Original
|
||||||
|
if err := rows.Scan(&o.UID, &o.Hash, &o.HeaderSize, &o.TotalSize, &o.Blob); err != nil {
|
||||||
|
errc <- err
|
||||||
|
return
|
||||||
|
}
|
||||||
|
oCh <- o
|
||||||
|
select {
|
||||||
|
case <-donec:
|
||||||
|
return
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if err := rows.Err(); err != nil {
|
||||||
|
errc <- err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func (c *Conn) OriginalBlobByHash(hash string, blob *[]byte) error {
|
func (c *Conn) OriginalBlobByHash(hash string, blob *[]byte) error {
|
||||||
row := c.QueryRow(`
|
row := c.QueryRow(`
|
||||||
SELECT
|
SELECT
|
||||||
@ -41,3 +111,42 @@ WHERE
|
|||||||
|
|
||||||
return row.Scan(blob)
|
return row.Scan(blob)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *Conn) InsertHeaders(hash string, hdrs mail.Header) (err error) {
|
||||||
|
var tx *sql.Tx
|
||||||
|
tx, err = c.Begin()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer func() {
|
||||||
|
if err != nil {
|
||||||
|
if err := tx.Rollback(); err != nil {
|
||||||
|
glog.Error(err)
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
err = tx.Commit()
|
||||||
|
}()
|
||||||
|
|
||||||
|
stmt, err := tx.Prepare(`
|
||||||
|
INSERT INTO
|
||||||
|
header (hash, name, value)
|
||||||
|
VALUES
|
||||||
|
($1, $2, $3)
|
||||||
|
`)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
for k, vs := range hdrs {
|
||||||
|
for _, v := range vs {
|
||||||
|
if !utf8.ValidString(v) {
|
||||||
|
glog.Infof("%s: value for %q invalid UTF-8 %q", hash, k, v)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if _, err := stmt.Exec(hash, k, v); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|||||||
7
pg/init-header.sql
Normal file
7
pg/init-header.sql
Normal file
@ -0,0 +1,7 @@
|
|||||||
|
DROP TABLE header;
|
||||||
|
|
||||||
|
CREATE TABLE header (
|
||||||
|
hash CHAR(40) REFERENCES original (hash),
|
||||||
|
name TEXT,
|
||||||
|
value TEXT
|
||||||
|
);
|
||||||
Loading…
x
Reference in New Issue
Block a user