/** *//**
* http://www.5a520.cn  http://www.bt285.cn
*/ 
package instream;   
  
import java.net.URL;   
import java.net.URLDecoder;   
import decode.Header;   
import tag.MP3Tag;   
import tag.TagThread;   
  
public final class BuffRandAcceURL implements IRandomAccess, IWriterCallBack {   
    public static final int UNIT_LENGTH_BITS = 15;                  //32K   
    public static final int UNIT_LENGTH = 1 << UNIT_LENGTH_BITS;   
    public static final int BUF_LENGTH = UNIT_LENGTH << 4;            //16块   
    public static final int UNIT_COUNT = BUF_LENGTH >> UNIT_LENGTH_BITS;   
    public static final int BUF_LENGTH_MASK = (BUF_LENGTH - 1);   
    private static final int MAX_WRITER = 8;   
    private static long file_pointer;   
    private static int read_pos;   
    private static int fill_bytes;   
    private static byte[] buf;      //同时也作读写同步锁:buf.wait()/buf.notify()   
    private static int[] buf_bytes;   
    private static int buf_index;   
    private static int alloc_pos;   
    private static URL url = null;   
    private static boolean isalive = true;   
    private static int writer_count;   
    private static int await_count;   
    private long file_length;   
    private long frame_bytes;   
       
    public BuffRandAcceURL(String sURL) throws Exception {   
        this(sURL,MAX_WRITER);   
    }   
       
    public BuffRandAcceURL(String sURL, int download_threads) throws Exception {   
        buf = new byte[BUF_LENGTH];   
        buf_bytes = new int[UNIT_COUNT];   
        url = new URL(sURL);   
           
        //创建线程以异步方式解析ID3   
        new TagThread(url);   
           
        //打印当前文件名   
        try {   
            String s = URLDecoder.decode(sURL, "GBK");   
            System.out.println("start>> " + s.substring(s.lastIndexOf("/") + 1));   
            s = null;   
        } catch (Exception e) {   
            System.out.println("start>> " + sURL);   
        }   
           
        //创建"写"线程   
        for(int i = 0; i < download_threads; i++)   
            new Writer(this, url, buf, i+1);   
        frame_bytes = file_length = HttpReader.getContentLength();   
        if(file_length == 0) {   
            Header.strLastErr = "连接URL出错,重试 " + HttpReader.MAX_RETRY + " 次后放弃。";   
            throw new Exception("retry " + HttpReader.MAX_RETRY);   
        }   
        writer_count = download_threads;   
           
        //缓冲   
        try_cache();   
           
        //跳过ID3 v2   
        MP3Tag mP3Tag = new MP3Tag();   
        int v2_size = mP3Tag.checkID3V2(buf,0);   
        if (v2_size > 0) {   
            frame_bytes -= v2_size;   
            //seek(v2_size):   
            fill_bytes -= v2_size;   
            file_pointer = v2_size;   
            read_pos = v2_size;   
            read_pos &= BUF_LENGTH_MASK;   
            int units = v2_size >> UNIT_LENGTH_BITS;   
            for(int i = 0; i < units; i++) {   
                buf_bytes[i] = 0;   
                this.notifyWriter();   
            }   
            buf_bytes[units] -= v2_size;   
            this.notifyWriter();   
        }   
        mP3Tag = null;   
    }   
       
    private void try_cache() throws InterruptedException {   
        int cache_size = BUF_LENGTH;   
        if(cache_size > (int)file_length - alloc_pos)   
            cache_size = (int)file_length - alloc_pos;   
        cache_size -= UNIT_LENGTH;   
           
        //等待填写当前正在读的那"一块"缓冲区   
        /**//*if(fill_bytes >= cache_size && writer_count > 0) {  
            synchronized (buf) {  
                buf.wait();  
            }  
            return;  
        }*/  
           
        //等待填满缓冲区   
        while (fill_bytes < cache_size) {   
            if (writer_count == 0 || isalive == false)   
                return;   
            if(BUF_LENGTH > (int)file_length - alloc_pos)   
                cache_size = (int)file_length - alloc_pos - UNIT_LENGTH;   
            System.out.printf("\r[缓冲%1$6.2f%%] ",(float)fill_bytes / cache_size * 100);   
            synchronized (buf) {   
                buf.wait();   
            }   
        }   
        System.out.printf("\r");   
    }   
       
    private int try_reading(int i, int len) throws Exception {   
        int n = (i == UNIT_COUNT - 1) ? 0 : (i + 1);   
        int r = (buf_bytes[i] == 0) ? 0 : (buf_bytes[i] + buf_bytes[n]);   
        while (r < len) {   
            if (writer_count == 0 || isalive == false)   
                return r;   
            try_cache();   
            r = (buf_bytes[i] == 0) ? 0 : (buf_bytes[i] + buf_bytes[n]);   
        }   
           
        return len;   
    }   
       
    /**//*  
     * 各个"写"线程互斥等待空闲块  
     */  
    public synchronized boolean tryWriting(Writer w) throws InterruptedException {   
        await_count++;   
        while (buf_bytes[buf_index] != 0 && isalive) {   
            this.wait();   
        }   
           
        //下载速度足够就结束一个"写"线程   
        if(writer_count > 1 && w.await_count >= await_count &&   
                w.await_count >= writer_count)   
            return false;   
           
        if(alloc_pos >= file_length)   
            return false;   
        w.await_count = await_count;   
        await_count--;   
        w.start_pos = alloc_pos;   
        w.index = buf_index;   
        alloc_pos += UNIT_LENGTH;   
        buf_index = (buf_index == UNIT_COUNT - 1) ? 0 : buf_index + 1;   
        return isalive;   
    }   
       
    public void updateBuffer(int i, int len) {   
        synchronized (buf) {   
            buf_bytes[i] = len;   
            fill_bytes += len;   
            buf.notify();   
        }   
    }   
       
    public void updateWriterCount() {   
        synchronized (buf) {   
            writer_count--;   
            buf.notify();   
        }   
    }   
       
    public synchronized void notifyWriter() {   
        this.notifyAll();   
    }   
       
    public void terminateWriters() {   
        synchronized (buf) {   
            if (isalive) {   
                isalive = false;   
                Header.strLastErr = "读取文件超时。重试 " + HttpReader.MAX_RETRY   
                        + " 次后放弃,请您稍后再试。";   
            }   
            buf.notify();   
        }   
           
        notifyWriter();        
    }   
       
    public int read() throws Exception {   
        int iret = -1;   
        int i = read_pos >> UNIT_LENGTH_BITS;   
        // 1."等待"有1字节可读   
        while (buf_bytes[i] < 1) {   
            try_cache();   
            if (writer_count == 0)   
                return -1;   
        }   
        if(isalive == false)   
            return -1;   
  
        // 2.读取   
        iret = buf[read_pos] & 0xff;   
        fill_bytes--;   
        file_pointer++;   
        read_pos++;   
        read_pos &= BUF_LENGTH_MASK;   
        if (--buf_bytes[i] == 0)   
            notifyWriter();     // 3.通知   
  
        return iret;   
    }   
       
    public int read(byte b[]) throws Exception {   
        return read(b, 0, b.length);   
    }   
  
    public int read(byte[] b, int off, int len) throws Exception {   
        if(len > UNIT_LENGTH)   
            len = UNIT_LENGTH;   
        int i = read_pos >> UNIT_LENGTH_BITS;   
           
        // 1."等待"有足够内容可读   
        if(try_reading(i, len) < len || isalive == false)   
            return -1;   
  
        // 2.读取   
        int tail_len = BUF_LENGTH - read_pos; // write_pos != BUF_LENGTH   
        if (tail_len < len) {   
            System.arraycopy(buf, read_pos, b, off, tail_len);   
            System.arraycopy(buf, 0, b, off + tail_len, len - tail_len);   
        } else  
            System.arraycopy(buf, read_pos, b, off, len);   
  
        fill_bytes -= len;   
        file_pointer += len;   
        read_pos += len;   
        read_pos &= BUF_LENGTH_MASK;   
        buf_bytes[i] -= len;   
        if (buf_bytes[i] < 0) {   
            int ni = read_pos >> UNIT_LENGTH_BITS;   
            buf_bytes[ni] += buf_bytes[i];   
            buf_bytes[i] = 0;   
            notifyWriter();   
        } else if (buf_bytes[i] == 0)   
            notifyWriter();   
           
        return len;   
    }   
       
    /**//*  
     * 从src_off位置复制,不移动文件"指针"  
     */  
    public int dump(int src_off, byte b[], int dst_off, int len) throws Exception {   
        int rpos = read_pos + src_off;   
        if(try_reading(rpos >> UNIT_LENGTH_BITS, len) < len || isalive == false)   
            return -1;   
        int tail_len = BUF_LENGTH - rpos;   
        if (tail_len < len) {   
            System.arraycopy(buf, rpos, b, dst_off, tail_len);   
            System.arraycopy(buf, 0, b, dst_off + tail_len, len - tail_len);   
        } else  
            System.arraycopy(buf, rpos, b, dst_off, len);   
        // 不发信号   
  
        return len;   
    }   
       
    public long length() {   
        return file_length;   
    }   
       
    public long getFilePointer() {   
        return file_pointer;   
    }   
  
    public void close() {   
        //   
    }   
       
    //   
    public void seek(long pos) throws Exception {   
        //   
    }   
       
}
     |