您尚未登录,请登录后浏览更多内容! 登录 | 立即注册

QQ登录

只需一步,快速开始

 找回密码
 立即注册

QQ登录

只需一步,快速开始

查看: 15226|回复: 0
打印 上一主题 下一主题

[html5] 用于实例化Worker后执行监听

[复制链接]
跳转到指定楼层
楼主
发表于 2018-12-17 21:22:08 | 只看该作者 回帖奖励 |倒序浏览 |阅读模式
  1. void Worker::listen(void)
复制代码
用于实例化Worker后执行监听。
此方法主要用于在Worker进程启动后动态创建新的Worker实例,能够实现同一个进程监听多个端口,支持多种协议。需要注意的是用这种方法只是在当前进程增加监听,并不会动态创建新的进程,也不会触发onWorkerStart方法。
例如一个http Worker启动后实例化一个websocket Worker,那么这个进程即能通过http协议访问,又能通过websocket协议访问。由于websocket Worker和http Worker在同一个进程中,所以它们可以访问共同的内存变量,共享所有socket连接。可以做到接收http请求,然后操作websocket客户端完成向客户端推送数据类似的效果。
注意:
如果PHP版本<=7.0,则不支持在多个子进程中实例化相同端口的Worker。例如A进程创建了监听2016端口的Worker,那么B进程就不能再创建监听2016端口的Worker,否则会报Address already in use错误。例如下面的代码是无法运行的。
  1. use Workerman\Worker;- x8 p% o$ M% I& u; e9 x
  2. require_once __DIR__ . '/Workerman/Autoloader.php';- S" T8 m7 j7 _% N! a  ]: p
  3. 0 P( @' W* d5 F. K- ~2 L
  4. $worker = new Worker();9 D' Y& R0 N8 Y2 V, E6 v, D
  5. // 4个进程
    0 Y5 E9 D! W* ?/ F. v
  6. $worker->count = 4;4 P$ I5 s1 V" |) h
  7. // 每个进程启动后在当前进程新增一个Worker监听
    5 z" j0 g" U( N+ i
  8. $worker->onWorkerStart = function($worker), ]! J) C# y7 e
  9. {
    $ z7 K! [* a+ g& E
  10.     /**2 X/ ~) \/ P$ O, C5 t0 A" G5 T1 O
  11.      * 4个进程启动的时候都创建2016端口的Worker
    , l. o1 S- H- \* a& d4 @% _! v
  12.      * 当执行到worker->listen()时会报Address already in use错误# Z* o1 p" t& @  P1 |4 d1 ^
  13.      * 如果worker->count=1则不会报错0 }' h5 j' l+ W: F# P
  14.      */9 T0 m0 S0 ^, p; [. _7 U
  15.     $inner_worker = new Worker('http://0.0.0.0:2016');
    8 @* x4 `# x6 ]7 H; A& c+ `
  16.     $inner_worker->onMessage = 'on_message';
    + m9 ^, N4 w2 N
  17.     // 执行监听。这里会报Address already in use错误/ j/ d- B: Y0 r
  18.     $inner_worker->listen();
    9 x7 L) D1 u" B3 ]3 }" M) t
  19. };
    $ }4 ~) G6 f7 W0 B

  20. 2 E$ B# i+ Y) ~
  21. $worker->onMessage = 'on_message';  r4 U. m* ~/ E6 t
  22. # o2 W: @, c) @$ v
  23. function on_message($connection, $data)/ [% M* N6 t: k; J
  24. {
    9 G+ Q' M* t' d  h5 c9 j* K
  25.     $connection->send("hello\n");; D% f5 ^7 `( M5 t4 B
  26. }
    , _% B5 b8 {4 _/ A5 k

  27. 1 {( X8 ?* b2 M2 H  e) a$ R
  28. // 运行worker, {9 i! ~$ B5 W: v# S) g1 w$ L
  29. Worker::runAll();
    & c# }3 q" W4 U
  30. 如果您的PHP版本>=7.0,可以设置Worker->reusePort=true, 这样可以做到多个子进程创建相同端口的Worker。见下面的例子:
    # J! ?% m3 V) X, ?
  31. 6 E* r0 N- Y+ i
  32. use Workerman\Worker;
    5 l  J: P# s+ E# U# r. h1 D
  33. require_once './Workerman/Autoloader.php';
    3 a+ r' U' O9 _# C: G$ r

  34. 4 Y" ^9 V( g5 ~; ^
  35. $worker = new Worker('text://0.0.0.0:2015');
    - S' f, q/ j% u' B! M- I
  36. // 4个进程. N. Z; U: g2 q2 e& m
  37. $worker->count = 4;
    # J* o1 D* {/ z3 C8 J  \$ Q
  38. // 每个进程启动后在当前进程新增一个Worker监听( I5 F; p0 a+ Q$ p1 s. P+ c
  39. $worker->onWorkerStart = function($worker)
    # J% R- o0 x4 o: [: j/ K* I7 v( l5 A% H
  40. {
    . \7 }, F/ p) \9 ]
  41.     $inner_worker = new Worker('http://0.0.0.0:2016');
    $ k! U8 @1 u6 ~5 S. j! `# @, e: |
  42.     // 设置端口复用,可以创建监听相同端口的Worker(需要PHP>=7.0)
    ' B5 R: [; d9 c5 M5 M5 H9 @+ w* M
  43.     $inner_worker->reusePort = true;1 _8 n  ?+ l) |! c$ O1 Z7 j
  44.     $inner_worker->onMessage = 'on_message';) d2 K7 d& A& P+ U: _  v, \5 @
  45.     // 执行监听。正常监听不会报错
    - [% q8 k0 d) [! t. ^
  46.     $inner_worker->listen();
    + L0 ]9 r6 F( P" {
  47. };
    8 Z& R( E5 u: k" {

  48. - v6 P4 c) [* w7 D
  49. $worker->onMessage = 'on_message';8 Q& z* K: e+ M  M5 d
  50. 0 l3 Q. z, m: v8 P; r) Q
  51. function on_message($connection, $data); `3 g4 d. Q& m6 C
  52. {% V" K+ n/ o7 a
  53.     $connection->send("hello\n");( c: ^% V6 o  r8 j( q6 E
  54. }
    . m7 h7 K+ f2 [( L2 z7 K

  55. ' |# |$ t  W& Y
  56. // 运行worker
    8 }, z+ ~! c6 ?) V' }3 p* U4 V' @
  57. Worker::runAll();
复制代码
示例 php后端及时推送消息给客户端
原理:
1、建立一个websocket Worker,用来维持客户端长连接
2、websocket Worker内部建立一个text Worker
3、websocket Worker 与 text Worker是同一个进程,可以方便的共享客户端连接
4、某个独立的php后台系统通过text协议与text Worker通讯
5、text Worker操作websocket连接完成数据推送
代码及步骤
push.php
  1. <?php# `6 ]8 j' K- _0 @! c# R4 ^
  2. use Workerman\Worker;
    + B" j0 t1 U- y2 l& H7 Z
  3. require_once './Workerman/Autoloader.php';/ @% S& P" [6 ]3 L/ o
  4. // 初始化一个worker容器,监听1234端口
    ' O2 b$ S' `$ [& d$ H8 I  U, }; w
  5. $worker = new Worker('websocket://0.0.0.0:1234');2 B+ C: X7 @/ g) x3 j5 G

  6. " d' b3 n( q0 w
  7. /*
    2 z3 D! w/ K& \& ~
  8. * 注意这里进程数必须设置为1,否则会报端口占用错误' c* m! R* w, R( A" {5 I7 `
  9. * (php 7可以设置进程数大于1,前提是$inner_text_worker->reusePort=true)
    % H2 U: h1 D, H
  10. */
    ' ^, l( `: o' a7 e% y2 i
  11. $worker->count = 1;
    , R: f/ h3 H9 a. ^& H
  12. // worker进程启动后创建一个text Worker以便打开一个内部通讯端口- ?2 K; C- Z/ O8 T% E
  13. $worker->onWorkerStart = function($worker)7 o  H) I, ?; h4 c. w
  14. {+ ^3 G. g9 j, ?7 G* s
  15.     // 开启一个内部端口,方便内部系统推送数据,Text协议格式 文本+换行符5 p/ _- V5 g8 G- g+ v7 @
  16.     $inner_text_worker = new Worker('text://0.0.0.0:5678');3 H( R3 `7 g' w0 h7 e4 m5 B8 l  F
  17.     $inner_text_worker->onMessage = function($connection, $buffer)9 A. E' k4 [( v; s6 |
  18.     {
    $ w6 ]: x) m" f' N+ D* {2 s
  19.         // $data数组格式,里面有uid,表示向那个uid的页面推送数据
    8 t0 u' {" K3 i( Z$ V9 k0 i
  20.         $data = json_decode($buffer, true);
    3 Y" h( E+ Z% X9 C8 Z$ j5 T# V
  21.         $uid = $data['uid'];
      ^2 z8 n2 }7 ~6 N4 [9 C
  22.         // 通过workerman,向uid的页面推送数据' H; s2 K/ ?- X
  23.         $ret = sendMessageByUid($uid, $buffer);3 A! x+ I; {  V3 f7 F  }
  24.         // 返回推送结果6 F) m3 K3 q: I/ C7 q
  25.         $connection->send($ret ? 'ok' : 'fail');; g) I, C5 o: ^" h( y5 ~
  26.     };6 E3 {7 B) e/ A2 F% C
  27.     // ## 执行监听 ##
    # V( k5 D5 Z' W
  28.     $inner_text_worker->listen();
    # b9 D6 }/ T1 e
  29. };
    ' z1 Y) R1 d# A7 s# e
  30. // 新增加一个属性,用来保存uid到connection的映射
    $ q  Q& }2 @. m% @  B) g
  31. $worker->uidConnections = array();8 B! `5 {0 V. S& t5 M! K& b; i
  32. // 当有客户端发来消息时执行的回调函数
    9 J0 `( C& {! W
  33. $worker->onMessage = function($connection, $data)
      V7 c" {1 }3 A/ B7 _
  34. {; K4 Y6 W- S1 [
  35.     global $worker;8 l9 T: A* o+ v- M/ |
  36.     // 判断当前客户端是否已经验证,既是否设置了uid: \+ [9 y) o7 l6 b# j4 {
  37.     if(!isset($connection->uid))
    - U- U6 ]& f3 M
  38.     {
    + J$ p7 {: @4 ?  N; K9 m$ A9 c
  39.        // 没验证的话把第一个包当做uid(这里为了方便演示,没做真正的验证)! p+ ~# [& F* c+ j# @% M* q+ G
  40.        $connection->uid = $data;
    ; q3 q' g1 l$ M  p  x9 g9 N
  41.        /* 保存uid到connection的映射,这样可以方便的通过uid查找connection,
    - C1 u  @' z4 Q1 D; j6 t
  42.         * 实现针对特定uid推送数据" C" t% h1 t5 e  c" u
  43.         */, r9 d+ T  @9 ~  A6 _
  44.        $worker->uidConnections[$connection->uid] = $connection;
    - d7 g. p7 e+ u. Y2 P4 [
  45.        return;
    - i7 o: D- W" t. P7 w
  46.     }) S0 M- ?/ {  K' w& o$ ]; B
  47. };! _7 ^$ w2 A6 Q4 a9 J

  48.   M7 J: l2 _3 j; a5 }( q7 C0 w
  49. // 当有客户端连接断开时5 }4 z2 m  W8 X8 P# p
  50. $worker->onClose = function($connection)0 i+ t1 H) r/ _' v' K8 H! O5 `
  51. {- r9 E4 A, B$ F, \/ A+ c4 H! S- g
  52.     global $worker;& A( H* Q; P. [: z
  53.     if(isset($connection->uid))% E0 b4 T  Z8 n/ \, H
  54.     {
    7 t3 Y5 r7 S, ~5 n
  55.         // 连接断开时删除映射/ _' o4 K! S$ b- J" Q  o" D! |
  56.         unset($worker->uidConnections[$connection->uid]);: c% c# B8 c1 y& j. H/ m3 V
  57.     }
    $ r5 i  c% {) `7 `+ Z7 s2 i( P  \1 j
  58. };6 O- M) @9 S# P3 a

  59.   l0 r( u/ O% g2 n2 i
  60. // 向所有验证的用户推送数据
    3 {) Q! |" i" n1 T. ~& @) Z+ ~
  61. function broadcast($message)
    3 X, U6 E4 `  x) m; F5 f+ a, J6 }/ ]; Y
  62. {
    5 U* b; k- a: V
  63.    global $worker;% t+ Q1 l( B" t" {' c) x
  64.    foreach($worker->uidConnections as $connection)
    - j9 K3 @0 L4 k
  65.    {% f* d6 {1 B0 X( [
  66.         $connection->send($message);9 Q$ a- ], G9 Y8 v
  67.    }4 q% V) m; J; E; Z- r& I8 t' K# V
  68. }8 |) R+ O; e/ ?2 ~

  69. : v/ D3 k% m7 ^  t' A5 f* R9 ^' Q
  70. // 针对uid推送数据$ ^# O) k5 H+ ^5 i. A9 P* O1 v. Q* u
  71. function sendMessageByUid($uid, $message)
    + M/ @5 s3 \$ [) O- R: f9 }
  72. {
    2 n% P% M  W7 f$ D( |
  73.     global $worker;( S7 i$ ~! b7 h$ F: J
  74.     if(isset($worker->uidConnections[$uid]))
    1 L% Y0 s7 G* Y% D% S
  75.     {
    * Z5 o0 ]9 H2 s+ {
  76.         $connection = $worker->uidConnections[$uid];
    1 u1 a! \1 K$ L/ ]* y
  77.         $connection->send($message);
    9 Y: V6 [* S. k- @
  78.         return true;' ~1 o  p$ L+ D& q
  79.     }) K+ w: E& j4 ~
  80.     return false;
    ( H- P4 x' Q" t* F: d) v
  81. }
    8 f+ t' ~4 u: G8 L7 O

  82. * o2 d" m% a  `1 K9 }; @7 L5 D
  83. // 运行所有的worker
    5 C; `0 Z" N" c% Y: y
  84. Worker::runAll();
复制代码
启动后端服务 php push.php start -d
前端接收推送的js代码
  1. var ws = new WebSocket('ws://127.0.0.1:1234');
    8 y! N6 e) O" n) _/ D
  2. ws.onopen = function(){
    ) r! d: q+ a# Q( z0 Y6 ^+ i1 {% d
  3.     var uid = 'uid1';' b( z) e- g; p  V) D# \) l* h/ y
  4.     ws.send(uid);
    . d# ^0 a' h- m$ J0 I; g
  5. };
    . x/ A$ E6 N5 w8 |9 c
  6. ws.onmessage = function(e){
    2 w' F( W/ H, L- b; Q  b$ `
  7.     alert(e.data);
    + I1 D/ X/ e0 T2 ?
  8. };
复制代码
后端推送消息的代码
  1. // 建立socket连接到内部推送端口
    , p, l$ y# d5 W* H
  2. $client = stream_socket_client('tcp://127.0.0.1:5678', $errno, $errmsg, 1);2 M% Q5 [( \6 i8 i& B" t
  3. // 推送的数据,包含uid字段,表示是给这个uid推送9 x# P8 P( H7 c; f. m! l+ |
  4. $data = array('uid'=>'uid1', 'percent'=>'88%');
    $ g$ A/ m' u( w  ~$ W/ p
  5. // 发送数据,注意5678端口是Text协议的端口,Text协议需要在数据末尾加上换行符; P7 S4 E# c' R: c6 S; N2 |
  6. fwrite($client, json_encode($data)."\n");
    6 F: h. }7 k; i/ Q+ v! ^) L( G& `
  7. // 读取推送结果- \7 W2 O9 ~5 Z* H. Q- M
  8. echo fread($client, 8192);
复制代码

( t+ l( i% V4 \, z5 Z% }7 ~, |
分享到:  QQ好友和群QQ好友和群 QQ空间QQ空间 腾讯微博腾讯微博 腾讯朋友腾讯朋友
收藏收藏 分享分享 支持支持 反对反对
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

GMT+8, 2026-6-20 01:17 , Processed in 0.065764 second(s), 21 queries .

Copyright © 2001-2026 Powered by cncml! X3.2. Theme By cncml!