// NOTE(review): this copy of the file is damaged and should be restored from a
// pristine source before it is edited or built.
//
//  * The original line breaks have been collapsed. Everything on the next
//    physical line follows its leading "//", so the compiler sees all of it
//    (includes, the TW_UNIQUE_PTR macro, and every Fork / Join / Interface
//    definition) as one comment. The only live tokens left in this text are
//    the trailing "parent.Peek(outByte); }" fragment on the last line.
//  * Several spans appear to have been dropped, each apparently running from
//    an opening angle bracket to the next closing angle bracket. Visible
//    symptoms: the fourth #include has no header name; TW_UNIQUE_PTR is used
//    without a template argument; loop headers stop at "i" and resume in the
//    middle of a later statement ("i out(", "iClose()", "iPut(").
//    What was removed cannot be recovered from this text. That includes the
//    loop body of the Fork constructor, the header of the function that
//    follows it (presumably Fork::Detach, since Fork::Attach calls
//    Detach(newOut) -- TODO confirm), the loop bodies of Fork::Close and both
//    Fork::Put overloads, and everything between the start of the Join
//    constructor's loop and a trailing "Close(); }".
//
// What the surviving text shows:
//  * TW_UNIQUE_PTR expands to std::unique_ptr when __cplusplus >= 201103L and
//    to std::auto_ptr otherwise.
//  * Fork::Fork(n, givenOutPorts) initialises numberOfPorts and outPorts from
//    n, sets currentPort to 0, then starts a loop whose body is missing.
//  * The fragment after that loop wraps "newOut ? newOut : new ByteQueue" in a
//    TW_UNIQUE_PTR named out, calls Close() on outPorts[currentPort], calls
//    TransferTo(*out) on it, and then resets outPorts[currentPort] to the
//    pointer released from out.
//  * Fork::Attach(newOut) forwards to outPorts[currentPort]->Attach(newOut)
//    when that port reports Attachable(); otherwise it calls Detach(newOut).
//  * Fork::Close() calls InputFinished() before its (garbled) loop.
//  * Join::Join(n, outQ) passes outQ to the Filter base and initialises
//    numberOfPorts, inPorts, interfacesOpen and interfaces from n.
//  * Interface::Put (both overloads) writes to bq and then calls
//    parent.NotifyInput(id, count), with count being 1 for the single-byte
//    overload and length for the buffer overload.
//  * Interface::Close() calls parent.NotifyClose(id).
//  * Interface::MaxRetrieveable, Detach, Attach, both Get overloads and Peek
//    forward their arguments unchanged to the same-named member of parent.
// forkjoin.cpp - written and placed in the public domain by Wei Dai #include "pch.h" #include "forkjoin.h" #include "queue.h" #include #if __cplusplus >= 201103L # define TW_UNIQUE_PTR std::unique_ptr #else # define TW_UNIQUE_PTR std::auto_ptr #endif Fork::Fork(int n, BufferedTransformation *const *givenOutPorts) : numberOfPorts(n), outPorts(n) { currentPort = 0; for (unsigned int i=0; i out(newOut ? newOut : new ByteQueue); outPorts[currentPort]->Close(); outPorts[currentPort]->TransferTo(*out); outPorts[currentPort].reset(out.release()); } void Fork::Attach(BufferedTransformation *newOut) { if (outPorts[currentPort]->Attachable()) outPorts[currentPort]->Attach(newOut); else Detach(newOut); } void Fork::Close() { InputFinished(); for (unsigned int i=0; iClose(); } void Fork::Put(ibyte inByte) { for (unsigned int i=0; iPut(inByte); } void Fork::Put(const ibyte *inString, unsigned int length) { for (unsigned int i=0; iPut(inString, length); } // ******************************************************** Join::Join(unsigned int n, BufferedTransformation *outQ) : Filter(outQ), numberOfPorts(n), inPorts(n), interfacesOpen(n), interfaces(n) { for (unsigned int i=0; iClose(); } // ******************************************************** void Interface::Put(ibyte inByte) { bq.Put(inByte); parent.NotifyInput(id, 1); } void Interface::Put(const ibyte *inString, unsigned int length) { bq.Put(inString, length); parent.NotifyInput(id, length); } unsigned long Interface::MaxRetrieveable() { return parent.MaxRetrieveable(); } void Interface::Close() { parent.NotifyClose(id); } void Interface::Detach(BufferedTransformation *bt) { parent.Detach(bt); } void Interface::Attach(BufferedTransformation *bt) { parent.Attach(bt); } unsigned int Interface::Get(ibyte &outByte) { return parent.Get(outByte); } unsigned int Interface::Get(ibyte *outString, unsigned int getMax) { return parent.Get(outString, getMax); } unsigned int Interface::Peek(ibyte &outByte) const { return 
parent.Peek(outByte); }